# load2.py
# Parses trip records from CSV input and bulk-indexes them into Elasticsearch.
# Standard library
from datetime import datetime
import fileinput

# Third-party
import pyelasticsearch

# Client for the Elasticsearch node that receives the trip documents.
es = pyelasticsearch.ElasticSearch('http://localhost:9200/')

# Set to True to test parsing only: rows are parsed and printed, nothing is
# sent to Elasticsearch, and the main loop stops after a few lines.
debugTrip = False
def indexBulk(bulkObjs):
    """Send a batch of trip documents to Elasticsearch in one bulk request.

    Every item of ``bulkObjs`` is wrapped in an index operation and the whole
    batch is written to index ``cabi`` with document type ``trips``.  Nothing
    is sent when the module-level ``debugTrip`` flag is set.
    """
    if debugTrip:
        return
    operations = (es.index_op(trip) for trip in bulkObjs)
    es.bulk(operations, index='cabi', doc_type='trips')
def checkDateFormat(dateStr, format):
    """Report whether ``dateStr`` can be parsed with the strptime ``format``.

    Returns True when ``datetime.strptime`` accepts the string, and False
    when it rejects it with ``ValueError``.
    """
    try:
        datetime.strptime(dateStr, format)
    except ValueError:
        return False
    else:
        return True
def convertToISO8601(dateStr):
    """Convert a date string in one of the known formats to ISO 8601.

    Two input layouts are accepted, tried in this order:
    ``MM/DD/YYYY HH:MM`` and ``YYYY-MM-DD HH:MM``.

    Returns the parsed value as ``datetime.isoformat()`` text, e.g.
    ``'2012-01-02T03:04:00'`` (no timezone information is added here).

    Raises ``ValueError`` (a subclass of ``Exception``) when the string
    matches neither layout.
    """
    knownFormats = ("%m/%d/%Y %H:%M", "%Y-%m-%d %H:%M")
    for fmt in knownFormats:
        # Parse once per candidate format and reuse the result, instead of
        # validating first and then parsing the same string a second time.
        try:
            parsed = datetime.strptime(dateStr, fmt)
        except ValueError:
            continue
        return parsed.isoformat()
    raise ValueError('dateString was not of known format: ' + dateStr)
def constructObj(line, index):
    """Build the document for one comma-separated trip record.

    ``line`` is split on commas and its first seven fields are used in this
    order: duration, start date, start station, end date, end station,
    bike number, subscription type.  ``index`` is only used to label the
    debug output printed when the module-level ``debugTrip`` flag is set.

    Returns a dict with the keys ``duration``, ``startDate``,
    ``startStation``, ``endDate``, ``endStation``, ``bikeNum`` and
    ``subType``.  ``duration`` is an int computed as
    ``first * 3600 + second * 60 + third`` from the three numbers in the
    duration field (i.e. they are treated as hours, minutes and seconds).
    Both dates are converted with ``convertToISO8601`` and get the fixed
    suffix ``' -0400'`` appended.

    Raises ``IndexError`` when the line has fewer than seven fields or the
    duration has fewer than three space-separated parts, ``ValueError`` when
    a duration part is not an integer, and whatever ``convertToISO8601``
    raises for an unrecognised date.
    """
    parts = line.split(',')
    if debugTrip:
        # Single formatted argument so the output is the same whether
        # ``print`` is a statement (Python 2) or a function (Python 3).
        print("%s %s" % (index, parts))

    # Remove the unit markers from the duration field, leaving only the
    # space-separated numbers.  This deletes the same character set the
    # original ``str.translate(None, ...)`` call did, but that two-argument
    # form exists only on Python 2; filtering characters works on both.
    unitChars = ''.join(['min.', 'sec.', 'h', 'm', 's'])
    numbersOnly = ''.join(ch for ch in parts[0] if ch not in unitChars)
    timeparts = numbersOnly.split(' ')
    duration = (int(timeparts[0]) * 3600
                + int(timeparts[1]) * 60
                + int(timeparts[2]))

    obj = {
        'duration': duration,
        'startDate': convertToISO8601(parts[1]) + ' -0400',
        'startStation': parts[2],
        'endDate': convertToISO8601(parts[3]) + ' -0400',
        'endStation': parts[4],
        'bikeNum': parts[5],
        'subType': parts[6],
    }
    if debugTrip:
        print(obj)
    return obj
# Read the input named on the command line (or stdin) line by line, turn each
# data row into a document and index the documents in batches of 500.
counter = 0        # input lines consumed so far, header lines included
bulkCounter = 0    # documents currently waiting in bulkObjs
bulkObjs = []      # documents collected for the next bulk request
for line in fileinput.input():
    record = line.strip()
    # Skip the header row of every input file (not only the first one) and
    # skip blank lines, which constructObj cannot parse.
    if record and not fileinput.isfirstline():
        bulkObjs.append(constructObj(record, counter))
        bulkCounter = bulkCounter + 1
        if bulkCounter >= 500:
            indexBulk(bulkObjs)
            bulkObjs = []
            bulkCounter = 0
    # In debug mode only the first few lines are parsed.
    if debugTrip and counter > 5:
        break
    counter = counter + 1
    if counter % 1000 == 0:
        # Formatted as one string so the progress line is identical under
        # Python 2 and Python 3 (two spaces, as the original statement
        # ``print "On line: ", counter`` produced).
        print("On line:  %d" % counter)
# Flush whatever is left over from the last, partially filled batch.
if bulkCounter > 0:
    indexBulk(bulkObjs)
    bulkObjs = []
    bulkCounter = 0