-
Notifications
You must be signed in to change notification settings - Fork 2
/
worker.py
60 lines (40 loc) · 1.87 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import datetime
import json
from prediction import predict
import psycopg2
from redis import Redis
from rows import DumpRow, PredictionRow
import time
def predict_thread(data, kind, cursor, connection):
    """Compute predictions of one kind for a station and store them.

    Reads ``prediction_table``, ``rows_table``, ``station_id`` and ``ids``
    from ``data``, fetches the input rows through ``DumpRow`` and
    ``PredictionRow``, runs ``predict`` and inserts one row per returned
    prediction into the prediction table, then commits.

    Args:
        data: Decoded job payload (dict) holding the keys listed above. It is
            also handed unchanged to ``predict``.
        kind: Identifier of the prediction model; passed to ``predict`` and
            stored in the ``kind`` column.
        cursor: Database cursor used for the reads and for the insert.
        connection: Connection owning ``cursor``; committed before returning.

    Each prediction is expected to expose ``"timestamp"`` (seconds since the
    epoch) and ``"value"`` keys.
    """
    prediction_table, rows_table = data["prediction_table"], data["rows_table"]
    station_id = data["station_id"]

    dump_rows = DumpRow.fetch_all(cursor, rows_table, station_id)
    prediction_rows = PredictionRow.fetch_all(cursor, rows_table, data["ids"])
    predictions = predict(data, dump_rows, prediction_rows, kind)

    # A single timestamp for the whole batch keeps created_at/updated_at
    # identical across the rows written by this call.
    now = datetime.datetime.utcnow()

    rows = []
    for prediction in predictions:
        timestamp = datetime.datetime.utcfromtimestamp(prediction["timestamp"])
        # Predictions are stored per hour: drop minutes, seconds and below.
        timestamp = timestamp.replace(minute=0, second=0, microsecond=0)
        # int() keeps the integer truncation the previous "%d" formatting did.
        rows.append(
            (int(station_id), timestamp, int(prediction["value"]), kind, now, now)
        )

    # With no predictions there is nothing to insert; issuing the statement
    # with an empty VALUES list would be a SQL syntax error and would leave
    # the shared connection in an aborted transaction.
    if rows:
        # Values are sent as bound parameters so the driver handles quoting.
        # NOTE(review): identifiers cannot be bound, so the table name is
        # still interpolated -- it must come from a trusted producer.
        query = (
            "INSERT INTO %s (station_id, datetime, available_bikes, kind, "
            "created_at, updated_at) VALUES (%%s, %%s, %%s, %%s, %%s, %%s)"
            % prediction_table
        )
        cursor.executemany(query, rows)

    connection.commit()
if __name__ == '__main__':
    # Worker entry point: connect to PostgreSQL and Redis, then handle every
    # message published on the "prediction" channel by running each model
    # kind for the station described in the message.
    print("Connecting to PostgreSQL...")
    connection = psycopg2.connect("dbname=velib_development user=velib password=velib")
    cursor = connection.cursor()

    # pubsub.listen() blocks indefinitely, so the only ways out of the loop
    # are an exception or an interrupt; try/finally guarantees the database
    # resources are released in those cases too.
    try:
        print("Connecting to Redis...")
        redis = Redis(host="localhost", port=6379, db=0)
        pubsub = redis.pubsub()
        pubsub.subscribe("prediction")
        print("Subscribed to 'prediction' channel")

        for item in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations; only real messages
            # carry a job payload.
            if item["type"] != "message":
                continue

            data = json.loads(item["data"])
            print("Received data for station id: %s..." % str(data["station_id"]))

            for kind in ("scikit_lasso", "scikit_ridge"):
                predict_thread(data, kind, cursor, connection)
    finally:
        cursor.close()
        connection.close()