In [1]:
## โค้ดนี้พัฒนาจาก A-aggValuePM3-alertDisplay-withTrigger.ipynb

In [2]:
## โจทย์ทางธุรกิจ คือ ธุรกิจต้องการรับการแจ้งเตือนเมื่อมีค่าฝุ่น (pm 2.5) เกินกว่าค่า threshold 
## โดยให้มีการคำนวณเปรียบเทียบกับ threshold ในทุกๆ 5 วินาทีที่ processing platform (Spark) ได้รับข้อมูล (Message Arrival)

In [3]:
# จำเป็นต้องเปลี่ยนเลขไอพีของ Kafka broker โดยใช้ INTERNAL IP Address พร้อมด้วยหมายเลข port 9092

kafka_broker = "10.128.0.12:9092"

In [4]:
# สร้าง Spark session โดยใช้การตั้งค่าที่กำหนด

from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("KafkaSubscribe").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "1000m").\
        config("spark.executor.cores", "2").\
        config("spark.cores.max", "6").\
        config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0").\
        getOrCreate()



Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7d7555c0-3aab-4d1f-818e-0ea5a9730abb;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 687ms :: artifacts dl 15

# 1. ได้รับ Unbounded input table 

In [5]:
# สร้าง DataFrame โดยอ่านข้อมูลจาก Kafka stream

unboundInput_table_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_broker) \
  .option("subscribe", "quickstart-events") \
  .option("group.id", "Aekanun-Spark-App") \
  .load()

In [6]:
from pyspark.sql import functions as sparkf

In [7]:
# แสดง schema ของ column ต่างๆ ใน DataFrame

unboundInput_table_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



# 2. เกิดการ Query - Stream Data Processing

In [8]:
# เลือกคอลัมน์ที่ต้องการจาก DataFrame

selectedCol_df = unboundInput_table_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [9]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col

# กำหนด schema สำหรับข้อมูลที่ต้องการ

schema = StructType([
    StructField("locationId", StringType()),
    StructField("location", StringType()),
    StructField("parameter", StringType()),
    StructField("value", StringType()),
    StructField("date", StringType()),
    StructField("unit", StringType()),
    StructField("coordinates", StringType()),
    StructField("country", StringType()),
    StructField("city", StringType()),
    StructField("isMobile", StringType()),
    StructField("isAnalysis", StringType()),
    StructField("entity", StringType()),
    StructField("sensorType", StringType())
])

date_schema = StructType([
    StructField("utc", StringType()),
    StructField("local", StringType())
])

coordinates_schema = StructType([
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType())
])

# แปลงข้อมูลที่มีโครงสร้างเป็น JSON ใน column value เป็น schema ที่กำหนดไว้ก่อนหน้านี้
parsedData_df = selectedCol_df.withColumn("data", from_json("value", schema))

