-
Notifications
You must be signed in to change notification settings - Fork 2
/
receiver.py
44 lines (39 loc) · 1.48 KB
/
receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/7/11 21:13
# @Author : MengnanChen
# @Site :
# @File : receiver.py
# @Software: PyCharm Community Edition
'''
receive data coming from kafka (producer.py) and insert data into mongodb
'''
# debug spark-submit in PyCharm (Chinese): https://blog.csdn.net/zj1244/article/details/78893837
# (English): https://stackoverflow.com/questions/35560767/pyspark-streaming-with-kafka-in-pycharm
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import global_vals
from mongo_utils import mongo_utils
def insert_row(x):
    """Parse one comma-separated record and insert it into MongoDB.

    Expected format: "timestamp,uid,heart_rate,steps" — the shape produced
    by the map step later in this file. Empty/None and malformed rows are
    skipped instead of raising, so one bad Kafka message cannot kill the
    whole streaming task (the original raised IndexError on short rows).
    """
    if not x:  # covers None and '' (was: x is None or len(x)<1)
        return
    data_list = x.split(',')
    if len(data_list) < 4:
        # Malformed message — drop it rather than crash the executor.
        return
    mongo_utils.insert_data({
        'timestamp': data_list[0],
        'uid': data_list[1],
        'heart_rate': data_list[2],
        'steps': data_list[3]
    })
def _format_record(kafka_msg):
    """Flatten one Kafka (key, json_value) pair into the CSV line that
    insert_row expects: 'timestamp,uid,heart_rate,steps'.

    Parses the JSON payload ONCE — the original lambda called
    json.loads(x[1]) four times per message, once per field.
    """
    record = json.loads(kafka_msg[1])
    return '{},{},{},{}'.format(record['timestamp'], record['uid'],
                                record['heart_rate'], record['steps'])


sc = SparkContext(master='local[*]', appName='test')
ssc = StreamingContext(sc, batchDuration=global_vals.data_produce_duration)

brokers = 'localhost:9092'
topic = global_vals.kafka_topic
# Direct (receiver-less) Kafka stream: yields (key, value) string pairs.
kvs = KafkaUtils.createDirectStream(ssc, [topic],
                                    kafkaParams={"metadata.broker.list": brokers})
kvs.pprint()  # debug aid: print a sample of each micro-batch to stdout

lines = kvs.map(_format_record)
# Persist every record of every micro-batch into MongoDB.
lines.foreachRDD(lambda rdd: rdd.foreach(insert_row))

ssc.start()
ssc.awaitTermination()