In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import format_number

In [3]:
# Function to create Spark session
def create_spark_session(app_name="WalmartSalesDataAnalysis"):
    """Return a SparkSession for this analysis.

    Args:
        app_name: Application name set on the session builder.

    Returns:
        The session obtained from ``getOrCreate``. If a session already
        exists in this process, that existing session is returned rather
        than a new one being built.
    """
    builder = SparkSession.builder.appName(app_name)
    session = builder.getOrCreate()
    return session

In [4]:
# Function to read data from files
def read_data(spark_session, customers_path, salestxns_path):
    """Load the customers and sales-transactions files into DataFrames.

    Both files are read as tab-delimited text with an explicit schema (no
    schema inference). No ``header`` option is set, so Spark treats the
    first line as data.

    Args:
        spark_session: SparkSession used for reading.
        customers_path: Path to the customers file.
        salestxns_path: Path to the sales transactions file.

    Returns:
        Tuple ``(customers_df, salestxns_df)``.
    """
    # (column name, Spark type) pairs; every column is nullable.
    customer_columns = [
        ("customer_id", IntegerType()),
        ("name", StringType()),
        ("city", StringType()),
        ("state", StringType()),
        ("zipcode", StringType()),
    ]
    txn_columns = [
        ("sales_txn_id", IntegerType()),
        ("category_id", IntegerType()),
        ("category_name", StringType()),
        ("product_id", IntegerType()),
        ("product_name", StringType()),
        ("price", DoubleType()),
        ("quantity", IntegerType()),
        ("customer_id", IntegerType()),
    ]
    customers_schema = StructType(
        [StructField(col_name, col_type, True) for col_name, col_type in customer_columns]
    )
    salestxns_schema = StructType(
        [StructField(col_name, col_type, True) for col_name, col_type in txn_columns]
    )

    def _load_tsv(schema, path):
        # The input files are tab-separated, not comma-separated.
        return spark_session.read.option("delimiter", "\t").schema(schema).csv(path)

    customers = _load_tsv(customers_schema, customers_path)
    transactions = _load_tsv(salestxns_schema, salestxns_path)
    return customers, transactions

In [5]:
# Function to join dataframes and perform aggregation
def join_and_aggregate_data(customers_df, salestxns_df):
    """Attach customer details to transactions and total the quantity per group.

    Transactions are joined to customers on ``customer_id``. No ``how`` is
    passed, so Spark's default inner join applies and transactions without a
    matching customer are dropped. Rows are grouped by customer, product and
    unit ``price``, so the same product sold at different prices yields
    separate rows.

    Args:
        customers_df: Customers DataFrame (must contain ``customer_id``, ``name``).
        salestxns_df: Transactions DataFrame (must contain ``customer_id``,
            ``product_id``, ``product_name``, ``price``, ``quantity``).

    Returns:
        DataFrame with the grouping columns plus ``total_quantity``.
    """
    grouping = ("customer_id", "name", "product_id", "product_name", "price")
    enriched = salestxns_df.join(customers_df, on="customer_id")
    summed = enriched.groupBy(*grouping).agg({"quantity": "sum"})
    # Dict-style agg auto-names the result "sum(quantity)"; give it a clear name.
    return summed.withColumnRenamed("sum(quantity)", "total_quantity")

In [6]:
# Function to calculate total amount
def calculate_total_amount(aggregated_df, decimals: int = 2):
    """Add a ``total_amount`` column equal to ``price * total_quantity``.

    The product is cast to ``decimal(38, decimals)``, which rounds it to a
    fixed number of decimal places while keeping the column numeric.
    ``format_number`` is deliberately not used. It returns a *string* with
    thousands separators (e.g. ``"1,234.50"``), which cannot be summed or
    sorted numerically and puts commas inside a CSV field.

    Args:
        aggregated_df: DataFrame with ``price`` and ``total_quantity`` columns.
        decimals: Number of decimal places to keep (default 2, must be 0..38).

    Returns:
        ``aggregated_df`` with an added decimal ``total_amount`` column.

    Raises:
        ValueError: If ``decimals`` is outside the range Spark decimals allow.
    """
    # Spark's maximum decimal precision is 38, so the scale must fit within it.
    if not 0 <= decimals <= 38:
        raise ValueError(f"decimals must be between 0 and 38, got {decimals}")
    amount = aggregated_df["price"] * aggregated_df["total_quantity"]
    return aggregated_df.withColumn("total_amount", amount.cast(f"decimal(38,{decimals})"))

In [7]:
# Function to select relevant columns
def select_relevant_columns(result_df):
    """Project ``result_df`` onto the report columns, in report order.

    Args:
        result_df: DataFrame containing at least the listed report columns.

    Returns:
        DataFrame with only those columns.
    """
    report_columns = [
        "customer_id",
        "name",
        "product_id",
        "product_name",
        "price",
        "total_quantity",
        "total_amount",
    ]
    return result_df.select(*report_columns)

In [8]:
# Function to write output to CSV
def write_output_to_csv(result_df, output_path):
    """Collect ``result_df`` to the driver as pandas and save it as one CSV file.

    ``toPandas`` materializes the entire result in driver memory, so this is
    only suitable for results small enough to fit on a single machine. The
    pandas index is not written. A confirmation line with the destination
    path is printed afterwards.

    Args:
        result_df: DataFrame to export (anything exposing ``toPandas``).
        output_path: Destination CSV path.
    """
    result_df.toPandas().to_csv(output_path, index=False)
    print("File uploaded successfully to: ", output_path)

In [9]:
# Function to stop Spark session
def stop_spark_session(spark_session):
    """Stop ``spark_session`` by calling its ``stop()`` method.

    Args:
        spark_session: The session to shut down.
    """
    shutdown = spark_session.stop
    shutdown()

In [10]:
# Main function
def main(data_dir="data"):
    """Run the full pipeline: read, join/aggregate, total, select, write CSV.

    Args:
        data_dir: Directory holding ``customers.txt`` and ``salestxns.txt``.
            ``output.csv`` is written there too. Defaults to ``"data"``,
            relative to the current working directory.

    Paths are built with ``os.path.join`` instead of hard-coded Windows
    backslash separators, so the pipeline also works on Linux/macOS. The
    Spark session is stopped in ``finally`` so that a failure in any stage
    does not leave it running.
    """
    import os

    # Create Spark session
    spark = create_spark_session()
    try:
        # Paths to input and output files
        customers_path = os.path.join(data_dir, "customers.txt")
        salestxns_path = os.path.join(data_dir, "salestxns.txt")
        output_path = os.path.join(data_dir, "output.csv")

        # Read data from files
        customers_df, salestxns_df = read_data(spark, customers_path, salestxns_path)
        # Join dataframes and perform aggregation
        aggregated_df = join_and_aggregate_data(customers_df, salestxns_df)
        # Calculate total amount
        result_df = calculate_total_amount(aggregated_df)
        # Select relevant columns
        result_df = select_relevant_columns(result_df)
        # Write output to CSV
        write_output_to_csv(result_df, output_path)
    finally:
        # Always release the Spark session, even if a stage above raised.
        stop_spark_session(spark)


# Entry point of the script
if __name__ == "__main__":
    main()


File uploaded successfully to:  data\output.csv
