In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField , StructType , StringType , IntegerType , ArrayType , LongType
from pyspark.sql.functions import from_json , col , explode



In [5]:
spark = (
    SparkSession.builder
    .appName("Streaming from Kafka")
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1")
    .config("spark.jars" , '/home/mhai/test/postgresql-42.7.3.jar')
    .config("spark.sql.shuffle.partitions", "5")
    .master("local[*]")
    .getOrCreate()
)


25/10/29 17:54:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
spark

In [4]:
print(spark.sparkContext._jsc.sc().listJars())


List(spark://10.255.255.254:44879/jars/postgresql-42.7.3.jar)


In [None]:
device = spark.readStream.format('kafka') \
                   .option('kafka.bootstrap.servers' , '172.23.152.231:9092') \
                   .option('subscribe' , 'device-data') \
                   .option('startingOffsets' , 'earliest') \
                   .load()

In [None]:
device.printSchema()

In [None]:
device = device.withColumn('value' , device['value'].cast('string'))

In [None]:
device.printSchema()

In [None]:
device_schema = StructType(
    [
        StructField('customerId' , StringType() , True) ,
        StructField('data' , StructType(
            [StructField('devices' ,
                ArrayType(StructType([
                    StructField('deviceId' , StringType() , True) ,
                    StructField('measure' , StringType() , True) ,
                    StructField('status' , StringType() , True) ,
                    StructField('temperature' , StringType() , True)
                ]),True ) , True ) 
            ]) , True) ,
        StructField('eventId' , StringType() , True) ,
        StructField('eventOffset' , LongType() , True) ,
        StructField('eventPublisher' , StringType() , True) ,
        StructField('eventTime' , StringType(), True )          
    ]
)

In [None]:
device = device.withColumn('value_json' , from_json(col('value') , device_schema)).select('value_json.*')

In [None]:
device_explode = device.withColumn('devices' , explode(col('data.devices')))

In [None]:
device = device_explode.withColumn('deviceId', col('devices.deviceId')) \
                        .withColumn('measure', col('devices.measure')) \
                        .withColumn('status', col('devices.status')) \
                        .withColumn('temperature', col('devices.temperature')) \
                        .drop('data') \
                        .drop('devices')

In [None]:
def save_psql(data , batch_id) :
    print('batch id : ' , str(batch_id))    
    data.write.format('jdbc') \
            .mode('append') \
            .option('driver' , 'org.postgresql.Driver') \
            .option('url' , 'jdbc:postgresql://192.168.1.5:5432/test') \
            .option('dbtable' , 'public.device_data') \
            .option('user' , 'postgres') \
            .option('password' , 'MINHHAI123') \
            .save()
    data.show()
    
    

In [None]:
device.printSchema()

In [None]:
device.writeStream \
        .foreachBatch(save_psql) \
        .outputMode('append') \
        .trigger(processingTime = '10 seconds') \
        .option('checkpointLocation' , 'checkpoint_dir_kafka') \
        .start() \
        .awaitTermination() 