In [None]:
import getpass
import os

# NOTE(review): the star import from pyspark.sql.functions shadows builtins such as sum/max/min/round.
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Spark configuration for this local Kafka -> PostgreSQL batch job.
#   spark.jars.packages : Kafka data source; presumably must match the installed Spark/Scala build — verify.
#   spark.jars          : PostgreSQL JDBC driver jar, resolved relative to the working directory.
#   shuffle partitions  : kept small for a local[*] run.
SPARK_CONF = {
    "spark.streaming.stopGracefullyOnShutdown": True,
    "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
    "spark.jars": "config/postgresql-42.7.3.jar",
    "spark.sql.shuffle.partitions": 4,
}

spark_builder = SparkSession.builder.appName("Pyspark_Kafka").master("local[*]")
for conf_key, conf_value in SPARK_CONF.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

In [None]:
# Display the SparkSession as this cell's output (quick check that the session is up).
spark

In [None]:
# Batch-read the messages currently retained in the topic.
# Kafka *batch* queries do not accept startingOffsets="latest" (load() raises), so read from
# the earliest retained offset up to the latest offset available when each query runs.
# NOTE(review): every run re-reads the whole topic and the JDBC writes use append mode,
# so re-running the notebook duplicates rows — TODO track consumed offsets between runs.
data_df = (
    spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "real-time-pipeline")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [None]:
# Inspect the columns produced by the Kafka source; the message payload arrives in `value`,
# which is cast to a string before JSON parsing.
data_df.printSchema()

In [None]:
# Expose the raw Kafka payload as text in a new "data" column so it can be parsed as JSON.
df_json = data_df.withColumn("data", col("value").cast(StringType()))

In [None]:
# Expected JSON layout of each Kafka message, built field by field with StructType.add.
# Every field is nullable; customer and product details are nested sub-structs.
schema = (
    StructType()
    .add("transaction_id", StringType(), True)
    .add(
        "customer_data",
        StructType()
        .add("customer_id", IntegerType(), True)
        .add("customer_first_name", StringType(), True)
        .add("customer_last_name", StringType(), True)
        .add("customer_gender", StringType(), True)
        .add("customer_country", StringType(), True)
        .add("customer_email", StringType(), True)
        .add("device_type", StringType(), True)
        .add("customer_age", IntegerType(), True),
        True,
    )
    .add(
        "product_data",
        StructType()
        .add("product_id", IntegerType(), True)
        .add("category", StringType(), True)
        .add("product_name", StringType(), True)
        .add("product_price", DoubleType(), True),
        True,
    )
    .add("timestamp", StringType(), True)  # event time is kept as text at parse time
    .add("payment_method", StringType(), True)
    .add("quantity", IntegerType(), True)
    .add("total_amount", DoubleType(), True)
    .add("status", StringType(), True)
)

In [None]:
# Decode each message's JSON text into a single struct column, also named "data".
parsed_df = df_json.select(
    from_json(df_json["data"], schema).alias("data")
)

In [None]:
# Flatten the parsed "data" struct into top-level columns. Star-expanding the nested
# customer_data / product_data structs yields their fields in schema order, which matches
# listing each field explicitly (same column names, same order).
flattened_df = parsed_df.select(
    "data.transaction_id",
    "data.customer_data.*",
    "data.product_data.*",
    "data.timestamp",
    "data.payment_method",
    "data.quantity",
    "data.total_amount",
    "data.status",
)

In [None]:
from pyspark.sql import functions as F

In [None]:
# Re-render the event time as "yyyy-MM-dd HH:mm:ss" text. date_format implicitly casts the
# string column to a timestamp first; in non-ANSI mode, values that cannot be cast become null.
#
# persist(): each of the four JDBC writes below runs as its own Spark job. Uncached, every job
# re-scans Kafka, and a batch Kafka read resolves its ending offset ("latest") per query, so
# messages produced between writes could reach some tables but not others. Caching here lets
# all writes share one snapshot (while the cached blocks survive) and consumes the topic once.
flattened_df = (
    flattened_df
    .withColumn("timestamp", F.date_format("timestamp", "yyyy-MM-dd HH:mm:ss"))
    .persist()
)

### Customer dimension table

In [None]:
# Customer dimension: one row per customer_id within this batch.
# flattened_df has one row per transaction, so without de-duplication each customer would be
# appended once per purchase and joins from the fact table would fan out. When a customer's
# attributes (e.g. device_type) differ between transactions, dropDuplicates keeps an arbitrary
# row. Rows with a null key (e.g. messages from_json could not parse) are dropped.
customer_dim = (
    flattened_df
    .select("customer_id", "customer_first_name", "customer_last_name", "customer_gender",
            "customer_country", "customer_email", "device_type", "customer_age")
    .where(col("customer_id").isNotNull())
    .dropDuplicates(["customer_id"])
)

