# DigiB Assignment

In [1]:
# Show the SparkSession. `spark` is not created anywhere in this notebook, so it
# is expected to be provided by the kernel/environment before this cell runs.
spark

## Testing Connection to Kafka Topic

In [1]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 KB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.[0m[33m
[0m

In [2]:
from kafka import KafkaConsumer

# Broker address and topic used for the connectivity test below.
bootstrap_servers = ['kafka:9092']
topicName = 'orders'

# Consumer subscribed to `topicName` as a member of consumer group 'group1'.
consumer = KafkaConsumer(
    topicName, 
    group_id='group1', 
    bootstrap_servers=bootstrap_servers)

In [3]:
# Read and print the first few messages from the consumer
counter = 3
for msg in consumer:
    print("Topic Name=%s,Message=%s"%(msg.topic,msg.value))
    counter = counter - 1
    # Stop right after the last wanted message. Checking at the top of the loop
    # would first pull (and discard) one extra message from the topic, and would
    # block waiting for it if the topic is idle.
    if counter <= 0:
        break

Topic Name=orders,Message=b'{"order_id": "c8947380-30f3-4633-a36f-3a5e53dd994c", "customer_id": "894e10e5-ced1-436f-8347-d0f928c06c7e", "order_lines": [{"product_id": "d6e929fc-1fbf-43ff-accc-99add6b6b799", "volume": 26, "price": 43.22}, {"product_id": "f6068fbc-2e75-4088-9041-ab46959a7cd1", "volume": 6, "price": 16.53}, {"product_id": "92252cf5-cb90-45a2-bb41-92aea8fb9e2c", "volume": 91, "price": 79.69}, {"product_id": "cd453ab7-b92d-4c87-b506-7e6df6fcb072", "volume": 3, "price": 73.84}, {"product_id": "881010e7-73c1-46d6-89a3-edfa9370f286", "volume": 46, "price": 22.76}, {"product_id": "b0750dcf-37eb-4ebe-bb47-ecb288dc50f8", "volume": 23, "price": 54.41}, {"product_id": "bbe74932-6bd5-4b0e-8aaf-7f161b6da325", "volume": 8, "price": 48.16}, {"product_id": "0b020e8c-9f7f-400b-bbb0-f682a36580fc", "volume": 73, "price": 64.16}, {"product_id": "d55543b9-cf84-4e1a-9dc6-f7ef9d7c63c7", "volume": 35, "price": 42.36}, {"product_id": "f2312b33-70c1-4b62-a9d7-09f4a9ccc857", "volume": 60, "price":

## Functions for Data Processing

In [4]:
# Importing necessary libraries
from pyspark.sql import DataFrame, Window, SparkSession
import pyspark.sql.functions as fs

# Setting variables
# NS is the leading part of every table/database identifier built below
# (`{NS}.<db>.<table>`) and also the bucket part of the database LOCATION
# (`s3a://{NS}/<db>`).
NS = "dal"

### Utility Functions

In [5]:
def deduplicate(df, partition_key, order_column):
    """Keep one row per ``partition_key`` group.

    Rows are numbered inside each group in ascending ``order_column`` order and
    only the first row of every group is kept. The temporary ``RowNr`` column
    used for the numbering is dropped again before returning.

    Args:
        df: DataFrame to deduplicate.
        partition_key: A column name, or a list/tuple of column names, that
            identifies a group of duplicates.
        order_column: Column (or column name) that orders the rows of a group.

    Returns:
        DataFrame with one row per group.
    """
    # A bare string would be unpacked character by character by the `*` below,
    # so treat it as a single column name.
    if isinstance(partition_key, str):
        partition_key = [partition_key]

    window = Window.partitionBy(*partition_key).orderBy(order_column)
    return (
        df.withColumn("RowNr", fs.row_number().over(window))
        .where("RowNr = 1")
        .drop("RowNr")
    )

def check_for_table(spark: SparkSession, model, table_name):
    """Report whether the table ``{model}.{table_name}`` can be read.

    The identifier is passed to ``spark.read.table`` exactly as
    ``{model}.{table_name}``; no ``NS`` prefix is added here.

    Args:
        spark: Session used for the lookup.
        model: Database/schema part of the identifier.
        table_name: Table part of the identifier.

    Returns:
        True when ``spark.read.table`` succeeds, False when it raises.
    """
    try:
        spark.read.table(f"{model}.{table_name}")
    except Exception:
        # Best-effort probe: any failure to resolve the table counts as "missing".
        return False
    return True

### Database Object Functions

In [6]:
def create_iceberg_table(spark: SparkSession, db_name, table_name, columns, replace: bool = False):
    """Create (or replace) the Iceberg table ``{NS}.{db_name}.{table_name}``.

    Args:
        spark: Session the DDL statement is run on.
        db_name: Database part of the table identifier.
        table_name: Table part of the table identifier.
        columns: Column definitions placed between the parentheses of the DDL.
        replace: When True issue ``CREATE OR REPLACE TABLE``; otherwise
            ``CREATE TABLE IF NOT EXISTS``.

    Any exception is caught and reported with ``print``; nothing is returned.
    """
    try:
        if replace:
            statement = f"CREATE OR REPLACE TABLE {NS}.{db_name}.{table_name} ({columns}) USING iceberg"
            outcome = f"Table {table_name} replaced."
        else:
            statement = f"CREATE TABLE IF NOT EXISTS {NS}.{db_name}.{table_name} ({columns}) USING iceberg"
            outcome = f"Table {table_name} created."

        spark.sql(statement)
        print(outcome)
    except Exception as e:
        print(f"Table creation failed with error {e}")
        
def append_to_iceberg_table(spark: SparkSession, df, model, table_name):
    """Append ``df`` to the existing table ``{NS}.{model}.{table_name}``.

    Args:
        spark: Session used to check that the target table exists.
        df: DataFrame whose rows are appended.
        model: Database part of the table identifier.
        table_name: Table part of the table identifier.

    If the table cannot be found, a message is printed and nothing is written.
    Any exception raised by the check or the write is caught and reported with
    ``print``; nothing is returned.
    """
    try:
        # Probe the same fully qualified identifier that the write below uses.
        if not check_for_table(spark, f"{NS}.{model}", table_name):
            print(f"Table {table_name} does not exist.")
            # Do not attempt the append against a table that is not there.
            return
        df.writeTo(f"{NS}.{model}.{table_name}").append()
        print(f"Data appended to table {table_name}.")
    except Exception as e:
        print(f"Failed to append to table {table_name} with error {e}")
        
def read_minio_data(spark: SparkSession, source_bucket, file_name, is_json = False):
    """Load ``s3a://{source_bucket}/{file_name}`` into a DataFrame.

    Args:
        spark: Session used for reading.
        source_bucket: Bucket part of the ``s3a://`` path.
        file_name: Object name inside the bucket.
        is_json: When truthy the object is read with ``spark.read.json`` and the
            ``order_lines`` column is dropped; otherwise it is read as CSV with
            the ``header`` option set to ``"true"``.

    Returns:
        The DataFrame produced by the reader.
    """
    source_uri = f"s3a://{source_bucket}/{file_name}"

    if not is_json:
        csv_reader = spark.read.option("header", "true").format("s3selectCSV")
        return csv_reader.csv(source_uri)

    json_frame = spark.read.json(source_uri)
    return json_frame.drop("order_lines")

### Database creation

In [7]:
def create_database(spark: SparkSession, db_name: str):
    """Create the database ``{NS}.{db_name}`` at ``s3a://{NS}/{db_name}`` if it is missing.

    Args:
        spark: Session the DDL statement is run on.
        db_name: Name of the database below the ``NS`` prefix.

    Any exception is caught and reported with ``print``; nothing is returned.
    """
    try:
        ddl = f"CREATE DATABASE IF NOT EXISTS {NS}.{db_name} LOCATION 's3a://{NS}/{db_name}'"
        spark.sql(ddl)
        print(f"Database {db_name} created.")
    except Exception as e:
        print(f"Database creation failed with error {e}")

### Data Transformation

In [12]:
def process_customer_data(spark: SparkSession, df):
    """Rename staged customer columns to their prod names and drop duplicate customers.

    Args:
        spark: Session; only used to read ``staging.stg_Customer`` when ``df``
            is None.
        df: Staged customer rows with columns ``customer_id``, ``company_name``
            and ``specialized_industries``. Pass None to read them from
            ``staging.stg_Customer``.

    Returns:
        DataFrame with one row per ``CustomerId``; within a group the row that
        sorts first by ``CompanyName`` is kept.
    """
    # Use the rows the caller handed in instead of silently re-querying staging.
    if df is None:
        df = spark.sql("select * from staging.stg_Customer")

    renamed = (df
        .withColumnRenamed("customer_id", "CustomerId")
        .withColumnRenamed("company_name", "CompanyName")
        .withColumnRenamed("specialized_industries", "SpecializedIndustries"))

    return deduplicate(renamed, ["CustomerId"], fs.col("CompanyName"))
    
def process_industry_data(spark: SparkSession, df):
    """Rename the staged industry column to its prod name and drop duplicate industries.

    Args:
        spark: Session; only used to read ``staging.stg_Industry`` when ``df``
            is None.
        df: Staged industry rows with an ``industry`` column. Pass None to read
            them from ``staging.stg_Industry``.

    Returns:
        DataFrame with one row per ``Industry`` value.
    """
    # Use the rows the caller handed in instead of silently re-querying staging.
    if df is None:
        df = spark.sql("select * from staging.stg_Industry")

    renamed = df.withColumnRenamed("industry", "Industry")

    return deduplicate(renamed, ["Industry"], fs.col("Industry"))
    
def process_order_data(spark: SparkSession, df):
    """Rename staged order columns to their prod names and drop duplicate orders.

    Args:
        spark: Session; only used to read ``staging.stg_StreamOrders`` when
            ``df`` is None.
        df: Staged order rows with columns ``order_id``, ``customer_id``,
            ``amount`` and ``timestamp``. Pass None to read them from
            ``staging.stg_StreamOrders``.

    Returns:
        DataFrame with one row per ``OrderId``; within a group the row that
        sorts first by ``Timestamp`` is kept.
    """
    # Use the rows the caller handed in instead of silently re-querying staging.
    if df is None:
        df = spark.sql("select * from staging.stg_StreamOrders")

    # NOTE(review): the staging table declares `timestamp` as string, so the
    # ordering below is a string ordering and the column is not converted to a
    # timestamp type here - confirm the source format before adding a cast.
    renamed = (df
        .withColumnRenamed("order_id", "OrderId")
        .withColumnRenamed("customer_id", "CustomerId")
        # Source column is `amount`; the former "amoung" never matched, which
        # made this rename a silent no-op.
        .withColumnRenamed("amount", "Amount")
        .withColumnRenamed("timestamp", "Timestamp"))

    return deduplicate(renamed, ["OrderId"], fs.col("Timestamp"))
    
def process_product_data(spark: SparkSession, df):
    """Rename staged product columns to their prod names, cast the price and drop duplicates.

    Args:
        spark: Session; only used to read ``staging.stg_Product`` when ``df``
            is None.
        df: Staged product rows with columns ``product_id``, ``product_name``
            and ``price``. Pass None to read them from ``staging.stg_Product``.

    Returns:
        DataFrame with one row per ``ProductId`` and ``Price`` as double; within
        a group the row that sorts first by ``ProductName`` is kept.
    """
    # Use the rows the caller handed in instead of silently re-querying staging.
    if df is None:
        df = spark.sql("select * from staging.stg_Product")

    renamed = (df
        .withColumnRenamed("product_id", "ProductId")
        .withColumnRenamed("product_name", "ProductName")
        .withColumnRenamed("price", "Price")
        # The staging table declares price as string while the prod table
        # declares Price as double, and the append rejects a string -> double
        # write. Values that cannot be parsed become null.
        .withColumn("Price", fs.col("Price").cast("double")))

    return deduplicate(renamed, ["ProductId"], fs.col("ProductName"))

In [19]:
def create_db(spark: SparkSession):
    """Create the ``staging`` and ``prod`` databases, in that order."""
    for db_name in ("staging", "prod"):
        create_database(spark, db_name)
    
def create_tables(spark: SparkSession):
    """Create the four staging tables (product, orders, customer, industry).

    Each table is created through ``create_iceberg_table`` with its default
    ``replace=False``.
    """
    # Table name -> column definitions, in creation order.
    staging_tables = {
        "stg_Product": "product_id string, product_name string, price string",
        "stg_StreamOrders": "order_id string, customer_id string, amount double, timestamp string",
        "stg_Customer": "customer_id string, company_name string, specialized_industries string",
        "stg_Industry": "industry string",
    }

    for staging_table, column_spec in staging_tables.items():
        create_iceberg_table(spark, "staging", staging_table, column_spec)
    
    
def insert_into_product_staging(spark: SparkSession):
    """Append the contents of ``Products.csv`` (bucket ``demo-data``) to ``staging.stg_Product``."""
    products_raw = read_minio_data(spark, source_bucket="demo-data", file_name="Products.csv")
    append_to_iceberg_table(spark, products_raw, model="staging", table_name="stg_Product")

    
def insert_into_order_staging(spark: SparkSession):
    """Append the contents of ``orders.json`` (bucket ``demo-data``) to ``staging.stg_StreamOrders``.

    The file is read in JSON mode, so ``read_minio_data`` drops its
    ``order_lines`` column.
    """
    orders_raw = read_minio_data(
        spark, source_bucket="demo-data", file_name="orders.json", is_json=True
    )
    append_to_iceberg_table(spark, orders_raw, model="staging", table_name="stg_StreamOrders")

    
def insert_into_customer_staging(spark: SparkSession):
    """Append the contents of ``Customers.csv`` (bucket ``demo-data``) to ``staging.stg_Customer``."""
    customers_raw = read_minio_data(spark, source_bucket="demo-data", file_name="Customers.csv")
    append_to_iceberg_table(spark, customers_raw, model="staging", table_name="stg_Customer")
  
    
def insert_into_industry_staging(spark: SparkSession):
    """Append the contents of ``Industries.csv`` (bucket ``demo-data``) to ``staging.stg_Industry``."""
    industries_raw = read_minio_data(spark, source_bucket="demo-data", file_name="Industries.csv")
    append_to_iceberg_table(spark, industries_raw, model="staging", table_name="stg_Industry")
    

def product_stage_to_main(spark: SparkSession, replace: bool):
    """Move staged products into the ``prod`` ``Product`` table.

    Reads ``staging.stg_Product``, transforms the rows with
    ``process_product_data``, creates the target table and appends the result.

    Args:
        spark: Session used for reading, DDL and writing.
        replace: Passed to ``create_iceberg_table``; True replaces the target
            table, False creates it only if it does not exist.
    """
    staged = spark.sql("select * from staging.stg_Product")
    # The helper is named `process_product_data`; `process_product` is not
    # defined and raised NameError.
    products = process_product_data(spark, staged)
    create_iceberg_table(spark, "prod", "Product", "ProductId string, ProductName string, Price double", replace=replace)
    append_to_iceberg_table(spark, products, "prod", "Product")
    

def order_stage_to_main(spark: SparkSession, replace: bool):
    """Move staged orders into the ``prod`` ``StreamOrders`` table.

    Reads ``staging.stg_StreamOrders``, transforms the rows with
    ``process_order_data``, creates the target table and appends the result.

    Args:
        spark: Session used for reading, DDL and writing.
        replace: Passed to ``create_iceberg_table``; True replaces the target
            table, False creates it only if it does not exist.
    """
    staged = spark.sql("select * from staging.stg_StreamOrders")
    # The helper is named `process_order_data`; `process_order` is not defined
    # and raised NameError.
    orders = process_order_data(spark, staged)
    create_iceberg_table(spark, "prod", "StreamOrders", "OrderId string, CustomerId string, Amount double, Timestamp timestamp", replace=replace)
    append_to_iceberg_table(spark, orders, "prod", "StreamOrders")
    
    
def industry_stage_to_main(spark: SparkSession, replace: bool):
    """Move staged industries into the ``prod`` ``Industry`` table.

    Reads ``staging.stg_Industry``, transforms the rows with
    ``process_industry_data``, creates the target table and appends the result.

    Args:
        spark: Session used for reading, DDL and writing.
        replace: Passed to ``create_iceberg_table``; True replaces the target
            table, False creates it only if it does not exist.
    """
    staged = spark.sql("select * from staging.stg_Industry")
    # The helper is named `process_industry_data`; `process_industry` is not
    # defined and raised NameError.
    industries = process_industry_data(spark, staged)
    create_iceberg_table(spark, "prod", "Industry", "Industry string", replace=replace)
    append_to_iceberg_table(spark, industries, "prod", "Industry")
    
    
def customer_stage_to_main(spark: SparkSession, replace: bool):
    """Move staged customers into the ``prod`` ``Customer`` table.

    Reads ``staging.stg_Customer``, transforms the rows with
    ``process_customer_data``, creates the target table and appends the result.

    Args:
        spark: Session used for reading, DDL and writing.
        replace: Passed to ``create_iceberg_table``; True replaces the target
            table, False creates it only if it does not exist.
    """
    staged = spark.sql("select * from staging.stg_Customer")
    # The helper is named `process_customer_data`; `process_customer` is not
    # defined and raised NameError.
    customers = process_customer_data(spark, staged)
    create_iceberg_table(spark, "prod", "Customer", "CustomerId string, CompanyName string, SpecializedIndustries string", replace=replace)
    append_to_iceberg_table(spark, customers, "prod", "Customer")

### Assignment Run

In [14]:
# Set up the databases first, then the staging tables that live in them.
for setup_step in (create_db, create_tables):
    setup_step(spark)

Database staging created.
Database prod created.
Table stg_Product created.
Table stg_StreamOrders created.
Table stg_Customer created.
Table stg_Industry created.


In [15]:
# Load every source file into its staging table: products, orders, customers, industries.
staging_loaders = (
    insert_into_product_staging,
    insert_into_order_staging,
    insert_into_customer_staging,
    insert_into_industry_staging,
)
for load_into_staging in staging_loaders:
    load_into_staging(spark)

Failed to append to table stg_Product with error Cannot write incompatible data to table 'dal.staging.stg_Product':
- Cannot safely cast 'price': string to double


                                                                                

Data appended to table stg_StreamOrders.
Data appended to table stg_Customer.
Data appended to table stg_Industry.


