/
FeedsFTProcessor.inc
145 lines (133 loc) · 4.08 KB
/
FeedsFTProcessor.inc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
<?php
// Default batch-size threshold used by FeedsFTProcessor::process(); it is the
// fallback when the 'feeds_ft_batch_size' variable is not set.
define('FEEDS_FT_BATCH_SIZE', 100);
/**
 * Feeds processor that uploads imported items to Google Fusion Tables.
 *
 * A Fusion Table is created for a source the first time it is processed and
 * its ID is stored in the source configuration under 'fusion_table_id'. The
 * table schema is derived from the configured mappings; every column is
 * declared as STRING.
 */
class FeedsFTProcessor extends FeedsProcessor {

  /**
   * Implementation of FeedsProcessor::process().
   *
   * Shifts up to 'feeds_ft_batch_size' items (default FEEDS_FT_BATCH_SIZE)
   * off the batch, maps them and inserts them into the source's Fusion Table
   * in a single call.
   *
   * @param FeedsImportBatch $batch
   *   Batch to take items from; its 'created' counter is incremented for
   *   every item taken.
   * @param FeedsSource $source
   *   Source being imported; holds the Fusion Table ID in its configuration.
   *
   * @return
   *   FEEDS_BATCH_COMPLETE when the batch ran empty or the upload failed,
   *   otherwise the progress value created / (total + 1).
   *
   * @throws Exception
   *   When no Fusion Table could be created for the source.
   */
  public function process(FeedsImportBatch $batch, FeedsSource $source) {
    // @todo Use feed node's owner?
    global $user;
    $fusion = gdata_fusion_user_get_client($user);
    $source_config = $source->getConfigFor($this);

    // Allocate a Fusion table if it does not exist yet. The key may be absent
    // when nothing has been stored for this source so far.
    // @todo: modify table if mapping has changed in meantime.
    $ft_id = isset($source_config['fusion_table_id']) ? $source_config['fusion_table_id'] : NULL;
    $schema = $this->getFTSchema();
    if (empty($ft_id)) {
      $ft_id = $fusion->createTable('feeds_'. $this->id .'_'. $source->feed_nid, $schema);
      if (empty($ft_id)) {
        throw new Exception(t('Could not create Fusion Table.'));
      }
      // Remember the new table so later runs reuse it.
      $source_config['fusion_table_id'] = $ft_id;
      $source->addConfig(array(get_class($this) => $source_config));
    }

    // The batch size does not change while looping, so look it up once.
    $batch_size = max(1, (int) variable_get('feeds_ft_batch_size', FEEDS_FT_BATCH_SIZE));

    // Iterate through all items and collect them for a single insert.
    $created = 0;
    $data = array();
    $complete = FEEDS_BATCH_COMPLETE;
    while ($item = $batch->shiftItem()) {
      $data[] = $this->map($item);
      $created++;
      $batch->created++;
      // Stop once exactly $batch_size items were collected ('>=', not '>',
      // otherwise one item too many is sent per call).
      if ($created >= $batch_size) {
        // Dividing by total + 1 keeps the value below FEEDS_BATCH_COMPLETE,
        // so the caller invokes process() again.
        // @todo: move this into $batch->progress();
        $complete = (1.0 / ($batch->total + 1)) * $batch->created;
        break;
      }
    }

    if (!empty($data)) {
      $rows = $fusion->insertData($ft_id, array_keys($schema), $data);
      // NOTE(review): insertData() is assumed to return one entry per inserted
      // row - confirm against gdata_fusion. A non-countable result (e.g.
      // FALSE) is treated as a failure instead of being passed to count().
      $countable = is_array($rows) || $rows instanceof Countable;
      if (!$countable || count($rows) != count($data)) {
        drupal_set_message(t('Error transferring data to Fusion tables.'), 'error');
        // Report completion so the import does not keep retrying.
        return FEEDS_BATCH_COMPLETE;
      }
    }

    if ($complete == FEEDS_BATCH_COMPLETE) {
      // Set messages.
      if ($batch->created) {
        drupal_set_message(t('Created !created items in Google Fusion Tables', array('!created' => $batch->created)));
      }
      else {
        drupal_set_message(t('There was no data to be transferred to Google Fusion Tables.'));
      }
    }
    return $complete;
  }

  /**
   * Implementation of FeedsProcessor::clear().
   *
   * Removing uploaded items is not implemented.
   *
   * @throws Exception
   *   Always.
   */
  public function clear(FeedsBatch $batch, FeedsSource $source) {
    throw new Exception(t('Not supported'));
  }

  /**
   * Declare source defaults.
   *
   * Declare a fusion table ID to be stored for each source.
   *
   * @return
   *   Array with the single key 'fusion_table_id', defaulting to NULL.
   */
  public function sourceDefaults() {
    return array(
      'fusion_table_id' => NULL,
    );
  }

  /**
   * Declare that we have source configuration.
   *
   * @todo FeedsPlugin::hasSourceConfig() should use sourceDefaults() for
   *   determining whether a plugin has source configuration or not.
   *
   * @return
   *   Always TRUE.
   */
  public function hasSourceConfig() {
    return TRUE;
  }

  /**
   * Override parent::map().
   *
   * Reads every mapped source element through the importer's parser and hands
   * it to setTargetElement() together with the mapping's target name.
   *
   * @param $source_item
   *   A single item taken from the batch.
   * @param $target_item
   *   Optional item to populate; an empty array is used when it is empty.
   *
   * @return
   *   The populated target item.
   */
  protected function map($source_item, $target_item = NULL) {
    $parser = feeds_importer($this->id)->parser;
    if (empty($target_item)) {
      $target_item = array();
    }
    foreach ($this->config['mappings'] as $mapping) {
      $value = $parser->getSourceElement($source_item, $mapping['source']);
      $this->setTargetElement($target_item, $mapping['target'], $value);
    }
    return $target_item;
  }

  /**
   * Handle adding a mapping.
   *
   * The $target passed in is not used: the target name is derived from
   * $source by replacing every run of whitespace with an underscore.
   *
   * @todo: sanitize $source names as not all characters are allowed in FT
   *   tables. Use "'". $row[0] ."'", move it to getFTSchema().
   */
  public function addMapping($source, $target, $unique = FALSE) {
    $target = preg_replace('/\s+/', '_', $source);
    parent::addMapping($source, $target, $unique);
  }

  /**
   * Override parent::getMappingTargets().
   *
   * Only a generic 'new' target is offered.
   *
   * @todo: implement types STRING, NUMBER, LOCATION, DATETIME
   */
  public function getMappingTargets() {
    return array(
      'new' => array(
        'name' => t('New'),
        'description' => t('Add a new mapping target.'),
      ),
    );
  }

  /**
   * Build a Fusion Table schema from mappings.
   *
   * @return
   *   Array keyed by mapping target name; every value is 'STRING'.
   */
  public function getFTSchema() {
    $schema = array();
    foreach ($this->getMappings() as $mapping) {
      $schema[$mapping['target']] = 'STRING';
    }
    return $schema;
  }

}