In [None]:
from kafka import KafkaConsumer
import sys, json, pymysql

if __name__ == "__main__":
    # 與 MySQL 連線，並建立 database 和 table
    conn = pymysql.connect(host="mysql", port=3306, user="root",  passwd="iii", db="mysql", charset="utf8mb4") 
    cur = conn.cursor() 
    cur.execute("CREATE DATABASE IF NOT EXISTS db01;")
    cur.execute("USE db01;")
    cur.execute("CREATE TABLE IF NOT EXISTS test_table(\
                    record_id    int    PRIMARY KEY    auto_increment,\
                    mechine    char(1)    NOT NULL,\
                    temperature    float    NOT NULL,\
                    relative_humidity    int    NOT NULL,\
                    timestamp    datetime    NOT NULL);")
    cur.close () 
    conn.close()


    # 設定要連線到Kafka集群的相關設定, 產生一個Kafka的Consumer的實例
    consumer = KafkaConsumer(
        # 指定Kafka集群伺服器
        bootstrap_servers=["kafka_docker_kafka_1:9092"],
        # ConsumerGroup的名稱, 可以不指定
        group_id="cg_001",
        # 指定msgKey的反序列化器, 若Key為None, 無法反序列化
        # key_deserializer=bytes.decode,
        # 指定msgValue的反序列化器
        #value_deserializer=bytes.decode,
        value_deserializer=lambda m: json.loads(m.decode('ascii')),
        # 是否從這個ConsumerGroup尚未讀取的partition / offset開始讀
        auto_offset_reset="earliest",
    )
   
    # 讓Consumer向Kafka集群訂閱指定的topic
    consumer.subscribe(topics="test")

    # 持續的拉取Kafka有進來的訊息
    try:
        print("Now listening for incoming messages ...")
        # 持續監控是否有新的record進來
        for record in consumer:
            topic = record.topic
            partition = record.partition
            offset = record.offset
            timestamp = record.timestamp
            # 取出msgKey與msgValue
            msgKey = record.key
            msgValue = record.value
            # 秀出metadata與msgKey & msgValue訊息
            print("topic=%s, partition=%s, offset=%s : (key=%s, value=%s)" % (record.topic, record.partition, 
                                                                              record.offset, record.key, record.value))
            # 與 MySQL 連線並放入資料
            conn = pymysql.connect(host="mysql", port=3306, user="root",  passwd="iii", db="mysql", charset="utf8mb4") 
            cur = conn.cursor() 
            try:
                cur.execute("USE db01;")
                cur.execute("INSERT INTO test_table(mechine, temperature, relative_humidity, timestamp) VALUES(%s, %s, %s, %s);", 
                            (msgValue['Mechine'], msgValue['Temperature'], msgValue['Relative_Humidity'], msgValue['timestamp']))
                #提交修改
                conn.commit()
                print("Data was sent to MySQL successfully!")
            except:
                #發生錯誤時停止執行MySQL
                conn.rollback()
                print("Data was sent to MySQL unsuccessfully!")
            cur.close () 
            conn.close()

    except:
        # 錯誤處理
        e_type, e_value, e_traceback = sys.exc_info()
        print("type ==> %s" % (e_type))
        print("value ==> %s" % (e_value))
        print("traceback ==> file name: %s" % (e_traceback.tb_frame.f_code.co_filename))
        print("traceback ==> line no: %s" % (e_traceback.tb_lineno))
        print("traceback ==> function name: %s" % (e_traceback.tb_frame.f_code.co_name))
    finally:
        consumer.close()