/
pump_csv.py
executable file
·133 lines (109 loc) · 4.36 KB
/
pump_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python
import csv
import os
import simplejson as json
import sys
import memcacheConstants
import pump
class CSVSource(pump.Source):
"""Reads csv file, where first line is field names and one field
should be 'id'."""
def __init__(self, opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur):
super(CSVSource, self).__init__(opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur)
self.done = False
self.r = None # An iterator of csv.reader()
@staticmethod
def can_handle(opts, spec):
return spec.endswith(".csv") and os.path.isfile(spec)
@staticmethod
def check(opts, spec):
return 0, {'spec': spec,
'buckets': [{'name': os.path.basename(spec),
'nodes': [{'hostname': 'N/A'}]}]}
@staticmethod
def provide_design(opts, source_spec, source_bucket, source_map):
return 0, None
def provide_batch(self):
if self.done:
return 0, None
if not self.r:
try:
self.r = csv.reader(open(self.spec, 'rU'))
self.fields = self.r.next()
if not 'id' in self.fields:
return ("error: no 'id' field in 1st line of csv: %s" %
(self.spec)), None
except StopIteration:
return ("error: could not read 1st line of csv: %s" %
(self.spec)), None
except IOError as e:
return ("error: could not open csv: %s; exception: %s" %
(self.spec, e)), None
batch = pump.Batch(self)
batch_max_size = self.opts.extra['batch_max_size']
batch_max_bytes = self.opts.extra['batch_max_bytes']
cmd = memcacheConstants.CMD_TAP_MUTATION
vbucket_id = 0x0000ffff
cas, exp, flg = 0, 0, 0
while (self.r and
batch.size() < batch_max_size and
batch.bytes < batch_max_bytes):
try:
vals = self.r.next()
doc = {}
for i, field in enumerate(self.fields):
doc[field] = vals[i]
doc_json = json.dumps(doc)
msg = (cmd, vbucket_id, doc['id'], flg, exp, cas, '', doc_json)
batch.append(msg, len(doc))
except StopIteration:
self.done = True
self.r = None
if batch.size() <= 0:
return 0, None
return 0, batch
class CSVSink(pump.Sink):
"""Emits batches to stdout in CSV format."""
def __init__(self, opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur):
super(CSVSink, self).__init__(opts, spec, source_bucket, source_node,
source_map, sink_map, ctl, cur)
self.writer = None
@staticmethod
def can_handle(opts, spec):
if spec == "csv:":
opts.threads = 1 # Force 1 thread to not overlap stdout.
return True
return False
@staticmethod
def check(opts, spec, source_map):
return 0, None
@staticmethod
def consume_design(opts, sink_spec, sink_map,
source_bucket, source_map, source_design):
if source_design:
logging.warn("warning: cannot save bucket design"
" on a CSV destination")
return 0
def consume_batch_async(self, batch):
if not self.writer:
self.writer = csv.writer(sys.stdout)
self.writer.writerow(['id', 'flags', 'expiration', 'cas', 'value'])
for msg in batch.msgs:
cmd, vbucket_id, key, flg, exp, cas, meta, val = msg
if self.skip(key, vbucket_id):
continue
try:
if cmd == memcacheConstants.CMD_TAP_MUTATION:
self.writer.writerow([key, flg, exp, cas, val])
elif cmd == memcacheConstants.CMD_TAP_DELETE:
pass
else:
return "error: CSVSink - unknown cmd: " + str(cmd), None
except IOError:
return "error: could not write csv to stdout", None
future = pump.SinkBatchFuture(self, batch)
self.future_done(future, 0)
return 0, future