Creating an ETL Job

In [None]:
from spark_session_factory import create_spark_session
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DecimalType



In [191]:
def run_etl(spark: SparkSession, input_path_topic1: str, input_path_topic2: str, output_path: str):
    """
    Main ETL pipeline: read -> transform -> display (the write step is not implemented yet).

    Reads the transaction and user event JSON files, explodes every
    transaction into one row per product, inner-joins the result with the
    user events on user_id and product_id, flattens the product struct into
    top-level columns and adds a ``line_revenue`` column. Both the joined
    frame and the final frame are printed with ``show``.

    Args:
        spark: Active SparkSession. It is stopped before this function
            returns, including when a step raises.
        input_path_topic1: Path of the transaction events JSON
            (e.g., '/opt/spark-data/landing/transaction_events_*.json')
        input_path_topic2: Path of the user events JSON
            (e.g., '/opt/spark-data/landing/user_events_*.json')
        output_path: Gold zone path (e.g., '/opt/spark-data/gold').
            Currently unused -- nothing is written yet.

    Returns:
        None
    """
    # TODO: Implement the write to `output_path` (gold layer).
    try:
        # Read the two JSON topics and cache them for reuse.
        transactions = spark.read.json(input_path_topic1).cache()
        users = (
            spark.read.json(input_path_topic2)
            .withColumnRenamed("timestamp", "u_timestamp")
            .cache()
        )

        # One row per purchased product; rename the timestamp so it cannot
        # be confused with the user-side one after the join.
        transactions_exploded = (
            transactions
            .withColumn("product", F.explode("products"))
            .withColumnRenamed("timestamp", "t_timestamp")
        )

        # Join on user_id AND product_id. Joining on the product as well
        # limits (but may not fully prevent) row multiplication.
        join_condition = (
            (F.col("user.user_id") == F.col("transaction.user_id"))
            & (F.col("user.product_id") == F.col("transaction.product.product_id"))
        )
        events_combined = (
            transactions_exploded.alias("transaction")
            .join(users.alias("user"), join_condition, "inner")
        )
        events_combined.show(truncate=False)

        # Keep only the columns needed downstream, pulling the address
        # fields out of the shipping_address struct.
        advertising_base = events_combined.select(
            "product", "browser",
            "transaction.user_id", "t_timestamp", "device",
            "shipping_address.state", "shipping_address.city", "shipping_address.country",
        )

        # Field names of the product struct, via the public StructType API
        # rather than the object's private __dict__.
        product_fields = advertising_base.schema["product"].dataType.fieldNames()

        # Promote every product field to its own column, drop the struct,
        # and compute the revenue of each line as quantity * unit_price.
        advertising_dept = (
            advertising_base
            .select("*", *[F.col(f"product.{field}").alias(field) for field in product_fields])
            .drop("product")
            .withColumn(
                "line_revenue",
                F.round(F.col("quantity") * F.col("unit_price"), 2).cast(DecimalType(10, 2)),
            )
        )

        # The table we need to write to the gold layer.
        advertising_dept.show(truncate=False)
    finally:
        # Always release the session, even when a step above fails.
        spark.stop()


Questions we want to answer!

We can join the two event topics by user.
We can rank devices by which are most likely to spend the most.

In [192]:
# Run the pipeline on one pair of landing-zone files; the empty output path
# means no gold-zone location is supplied for this run.
run_etl(
    spark=create_spark_session("ETL_job"),
    input_path_topic1='../data/landing/transaction_events_1767986916.1293132.json',
    input_path_topic2='../data/landing/user_events_1767986960.934074.json',
    output_path="",
)

                                                                                

+-----------------------------------------------------------------------+--------+------------------------------------+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------+---------+--------+------+---------------------------+--------+------------------------------------+----------------+--------+------------------------------------------------------------+-------+----------------+-------+-------+----------+------------------------------------+----------------+---------------+--------------+----------+--------+------------+------------+---------------------------+--------+
|billing_address                                                        |currency|original_transaction_id             |payment_method|products 

In [35]:
import os
# Display the kernel's current working directory; the relative paths used in
# this notebook are resolved against it.
os.getcwd()

'/home/subre/Revature/revature-trainercode/revature-github-folder/Group6/Assets/Jobs'

In [41]:
from spark_session_factory import create_spark_session
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DecimalType
from pathlib import Path
import argparse
import re

# Landing-zone directory, relative to the notebook's working directory.
starting_path = "../../Assets/spark_data/landing/"


def _event_file_sort_key(path: Path):
    """Sort key that orders event files by the numeric timestamp in their name.

    File names look like '<prefix>_<timestamp>.json'. Comparing the names as
    plain strings mis-orders timestamps with different digit counts
    (e.g. '999.5' sorts after '1767986916.1'), so the trailing token is
    parsed as a number. Files without a numeric suffix sort before all
    timestamped files; the name is the final tie-breaker.
    """
    token = path.stem.rsplit("_", 1)[-1]
    has_timestamp = re.fullmatch(r"[0-9]+(?:\.[0-9]+)?", token) is not None
    return (has_timestamp, float(token) if has_timestamp else 0.0, path.name)


def _latest_event_file(directory: str, pattern: str):
    """Return the newest file in `directory` matching `pattern`, or None if none match."""
    return max(Path(directory).glob(pattern), key=_event_file_sort_key, default=None)


# Pick the most recent file of each topic; both use the same pattern style.
transaction_file = _latest_event_file(starting_path, "transaction_*.json")
user_file = _latest_event_file(starting_path, "user_*.json")
print(transaction_file, user_file)

../../Assets/spark_data/landing/transaction_events_999999999999999999999.json ../../Assets/spark_data/landing/user_events_99999999999999999.json