In [None]:
# Append customer_dim to PostgreSQL over JDBC without keeping credentials in the notebook.
# POSTGRES_URL / POSTGRES_USER fall back to the local development database. The password comes
# from POSTGRES_PASSWORD; if that is unset it is prompted for once and cached in os.environ for
# the rest of the kernel session (set the variable for non-interactive runs).
if "POSTGRES_PASSWORD" not in os.environ:
    os.environ["POSTGRES_PASSWORD"] = getpass.getpass("PostgreSQL password: ")

(
    customer_dim.write
    .mode("append")
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/E-commerce"))
    .option("dbtable", "customer_dim")
    .option("driver", "org.postgresql.Driver")
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ["POSTGRES_PASSWORD"])
    .save()
)

### Date dimension table

In [None]:
# Date dimension: calendar attributes derived from each distinct event timestamp in this batch.
# Rows with a null timestamp carry no date information and are dropped; duplicates are removed
# so each timestamp appears once per batch. F.dayofweek uses Spark's 1 = Sunday … 7 = Saturday.
# Column names (including those with spaces) are kept unchanged, presumably to match the
# existing date_dim table. NOTE(review): there is no year column — TODO confirm that is intended.
date_dim = (
    flattened_df
    .where(F.col("timestamp").isNotNull())
    .select(
        F.col("timestamp"),
        F.dayofmonth("timestamp").alias("day"),
        F.month("timestamp").alias("month"),
        F.dayofweek("timestamp").alias("Day of Week"),
        F.date_format("timestamp", "MMMM").alias("Month Name"),
        F.weekofyear("timestamp").alias("Week Number"),
        F.hour("timestamp").alias("hours"),
        F.minute("timestamp").alias("minutes"),
    )
    .dropDuplicates(["timestamp"])
)

In [None]:
# Append date_dim to PostgreSQL over JDBC without keeping credentials in the notebook.
# POSTGRES_URL / POSTGRES_USER fall back to the local development database. The password comes
# from POSTGRES_PASSWORD; if that is unset it is prompted for once and cached in os.environ for
# the rest of the kernel session (set the variable for non-interactive runs).
if "POSTGRES_PASSWORD" not in os.environ:
    os.environ["POSTGRES_PASSWORD"] = getpass.getpass("PostgreSQL password: ")

(
    date_dim.write
    .mode("append")
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/E-commerce"))
    .option("dbtable", "date_dim")
    .option("driver", "org.postgresql.Driver")
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ["POSTGRES_PASSWORD"])
    .save()
)

### Product dimension table

In [None]:
# Product dimension: one row per product_id within this batch.
# flattened_df has one row per transaction, so without de-duplication each product would be
# appended once per sale and joins from the fact table would fan out. When a product's
# attributes (e.g. price) differ between transactions, dropDuplicates keeps an arbitrary row.
# Rows with a null key (e.g. messages from_json could not parse) are dropped.
product_dim = (
    flattened_df
    .select("product_id", "category", "product_name", "product_price")
    .where(col("product_id").isNotNull())
    .dropDuplicates(["product_id"])
)

In [None]:
# Append product_dim to PostgreSQL over JDBC without keeping credentials in the notebook.
# POSTGRES_URL / POSTGRES_USER fall back to the local development database. The password comes
# from POSTGRES_PASSWORD; if that is unset it is prompted for once and cached in os.environ for
# the rest of the kernel session (set the variable for non-interactive runs).
if "POSTGRES_PASSWORD" not in os.environ:
    os.environ["POSTGRES_PASSWORD"] = getpass.getpass("PostgreSQL password: ")

(
    product_dim.write
    .mode("append")
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/E-commerce"))
    .option("dbtable", "product_dim")
    .option("driver", "org.postgresql.Driver")
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ["POSTGRES_PASSWORD"])
    .save()
)

### Fact table

In [None]:
# Fact table: one row per transaction, referencing the dimensions via customer_id, product_id
# and timestamp. Rows without a transaction_id (e.g. Kafka messages from_json could not parse,
# whose flattened fields are all null) are dropped rather than written as empty facts.
fact_table = (
    flattened_df
    .select("transaction_id", "customer_id", "timestamp", "product_id",
            "payment_method", "quantity", "total_amount", "status")
    .where(col("transaction_id").isNotNull())
)

In [None]:
# Append fact_table to PostgreSQL over JDBC without keeping credentials in the notebook.
# POSTGRES_URL / POSTGRES_USER fall back to the local development database. The password comes
# from POSTGRES_PASSWORD; if that is unset it is prompted for once and cached in os.environ for
# the rest of the kernel session (set the variable for non-interactive runs).
if "POSTGRES_PASSWORD" not in os.environ:
    os.environ["POSTGRES_PASSWORD"] = getpass.getpass("PostgreSQL password: ")

(
    fact_table.write
    .mode("append")
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/E-commerce"))
    .option("dbtable", "fact_table")
    .option("driver", "org.postgresql.Driver")
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ["POSTGRES_PASSWORD"])
    .save()
)