In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.functions import from_json, when, col, to_date, date_format
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType, MapType, FloatType

In [2]:
# Define directories
outputDir = "hdfs://localhost:9000/user/itversity/stream_output"
checkpointDir = "hdfs://localhost:9000/user/itversity/stream_checkpoint/"

In [3]:
# Create SparkSession
spark = SparkSession.builder \
    .appName("KafkaStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .enableHiveSupport() \
    .getOrCreate()

# Define schema for the incoming JSON data
schema = StructType() \
    .add("eventType", StringType()) \
    .add("customerId", StringType()) \
    .add("productId", StringType()) \
    .add("timestamp", StringType()) \
    .add("metadata", MapType(StringType(), StringType())) \
    .add("quantity", IntegerType()) \
    .add("totalAmount", FloatType()) \
    .add("paymentMethod", StringType()) \
    .add("recommendedProductId", StringType()) 

In [4]:
# Define the function to create and verify an external table
def create_and_verify_table(create_table_query, table_name, schema_name, path):
    try:
        # Drop table if it exists
        spark.sql(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")
        
        # Create new external table with schema and location
        full_create_query = f"""
        CREATE EXTERNAL TABLE {schema_name}.{table_name} {create_table_query}
        STORED AS PARQUET
        LOCATION '{path}'
        """
        spark.sql(full_create_query)
        
        # Verify table creation
        table_exists = spark.sql(f"SHOW TABLES IN {schema_name} LIKE '{table_name}'").count() > 0
        if table_exists:
            print(f"Success: Table '{schema_name}.{table_name}' created successfully at {path}.")
        else:
            print(f"Failure: Table '{schema_name}.{table_name}' creation failed.")
    except Exception as e:
        print(f"Error creating table '{schema_name}.{table_name}': {e}")

In [5]:
create_table_query = """
(
    eventType STRING,
    customerId STRING,
    productId STRING,
    timestamp STRING,
    quantity INT,
    totalAmount FLOAT,
    paymentMethod STRING,
    recommendedProductId STRING,
    category STRING,
    source STRING
)
PARTITIONED BY (date DATE)
"""

# Call the function to create and verify the table
create_and_verify_table(create_table_query, "stream_output", "BigData_DWH", outputDir)

Success: Table 'BigData_DWH.stream_output' created successfully at hdfs://localhost:9000/user/itversity/stream_output.


In [23]:
# Kafka connection details
bootstrap_servers = "pkc-56d1g.eastus.azure.confluent.cloud:9092"
kafka_topic = "mark_topic"  # add your Kafka topic name
kafka_username = "JUKQQM4ZM632RECA"
kafka_password = "UUkrPuSttgOC0U9lY3ZansNsKfN9fbxZPFwrGxudDrfv+knTD4rCwK+KdIzVPX0D"


# Initialize Spark session
spark = SparkSession.builder \
    .appName("KafkaToParquet") \
    .getOrCreate()

# Read data from Kafka topic as a streaming DataFrame
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_username}" password="{kafka_password}";') \
    .load()

# Parse the JSON data
json_df = df.selectExpr("CAST(value AS STRING)") \
            .select(from_json("value", schema).alias("data")) \
            .select("data.*")

# Split the metadata column into category and source
json_df2 = json_df.withColumn("category", col("metadata")["category"]) \
                    .withColumn("source", col("metadata")["source"]) \
                    .drop("metadata")

# Add a new column with the date extracted from timestamp
partitioned_df = json_df2.withColumn("date", to_date(col("timestamp")))



# Write the stream to HDFS partitioned by 'date'
query = partitioned_df \
    .writeStream \
    .partitionBy("date") \
    .format("parquet") \
    .option("path", outputDir) \
    .option("checkpointLocation", checkpointDir) \
    .start()

query.awaitTermination()

KeyboardInterrupt: 

In [6]:
df = spark.read.parquet(outputDir)

df.show(5)

+-------------------+----------+---------+-------------------+--------+-----------+-------------+--------------------+--------+------+----------+
|          eventType|customerId|productId|          timestamp|quantity|totalAmount|paymentMethod|recommendedProductId|category|source|      date|
+-------------------+----------+---------+-------------------+--------+-----------+-------------+--------------------+--------+------+----------+
|        productView|     19081|     4048|2024-07-05T07:31:42|    null|       null|         null|                null|Clothing|Direct|2024-07-05|
|recommendationClick|     75455|     4928|2024-07-05T07:31:44|    null|       null|         null|                2818|    null|  null|2024-07-05|
|          addToCart|     68726|     3266|2024-07-05T07:31:46|       3|       null|         null|                null|    null|  null|2024-07-05|
|           purchase|     56748|     1431|2024-07-05T07:31:48|       5|     384.24|       PayPal|                null|    nu

In [7]:
spark.sql("MSCK REPAIR TABLE BigData_DWH.stream_output")

In [8]:
schema_name="BigData_DWH"
table_name="stream_output"

# 1. Check if the Hive table has data
row_count_query = f"SELECT COUNT(*) AS row_count FROM {schema_name}.{table_name}"
row_count_result = spark.sql(row_count_query)
print("Row count in the table:")
row_count_result.show()


Row count in the table:
+---------+
|row_count|
+---------+
|      449|
+---------+



In [9]:
sample_data_query = f"SELECT * FROM {schema_name}.{table_name} LIMIT 5"
sample_data_result = spark.sql(sample_data_query)
print("Sample data from the table:")
sample_data_result.show()

Sample data from the table:
+-----------+----------+---------+-------------------+--------+-----------+-------------+--------------------+--------+-------------+----------+
|  eventType|customerId|productId|          timestamp|quantity|totalAmount|paymentMethod|recommendedProductId|category|       source|      date|
+-----------+----------+---------+-------------------+--------+-----------+-------------+--------------------+--------+-------------+----------+
|  addToCart|     60945|     3783|2024-07-04T17:51:40|       2|       null|         null|                null|    null|         null|2024-07-04|
|productView|     90819|     7855|2024-07-04T17:51:41|    null|       null|         null|                null|Clothing|Advertisement|2024-07-04|
|  addToCart|     66955|     4881|2024-07-04T17:51:43|       2|       null|         null|                null|    null|         null|2024-07-04|
|   purchase|     48504|     3564|2024-07-04T17:51:47|       1|     424.63|   Debit Card|             

In [10]:
query1 = """
SELECT 
    productId,
    SUM(totalAmount) AS total_sales_amount,
    SUM(quantity) AS total_quantity_sold
FROM 
    BigData_DWH.stream_output
GROUP BY 
    productId
ORDER BY 
    total_sales_amount DESC
"""

# Execute Query 1
result1 = spark.sql(query1)
result1.show()

+---------+------------------+-------------------+
|productId|total_sales_amount|total_quantity_sold|
+---------+------------------+-------------------+
|     1322| 830.2200012207031|                  7|
|     1309| 635.3699951171875|                  4|
|     8059| 499.8999938964844|                  5|
|     8084| 495.6099853515625|                  5|
|     3950| 488.6400146484375|                  4|
|     9783| 481.5799865722656|                  2|
|     8367| 477.7300109863281|                  2|
|     5907| 474.2200012207031|                  2|
|     1241| 468.9599914550781|                  5|
|     1446| 468.5399932861328|                  4|
|     1064|468.04998779296875|                  3|
|     1424|466.05999755859375|                  1|
|     6219| 460.7099914550781|                  3|
|     1153| 455.1199951171875|                  5|
|     1264| 453.1099853515625|                  4|
|     1013| 451.2799987792969|                  3|
|     1282| 448.9700012207031| 

In [11]:
# Query 2: Daily Sales and Quantity by Payment Method
query2 = """
SELECT 
    date,
    paymentMethod,
    SUM(totalAmount) AS daily_sales_amount,
    SUM(quantity) AS daily_quantity_sold
FROM 
    BigData_DWH.stream_output
GROUP BY 
    date, paymentMethod
ORDER BY 
    date, daily_sales_amount DESC
"""

# Execute Query 2
result2 = spark.sql(query2)
result2.show()

+----------+-------------+------------------+-------------------+
|      date|paymentMethod|daily_sales_amount|daily_quantity_sold|
+----------+-------------+------------------+-------------------+
|2024-07-04|  Credit Card|3293.6600189208984|                 37|
|2024-07-04|   Debit Card|3099.7600326538086|                 26|
|2024-07-04|       PayPal| 2838.580047607422|                 42|
|2024-07-04|         null|              null|                130|
|2024-07-05|  Credit Card| 6837.699935913086|                 69|
|2024-07-05|       PayPal| 6566.679931640625|                 85|
|2024-07-05|   Debit Card| 4298.559982299805|                 64|
|2024-07-05|         null|              null|                225|
+----------+-------------+------------------+-------------------+



In [12]:
spark.stop()