In [10]:
parsedData_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- locationId: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- parameter: string (nullable = true)
 |    |-- value: string (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- unit: string (nullable = true)
 |    |-- coordinates: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- isMobile: string (nullable = true)
 |    |-- isAnalysis: string (nullable = true)
 |    |-- entity: string (nullable = true)
 |    |-- sensorType: string (nullable = true)



In [11]:
# เลือกคอลัมน์ที่ต้องการจาก DataFrame และแยก nested column ออกมา (ทำ unnest)

unNested_df = parsedData_df.select("key", "data.*")

In [12]:
unNested_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- locationId: string (nullable = true)
 |-- location: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- value: string (nullable = true)
 |-- date: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- isMobile: string (nullable = true)
 |-- isAnalysis: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- sensorType: string (nullable = true)



In [13]:
# แสดง schema เพื่อจะใช้ในการแปลงข้อมูลที่มีโครงสร้างเป็น JSON ใน column value (คนละตัวกับ value ก่อนหน้านี้) เป็น schema ที่กำหนดไว้
# แสดง schema เพื่อจะใช้ในการแปลงข้อมูลที่มีโครงสร้างเป็น JSON ใน column date เป็น schema ที่กำหนดไว้
# แสดง schema เพื่อจะใช้ในการแปลงข้อมูลที่มีโครงสร้างเป็น JSON ใน column coordiates เป็น schema ที่กำหนดไว้

unNested_df.withColumn("data", from_json("value", schema))\
.withColumn("date", from_json("date", date_schema))\
.withColumn("coordinates", from_json("coordinates", coordinates_schema))\
.printSchema()

root
 |-- key: string (nullable = true)
 |-- locationId: string (nullable = true)
 |-- location: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- value: string (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- utc: string (nullable = true)
 |    |-- local: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- isMobile: string (nullable = true)
 |-- isAnalysis: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- sensorType: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- locationId: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- parameter: string (nullable = true)
 |    |-- value: string (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- unit: st

In [14]:
# แปลงข้อมูลที่มีโครงสร้างเป็น JSON ใน column value (คนละตัวกับ value ก่อนหน้านี้) เป็น schema ที่กำหนดไว้
# แปลงข้อมูลที่มีโครงสร้างเป็น JSON ใน column date เป็น schema ที่กำหนดไว้
# แปลงข้อมูลที่มีโครงสร้างเป็น JSON ใน column coordiates เป็น schema ที่กำหนดไว้
# แล้วจึงทำ unnest ด้วย .select กับ col. data, date, coordinate

extractedDateLatLong_df = unNested_df.withColumn("data", from_json("value", schema))\
.withColumn("date", from_json("date", date_schema))\
.withColumn("coordinates", from_json("coordinates", coordinates_schema))\
.select('key',
 'locationId',
 'location',
 'parameter',
 'value',
 'date.*',
 'unit',
 'coordinates.*',
 'country',
 'city',
 'isMobile',
 'isAnalysis',
 'entity',
 'sensorType')

In [15]:
extractedDateLatLong_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- locationId: string (nullable = true)
 |-- location: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- value: string (nullable = true)
 |-- utc: string (nullable = true)
 |-- local: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- isMobile: string (nullable = true)
 |-- isAnalysis: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- sensorType: string (nullable = true)



In [16]:
#extractedDateLatLong_df.filter(sparkf.col('isMobile').isNotNull())

# 3. ได้รับ Result table

In [17]:
result_table_df = extractedDateLatLong_df

In [18]:
# แสดงรายชื่อ column เป็น list

result_table_df.columns

['key',
 'locationId',
 'location',
 'parameter',
 'value',
 'utc',
 'local',
 'unit',
 'latitude',
 'longitude',
 'country',
 'city',
 'isMobile',
 'isAnalysis',
 'entity',
 'sensorType']

In [19]:
result_table_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- locationId: string (nullable = true)
 |-- location: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- value: string (nullable = true)
 |-- utc: string (nullable = true)
 |-- local: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- isMobile: string (nullable = true)
 |-- isAnalysis: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- sensorType: string (nullable = true)



# 4. ได้รับ Output (table)

In [None]:
from pyspark.sql import functions as F

# กำหนดฟังก์ชันสำหรับการตรวจสอบค่ามากที่สุดของฝุ่น ในแต่ละ trigger ที่ Spark ไป subscribe จาก Kafka มา (ทุกๆ 5 sec.)
def max_value(df, epoch_id):
    df = df.withColumn("value", df["value"].cast("float"))
    max_value_row = df.orderBy(df.value.desc()).first()
    if max_value_row is not None and max_value_row.value > 60:
        print(f"ALERT! Batch: {epoch_id}, Max value: {max_value_row.value} exceeded 60 at date: {max_value_row.local}!")
    elif max_value_row is not None:
        print(f"Batch: {epoch_id}, Max value: {max_value_row.value} at date: {max_value_row.local}")
    else:
        print(f"Batch: {epoch_id}, No data")

output_df = result_table_df.writeStream \
    .foreachBatch(max_value) \
    .outputMode("update") \
    .trigger(processingTime='5 seconds') \
    .start()

output_df.awaitTermination()


23/06/21 16:10:11 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0b681d83-f41e-42d9-b0da-3c34942f5a6d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


Batch: 0, No data


23/06/21 16:12:18 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 8392 milliseconds


ALERT! Batch: 1, Max value: 64.0 exceeded 60 at date: 2022-03-09T20:00:00+07:00!


                                                                                

ALERT! Batch: 2, Max value: 73.0 exceeded 60 at date: 2022-04-10T03:00:00+07:00!
ALERT! Batch: 3, Max value: 62.0 exceeded 60 at date: 2022-12-23T07:00:00+07:00!


In [None]:
output_df.stop()