In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
import logging
from confluent_kafka import Producer

# Build (or reuse) the Spark session for the truck-data streaming job.
# The session runs with master "local[*]" and ships the MySQL connector
# jar through the "spark.jars" setting.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Truck_Data_Stream (Machine Learning)")
    .config("spark.jars", "./../mysql_conn-JAR-FILE/mysql-connector-java-8.0.32.jar")
    .getOrCreate()
)

# Emit Spark's own log output at INFO verbosity.
spark.sparkContext.setLogLevel("INFO")

# Kafka connection settings for the truck-data stream.

# Broker address and the topic to consume.
kafka_bootstrap_servers = "localhost:9092"
kafka_topic_name = "Truck-Data"

# Name of the reader option that selects topics ("subscribe" takes a topic list).
kafka_subscribe_type = "subscribe"

# Consumer group label; NOTE(review): not referenced in the visible part of
# this file -- confirm where (or whether) it is used.
kafka_consumer_group = "test_group"

# Schema of the JSON payload carried in each Kafka message: a string
# timestamp followed by three double-valued measurements.
schema = StructType(
    [StructField("timestamp", StringType())]
    + [
        StructField(metric_name, DoubleType())
        for metric_name in ("distance_covered", "engine_speed", "fuel_consumed")
    ]
)

# Open a streaming read on the Kafka topic, decode each message value as a
# string, parse it as JSON against `schema`, and flatten the parsed struct
# so every schema field becomes a top-level column.
df = (
    spark.readStream
    .format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": kafka_bootstrap_servers,
            kafka_subscribe_type: kafka_topic_name,
            # Start from the oldest available offsets.
            "startingOffsets": "earliest",
            # Do not fail the query when offsets are no longer available.
            "failOnDataLoss": "false",
        }
    )
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("json"))
    .select("json.*")
)

