In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
import json
import time
from pyspark.sql.types import *
from pyspark.sql import functions as fn

In [2]:
# Build (or reuse) the Hive-enabled SparkSession that every later cell relies on.
# The Kafka connector is pulled in through spark.jars.packages.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("test_streaming")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Open a streaming reader on the "weather_data" Kafka topic,
# consuming from the earliest available offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "weather_data")
    .option("startingOffsets", "earliest")
    .load()
)

In [4]:
# Define a function to parse streaming data
def parsing_streaming_data(data):
    """Parse the JSON payload of the streamed records into a struct column.

    Args:
        data: DataFrame with a ``value`` column holding the JSON payload
            (the column is cast to string before parsing).

    Returns:
        A DataFrame with a single struct column named ``data`` whose fields
        follow the weather schema declared below.
    """
    # (field name, Spark type) pairs; every field is nullable.
    field_types = [
        ("city_id", StringType()),
        ("city", StringType()),
        ("weather_condition", StringType()),
        ("weather_description", StringType()),
        ("temperature", DecimalType(5, 2)),
        ("min_temp", DecimalType(5, 2)),
        ("max_temp", DecimalType(5, 2)),
        ("pressure", IntegerType()),
        # Presumably a percentage (0-100) - TODO confirm against the producer.
        # DecimalType(4, 2) tops out at 99.99, so a reading of 100 could not be
        # represented; (5, 2) keeps the same scale and admits it.
        ("humidity", DecimalType(5, 2)),
        ("wind_speed", DecimalType(4, 2)),
        ("visibility", DecimalType(6, 1)),
        ("creation_time", TimestampType()),
    ]
    schema = StructType(
        [StructField(name, dtype, True) for name, dtype in field_types]
    )

    # Kafka delivers the payload as bytes, so cast it to a string first.
    json_strings = data.selectExpr("Cast (value As string)")

    # Apply the schema while parsing and expose the result as one struct column.
    return json_strings.select(
        fn.from_json(fn.col("value"), schema).alias("data")
    )

In [5]:
# Define a function to select the relevant fields from the streamed data
def select_columns(data):
    """Flatten the ``data`` struct column into top-level columns.

    Args:
        data: DataFrame that contains a struct column named ``data``.

    Returns:
        The result of selecting ``data.*`` from the input, i.e. one column
        per field of the struct.
    """
    return data.select('data.*')

In [6]:
# Define a function that applies the unit-conversion transformations
def transform_measurement_units(data):
    """Convert the measurement units of the weather columns with Spark SQL.

    The input is registered as the temp view ``streaming_data`` and queried
    through the session that owns ``data``, so the function does not depend on
    a notebook-global ``spark`` variable.

    Conversions applied:
        * ``temperature``, ``min_temp``, ``max_temp``: minus 273.15, rounded to
          2 decimals (Kelvin to Celsius - assumes the producer sends Kelvin;
          TODO confirm).
        * ``wind_speed``: times 3.6, rounded to 2 decimals (m/s to km/h -
          assumes the producer sends m/s; TODO confirm).
        * ``creation_date``: the date part of ``creation_time``.

    Args:
        data: DataFrame with the flattened weather columns.

    Returns:
        A DataFrame with the converted columns plus ``creation_date``.
    """
    # Expose the DataFrame to Spark SQL under a fixed view name.
    data.createOrReplaceTempView("streaming_data")

    # 0 degrees Celsius is 273.15 K (the previous offset of 272.15 was 1 degree off).
    # TO_DATE needs no format pattern because creation_time is parsed upstream
    # as a timestamp rather than a string.
    conversion_query = """
        SELECT
            city_id,
            city,
            weather_condition,
            weather_description,
            ROUND(temperature - 273.15, 2) AS temperature,
            ROUND(min_temp - 273.15, 2) AS min_temp,
            ROUND(max_temp - 273.15, 2) AS max_temp,
            pressure,
            humidity,
            ROUND(wind_speed * 3.6, 2) AS wind_speed,
            visibility,
            creation_time,
            TO_DATE(creation_time) AS creation_date
        FROM streaming_data
    """

    # Run the query on the same session the view was registered with.
    return data.sparkSession.sql(conversion_query)

In [7]:
# Turn the raw Kafka records into a struct column following the weather schema
parsed_df = parsing_streaming_data(data=kafka_df)

In [None]:
# Flatten the parsed struct into one column per weather field
weather_df = select_columns(data=parsed_df)

In [9]:
# Apply the unit conversions and derive the creation_date column
transformed_df = transform_measurement_units(data=weather_df)

In [11]:
# Define a function to create Hive table using spark sql
def creating_hive_taple(data):
    """Create the ``default.weather_data`` table if it does not exist yet.

    The table is stored as Parquet, partitioned by ``creation_date``, at a
    fixed HDFS location. The statement is executed through the notebook-level
    ``spark`` session.

    Args:
        data: Not used by the function body; kept so existing callers that
            pass a DataFrame keep working.
    """
    create_table_ddl = """
    CREATE TABLE IF NOT EXISTS default.weather_data (
        city_id STRING,
        city STRING,
        weather_condition STRING,
        weather_description STRING,
        temperature DOUBLE,
        min_temp DOUBLE,
        max_temp DOUBLE,
        pressure INTEGER,
        humidity DOUBLE,
        wind_speed DOUBLE,
        visibility DOUBLE,
        creation_time TIMESTAMP,
        creation_date DATE
    )
    USING Parquet
    PARTITIONED BY (creation_date)
    LOCATION 'hdfs://namenode:9000/data/streaming/weather_data/hive_table'
    """
    # IF NOT EXISTS makes re-running this cell a no-op once the table is there.
    spark.sql(create_table_ddl)

In [12]:
# Define a function to start Hive streaming query
def start_hive_streaming_query(data, table_name="weather_data", checkpoint_location=None):
    """Start a streaming query that appends every micro-batch to a table.

    Args:
        data: Streaming DataFrame to write.
        table_name: Target table for ``insertInto``. Defaults to
            ``"weather_data"``, the table this notebook creates.
        checkpoint_location: Optional path for the query's checkpoint. When
            given, it is passed as the ``checkpointLocation`` option so a
            restarted query can resume instead of re-reading the topic from
            the start and appending the same rows again. When ``None`` (the
            default) no option is set and the original behaviour is kept.

    Returns:
        The started streaming query handle.
    """
    def _append_batch(batch_df, batch_id):
        # insertInto resolves columns by position, so the batch's column order
        # has to match the table definition.
        batch_df.write.mode("append").insertInto(table_name)

    stream_writer = data.writeStream.foreachBatch(_append_batch).outputMode("append")
    if checkpoint_location is not None:
        stream_writer = stream_writer.option("checkpointLocation", checkpoint_location)
    return stream_writer.start()

In [13]:
# Make sure the target Hive table exists before the stream starts writing to it
creating_hive_taple(data=transformed_df)

In [14]:
# Launch the streaming write into Hive
query = start_hive_streaming_query(data=transformed_df)
# Block this cell until the streaming query terminates
query.awaitTermination()

In [15]:
# Stop the SparkSession (in a top-to-bottom run this cell is reached only after
# the awaitTermination() call in the previous cell has returned)
spark.stop()