In [None]:
from kafka import KafkaConsumer
import sys, json, pymongo
import datetime


# Connection settings, hoisted so they can be changed in one place.
# NOTE(review): the host names look like docker-compose service names — confirm.
KAFKA_BOOTSTRAP_SERVERS = ["kafka:9092"]
KAFKA_TOPIC = "pm25"
MONGO_HOST = "mongodb"
MONGO_PORT = 27017
MONGO_DB = "hugo"
MONGO_COLLECTION = "pm25_2020"

# Incoming timestamps are parsed with SOURCE_TS_FORMAT, shifted by
# SOURCE_TZ_SHIFT, and stored in MongoDB as strings in STORED_TS_FORMAT.
SOURCE_TS_FORMAT = "%Y-%m-%d %H:%M:%S"
STORED_TS_FORMAT = "%Y/%m/%d %H:%M:%S"
# NOTE(review): presumably converts producer UTC timestamps to UTC+8 (Taiwan
# time) — confirm with the producer side.
SOURCE_TZ_SHIFT = datetime.timedelta(hours=8)


def _decode_json(raw):
    """Deserialize a Kafka message value from UTF-8 encoded JSON.

    Used as the consumer's ``value_deserializer``. It decodes as UTF-8 rather
    than ASCII, so payloads containing non-ASCII characters do not raise.

    Args:
        raw: the raw message value (bytes), or None for a message without a
            value.

    Returns:
        The decoded JSON object. Returns None when ``raw`` is None or is not
        valid UTF-8 JSON, so that one bad message does not raise out of the
        ``for record in consumer`` loop.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError as err:  # UnicodeDecodeError and JSONDecodeError are both ValueErrors
        print("cannot decode message value as JSON: %s" % err)
        return None


def _build_document(value):
    """Convert one decoded PM2.5 message into the document stored in MongoDB.

    Args:
        value: mapping with a string "timestamp" (in SOURCE_TS_FORMAT) and the
            keys "Temperature", "Humidity", "PM2.5" and "PM25predict".

    Returns:
        A new dict; ``value`` itself is not modified. The timestamp is shifted
        by SOURCE_TZ_SHIFT and re-formatted as STORED_TS_FORMAT. The other
        fields are copied unchanged.

    Raises:
        KeyError: a required key is missing.
        ValueError: the timestamp does not match SOURCE_TS_FORMAT.
        TypeError: ``value`` is not a mapping, or the timestamp is not a string.
    """
    local_time = (
        datetime.datetime.strptime(value["timestamp"], SOURCE_TS_FORMAT)
        + SOURCE_TZ_SHIFT
    )
    return {
        "timestamp": local_time.strftime(STORED_TS_FORMAT),
        "Temperature": value["Temperature"],
        "Humidity": value["Humidity"],
        # Stored as "PM25", presumably to keep a "." out of the MongoDB field
        # name (dots clash with dot-notation queries).
        "PM25": value["PM2.5"],
        "PM25predict": value["PM25predict"],
    }


def _store_record(record, collection):
    """Log one Kafka record and insert its converted form into ``collection``.

    A record is logged and skipped, rather than stopping the consumer, when:
    - its value is None (empty or undecodable), or
    - _build_document rejects it.
    """
    if record.value is None:
        print("skipping record without a JSON value: topic=%s, partition=%s, offset=%s"
              % (record.topic, record.partition, record.offset))
        return
    try:
        document = _build_document(record.value)
    except (KeyError, TypeError, ValueError) as err:
        print("skipping malformed record: topic=%s, partition=%s, offset=%s: %r"
              % (record.topic, record.partition, record.offset, err))
        return

    # Show the metadata together with the key and value of the message.
    print("topic=%s, partition=%s, offset=%s : (key=%s, value=%s)"
          % (record.topic, record.partition, record.offset, record.key, record.value))
    print("json=", document)
    result = collection.insert_one(document)
    print("inserted _id =", result.inserted_id)


def main():
    """Consume PM2.5 readings from Kafka and insert each one into MongoDB.

    Runs until interrupted (Ctrl+C or a kernel interrupt). Records that cannot
    be decoded, or that lack required fields, are logged and skipped. Any other
    error propagates with its full traceback. The Kafka consumer and the
    MongoDB client are both closed on exit.
    """
    client = pymongo.MongoClient(host=MONGO_HOST, port=MONGO_PORT)
    try:
        # A MongoDB database holds collections, roughly like tables in a
        # relational database.
        collection = client[MONGO_DB][MONGO_COLLECTION]
        consumer = KafkaConsumer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            value_deserializer=_decode_json,
            # Where to start reading when no committed offset exists.
            # NOTE(review): no group_id is set. Per kafka-python's docs, offset
            # commits are then disabled, so every run likely re-reads the topic
            # from the beginning and re-inserts stored readings. Set a group_id
            # (e.g. "cg_001") or make the insert idempotent if duplicates are
            # unwanted.
            auto_offset_reset="earliest",
        )
        try:
            consumer.subscribe(topics=[KAFKA_TOPIC])
            print("Now listening for incoming messages ...")
            # Blocks waiting for new records; the loop never ends on its own.
            for record in consumer:
                _store_record(record, collection)
        except KeyboardInterrupt:
            # Interrupting is the normal way to stop this endless loop.
            print("Interrupted, stopping consumer.")
        finally:
            consumer.close()
    finally:
        client.close()


if __name__ == "__main__":
    main()

Now listening for incoming messages ...
topic=pm25, partition=0, offset=0 : (key=None, value={'timestamp': datetime.datetime(2019, 12, 30, 15, 31, 25), 'Temperature': '19.8', 'Humidity': '76.9', 'PM2.5': '21.0', 'PM25predict': '16.23'})
json= {'timestamp': '2019/12/30 15:31:25', 'Temperature': '19.8', 'Humidity': '76.9', 'PM25': '21.0', 'PM25predict': '16.23'}
<pymongo.results.InsertOneResult object at 0x7f770d789748>
topic=pm25, partition=0, offset=1 : (key=None, value={'timestamp': datetime.datetime(2019, 12, 30, 16, 0, 3), 'Temperature': '19.2', 'Humidity': '81.6', 'PM2.5': '12.0', 'PM25predict': '15.75'})
json= {'timestamp': '2019/12/30 16:00:03', 'Temperature': '19.2', 'Humidity': '81.6', 'PM25': '12.0', 'PM25predict': '15.75'}
<pymongo.results.InsertOneResult object at 0x7f770d789788>
topic=pm25, partition=0, offset=2 : (key=None, value={'timestamp': datetime.datetime(2019, 12, 30, 17, 0, 4), 'Temperature': '19.5', 'Humidity': '79.4', 'PM2.5': '11.0', 'PM25predict': '15.98'})
jso

topic=pm25, partition=0, offset=22 : (key=None, value={'timestamp': datetime.datetime(2019, 12, 31, 14, 0, 3), 'Temperature': '15.9', 'Humidity': '69.2', 'PM2.5': '35.0', 'PM25predict': '15.42'})
json= {'timestamp': '2019/12/31 14:00:03', 'Temperature': '15.9', 'Humidity': '69.2', 'PM25': '35.0', 'PM25predict': '15.42'}
<pymongo.results.InsertOneResult object at 0x7f770d7a6d48>
topic=pm25, partition=0, offset=23 : (key=None, value={'timestamp': datetime.datetime(2019, 12, 31, 15, 0, 3), 'Temperature': '15.7', 'Humidity': '68.8', 'PM2.5': '24.0', 'PM25predict': '15.38'})
json= {'timestamp': '2019/12/31 15:00:03', 'Temperature': '15.7', 'Humidity': '68.8', 'PM25': '24.0', 'PM25predict': '15.38'}
<pymongo.results.InsertOneResult object at 0x7f770d7897c8>
topic=pm25, partition=0, offset=24 : (key=None, value={'timestamp': datetime.datetime(2019, 12, 31, 16, 0, 2), 'Temperature': '15.3', 'Humidity': '71.4', 'PM2.5': '32.0', 'PM25predict': '15.09'})
json= {'timestamp': '2019/12/31 16:00:02',

topic=pm25, partition=0, offset=44 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 1, 12, 0, 3), 'Temperature': '17.4', 'Humidity': '67.1', 'PM2.5': '7.0', 'PM25predict': '16.04'})
json= {'timestamp': '2020/01/01 12:00:03', 'Temperature': '17.4', 'Humidity': '67.1', 'PM25': '7.0', 'PM25predict': '16.04'}
<pymongo.results.InsertOneResult object at 0x7f770d7a6d48>
topic=pm25, partition=0, offset=45 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 1, 13, 0, 3), 'Temperature': '17.1', 'Humidity': '67.9', 'PM2.5': '8.0', 'PM25predict': '15.89'})
json= {'timestamp': '2020/01/01 13:00:03', 'Temperature': '17.1', 'Humidity': '67.9', 'PM25': '8.0', 'PM25predict': '15.89'}
<pymongo.results.InsertOneResult object at 0x7f771f706848>
topic=pm25, partition=0, offset=46 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 1, 14, 0, 3), 'Temperature': '16.9', 'Humidity': '68.5', 'PM2.5': '11.0', 'PM25predict': '15.79'})
json= {'timestamp': '2020/01/01 14:00:03', 'Temperat

topic=pm25, partition=0, offset=66 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 2, 10, 0, 2), 'Temperature': '19.5', 'Humidity': '60.8', 'PM2.5': '12.0', 'PM25predict': '17.11'})
json= {'timestamp': '2020/01/02 10:00:02', 'Temperature': '19.5', 'Humidity': '60.8', 'PM25': '12.0', 'PM25predict': '17.11'}
<pymongo.results.InsertOneResult object at 0x7f770d7c3ec8>
topic=pm25, partition=0, offset=67 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 2, 11, 0, 2), 'Temperature': '20.5', 'Humidity': '58.5', 'PM2.5': '14.0', 'PM25predict': '17.57'})
json= {'timestamp': '2020/01/02 11:00:02', 'Temperature': '20.5', 'Humidity': '58.5', 'PM25': '14.0', 'PM25predict': '17.57'}
<pymongo.results.InsertOneResult object at 0x7f770d7c3b48>
topic=pm25, partition=0, offset=68 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 2, 12, 0, 3), 'Temperature': '21.5', 'Humidity': '55.5', 'PM2.5': '11.0', 'PM25predict': '18.07'})
json= {'timestamp': '2020/01/02 12:00:03', 'Temp

topic=pm25, partition=0, offset=88 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 3, 8, 0, 3), 'Temperature': '17.9', 'Humidity': '65.2', 'PM2.5': '29.0', 'PM25predict': '16.32'})
json= {'timestamp': '2020/01/03 08:00:03', 'Temperature': '17.9', 'Humidity': '65.2', 'PM25': '29.0', 'PM25predict': '16.32'}
<pymongo.results.InsertOneResult object at 0x7f770d7c3ec8>
topic=pm25, partition=0, offset=89 : (key=None, value={'timestamp': datetime.datetime(2020, 1, 3, 9, 0, 3), 'Temperature': '19.3', 'Humidity': '62.5', 'PM2.5': '25.0', 'PM25predict': '16.94'})
json= {'timestamp': '2020/01/03 09:00:03', 'Temperature': '19.3', 'Humidity': '62.5', 'PM25': '25.0', 'PM25predict': '16.94'}
<pymongo.results.InsertOneResult object at 0x7f770d789808>
