# pyspark to delta

In [None]:
import os
import pyspark  # imported here only to read the installed Spark version

# Extra JVM packages that the PySpark gateway downloads when it starts.
# This must be set before the first SparkSession is created.
#
# The Kafka connector artifacts are versioned together with Spark, so their
# version is taken from the installed PySpark instead of being hard-coded;
# a connector built for a different Spark release can fail at runtime.
SPARK_VERSION = pyspark.__version__
SCALA_VERSION = "2.12"
# NOTE(review): Delta Lake 2.4.x is built against Spark 3.4.x - confirm this
# pin matches the installed Spark and adjust it if the cluster runs another line.
DELTA_VERSION = "2.4.0"

spark_packages = [
    f"org.apache.spark:spark-streaming-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}",
    f"org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}",
    f"io.delta:delta-core_{SCALA_VERSION}:{DELTA_VERSION}",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = f"--packages {','.join(spark_packages)} pyspark-shell"

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType,DateType


# Session settings: a wider plan-debug string limit plus the two entries that
# register the Delta SQL extension and the Delta catalog implementation.
session_conf = {
    "spark.sql.debug.maxToStringFields": "100",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

session_builder = SparkSession.builder.appName("test")
for conf_key, conf_value in session_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

# Reuses an already-running session if there is one, otherwise starts a new one.
spark = session_builder.getOrCreate()

from delta import *

In [5]:
# (Re)create the Delta table that holds the sensor readings.

# Plain data columns as (name, SQL type), in table order.
sensor_columns = [
    ("dateHour", "TIMESTAMP"),
    ("gpsSpeed", "FLOAT"),
    ("gpsSatCount", "INT"),
    ("Gear", "INT"),
    ("Brake_pedal", "INT"),
    ("Accel_pedal", "INT"),
    ("Machine_Speed_Measured", "INT"),
    ("AST_Direction", "INT"),
    ("Ast_HPMB1_Pressure_bar", "INT"),
    ("Ast_HPMA_Pressure_bar", "INT"),
    ("Pressure_HighPressureReturn", "INT"),
    ("Pressure_HighPressure", "INT"),
    ("Oil_Temperature", "INT"),
    ("Ast_FrontAxleSpeed_Rpm", "INT"),
    ("Pump_Speed", "INT"),
    ("client_id", "STRING"),
]

# Partition columns, each generated from dateHour by the given SQL expression.
partition_columns = [
    ("year", "YEAR(dateHour)"),
    ("month", "MONTH(dateHour)"),
    ("day", "DAY(dateHour)"),
    ("hour", "HOUR(dateHour)"),
]

table_builder = DeltaTable.createOrReplace(spark).tableName("default.iot_sensor_data_test1")
for column_name, column_type in sensor_columns:
    table_builder = table_builder.addColumn(column_name, column_type)
for column_name, expression in partition_columns:
    table_builder = table_builder.addColumn(column_name, "INT", generatedAlwaysAs=expression)

# Left as the cell's last expression so the resulting DeltaTable is displayed.
table_builder.partitionedBy("year", "month", "day", "hour").execute()

                                                                                

<delta.tables.DeltaTable at 0x7fb038453340>

In [7]:
# Peek at the table contents. The table is addressed by its catalog name (the
# one it was created under) rather than by an absolute path inside one user's
# home directory, so the cell also runs on other machines and checkouts.
# limit(100) keeps the driver from pulling the whole table once it has data.
spark.read.table("default.iot_sensor_data_test1").limit(100).collect()

                                                                                

[]

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType, FloatType, IntegerType, StringType, DateType
from pyspark.sql.functions import rand
from datetime import datetime
# Schema of one sensor reading. It lists the same 16 data columns, in the same
# order, as the non-generated columns of default.iot_sensor_data_test1.
schema = StructType([
    StructField("dateHour", TimestampType(), True),
    StructField("gpsSpeed", FloatType(), True),
    StructField("gpsSatCount", IntegerType(), True),
    StructField("Gear", IntegerType(), True),
    StructField("Brake_pedal", IntegerType(), True),
    StructField("Accel_pedal", IntegerType(), True),
    StructField("Machine_Speed_Measured", IntegerType(), True),
    StructField("AST_Direction", IntegerType(), True),
    StructField("Ast_HPMB1_Pressure_bar", IntegerType(), True),
    StructField("Ast_HPMA_Pressure_bar", IntegerType(), True),
    StructField("Pressure_HighPressureReturn", IntegerType(), True),
    StructField("Pressure_HighPressure", IntegerType(), True),
    StructField("Oil_Temperature", IntegerType(), True),
    StructField("Ast_FrontAxleSpeed_Rpm", IntegerType(), True),
    StructField("Pump_Speed", IntegerType(), True),
    StructField("client_id", StringType(), True)
])

# Format of the dateHour strings in the sample records below.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'

# Four hand-written sample readings, keyed by column name.
data = [
    {
        "gpsSpeed": 41.60666084273356, "gpsSatCount": 77,
        "dateHour": datetime.strptime("2023-06-02 14:18:16.474728", TIMESTAMP_FORMAT),
        "client_id": "cars/7", "Pump_Speed": 1785,
        "Pressure_HighPressureReturn": 7, "Pressure_HighPressure": 33159,
        "Oil_Temperature": 152, "Machine_Speed_Measured": 20,
        "Gear": 124, "Brake_pedal": 135,
        "Ast_HPMB1_Pressure_bar": 6, "Ast_HPMA_Pressure_bar": 11,
        "Ast_FrontAxleSpeed_Rpm": 32925, "Accel_pedal": 46, "AST_Direction": 20,
    },
    {
        "gpsSpeed": 30.16754615219846, "gpsSatCount": 31,
        "dateHour": datetime.strptime("2023-06-02 14:18:16.475092", TIMESTAMP_FORMAT),
        "client_id": "cars/8", "Pump_Speed": 1258,
        "Pressure_HighPressureReturn": 11, "Pressure_HighPressure": 33183,
        "Oil_Temperature": 143, "Machine_Speed_Measured": 20,
        "Gear": 125, "Brake_pedal": 127,
        "Ast_HPMB1_Pressure_bar": 7, "Ast_HPMA_Pressure_bar": 3,
        "Ast_FrontAxleSpeed_Rpm": 33135, "Accel_pedal": 39, "AST_Direction": 20,
    },
    {
        "gpsSpeed": 86.68163413814607, "gpsSatCount": 254,
        "dateHour": datetime.strptime("2023-06-02 14:18:16.479768", TIMESTAMP_FORMAT),
        "client_id": "cars/4", "Pump_Speed": 1285,
        "Pressure_HighPressureReturn": 5, "Pressure_HighPressure": 33043,
        "Oil_Temperature": 155, "Machine_Speed_Measured": 20,
        "Gear": 132, "Brake_pedal": 134,
        "Ast_HPMB1_Pressure_bar": 11, "Ast_HPMA_Pressure_bar": 8,
        "Ast_FrontAxleSpeed_Rpm": 33063, "Accel_pedal": 80, "AST_Direction": 20,
    },
    {
        "gpsSpeed": 0.48792041056762847, "gpsSatCount": 77,
        "dateHour": datetime.strptime("2023-06-02 14:18:16.486976", TIMESTAMP_FORMAT),
        "client_id": "cars/9", "Pump_Speed": 822,
        "Pressure_HighPressureReturn": 4, "Pressure_HighPressure": 32772,
        "Oil_Temperature": 23, "Machine_Speed_Measured": 20,
        "Gear": 127, "Brake_pedal": 136,
        "Ast_HPMB1_Pressure_bar": 13, "Ast_HPMA_Pressure_bar": 8,
        "Ast_FrontAxleSpeed_Rpm": 33003, "Accel_pedal": 96, "AST_Direction": 20,
    },
]

# Column order is taken from the schema itself, so the positional rows built
# below cannot drift out of sync with it when a column is added or moved.
header = schema.fieldNames()

# Turn each record into a positional row laid out in schema order.
data_list = [[record[column] for column in header] for record in data]

# Create a DataFrame from the positional rows
df = spark.createDataFrame(data_list, schema)
df.show()
df.printSchema()

                                                                                

+--------------------+---------+-----------+----+-----------+-----------+----------------------+-------------+----------------------+---------------------+---------------------------+---------------------+---------------+----------------------+----------+---------+
|            dateHour| gpsSpeed|gpsSatCount|Gear|Brake_pedal|Accel_pedal|Machine_Speed_Measured|AST_Direction|Ast_HPMB1_Pressure_bar|Ast_HPMA_Pressure_bar|Pressure_HighPressureReturn|Pressure_HighPressure|Oil_Temperature|Ast_FrontAxleSpeed_Rpm|Pump_Speed|client_id|
+--------------------+---------+-----------+----+-----------+-----------+----------------------+-------------+----------------------+---------------------+---------------------------+---------------------+---------------+----------------------+----------+---------+
|2023-06-02 14:18:...| 41.60666|         77| 124|        135|         46|                    20|           20|                     6|                   11|                          7|                331

In [9]:
from pyspark.sql.functions import *
# Save the sample DataFrame into the Delta table in overwrite mode, with the
# partition overwrite mode option set to "dynamic".
sensor_writer = df.write.format("delta").mode("overwrite")
sensor_writer = sensor_writer.option("partitionOverwriteMode", "dynamic")
sensor_writer.saveAsTable("default.iot_sensor_data_test1")

                                                                                

In [9]:
# Streaming source: subscribe to the sensor topic on the local Kafka broker,
# starting from the latest offsets.
kafka_source_options = [
    ("kafka.bootstrap.servers", "localhost:9092"),
    ("subscribe", "cars_sensor_data_delta"),
    ("startingOffsets", "latest"),
]

kafka_reader = spark.readStream.format("kafka")
for option_key, option_value in kafka_source_options:
    kafka_reader = kafka_reader.option(option_key, option_value)

df_kafka = kafka_reader.load()

In [11]:
def process_batch(df, batch_id):
    """Print every row of one micro-batch (debugging sink for ``foreachBatch``).

    Args:
        df: The micro-batch; only its ``collect()`` method is used.
        batch_id: Identifier of the micro-batch. Not used by this function.

    Returns:
        None. The collected rows are written to standard output.
    """
    # collect() materialises the whole batch as a list before it is printed.
    print(df.collect())
    # Disabled draft of writing the batch to the Delta table instead:
    # df_rows = spark.createDataFrame(rows)
    # df_rows.write \
    # .format("delta") \
    # .mode("overwrite") \
    # .option("partitionOverwriteMode", "dynamic") \
    # .saveAsTable("default.iot_sensor_data_test1")

In [None]:
# Keep only the Kafka message payload, cast to a string.
kafka_values = df_kafka.selectExpr("CAST(value AS STRING)").select("value")

# Hand every micro-batch to process_batch and start the stream.
query = kafka_values.writeStream.foreachBatch(process_batch).start()

# Blocks this cell until the streaming query stops or fails.
query.awaitTermination()

# Disabled alternative: stream straight into a Delta table.
# .format("delta") \
# .outputMode("append") \
# .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
# .toTable("default.iot_sensor_data_test")