SparkSession - hive

SparkContext

Spark UI

Version
v3.5.0
Master
local[8]
AppName
Databricks Shell

In [None]:
from pyspark.sql import SparkSession

# Return the active SparkSession if one already exists (as in a hosted notebook),
# otherwise build one; the app name may not take effect when a session is reused.
spark = (
    SparkSession.builder
    .appName("Sales Data ETL Pipeline")
    .getOrCreate()
)

# AWS IAM user credentials for accessing the S3 bucket and fetching data.

In [None]:
import os

# AWS IAM credentials for S3 access, taken from environment variables so secrets
# never live in notebook source. A missing variable raises KeyError immediately,
# naming the variable that needs to be set.
# NOTE(review): on Databricks a secret scope is an alternative source — adapt as needed.
access_key = os.environ["AWS_ACCESS_KEY_ID"]
secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]

# Go through the session rather than the notebook-provided `sc`, which is not
# defined in every environment (it is the same SparkContext where it exists).
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)

# Point the S3A client at the regional endpoint for the bucket's region.
aws_region = "ap-south-1"
hadoop_conf.set("fs.s3a.endpoint", f"s3.{aws_region}.amazonaws.com")

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType,StringType
from pyspark.sql.functions import col

Schema for the dataset

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, DateType

# Explicit schema for new_retail_data.csv. Spark's CSV reader applies a user-supplied
# schema by position by default (enforceSchema=true), so this field order must match
# the file's column order. Every field is nullable.
# Date and Year are deliberately read as strings and converted after loading
# (Date is parsed with the pattern 'M/d/yyyy', Year is cast to an integer);
# Month and Time remain strings.
schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in [
        ("Transaction_ID", StringType()),
        ("Customer_ID", StringType()),
        ("Name", StringType()),
        ("Email", StringType()),
        ("Phone", StringType()),
        ("Address", StringType()),
        ("City", StringType()),
        ("State", StringType()),
        ("Zipcode", StringType()),
        ("Country", StringType()),
        ("Age", FloatType()),
        ("Gender", StringType()),
        ("Income", StringType()),
        ("Customer_Segment", StringType()),
        ("Date", StringType()),
        ("Year", StringType()),
        ("Month", StringType()),
        ("Time", StringType()),
        ("Total_Purchases", FloatType()),
        ("Amount", FloatType()),
        ("Total_Amount", FloatType()),
        ("Product_Category", StringType()),
        ("Product_Brand", StringType()),
        ("Product_Type", StringType()),
        ("Feedback", StringType()),
        ("Shipping_Method", StringType()),
        ("Payment_Method", StringType()),
        ("Order_Status", StringType()),
        ("Ratings", FloatType()),
        ("products", StringType()),
    ]
])


Loading the Dataset

In [None]:
# Read the raw CSV (first line is a header) with the explicit schema. The s3a://
# scheme matches the fs.s3a.* credentials set on the Hadoop configuration and the
# s3a:// path used for the Parquet output.
# Under the reader's default PERMISSIVE mode, values that don't fit the schema
# are loaded as NULL.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("s3a://osamaharsh/new_retail_data.csv")
)

In [None]:
# Peek at the first loaded row to sanity-check the column mapping.
df.show(n=1)

In [None]:
from pyspark.sql.functions import to_date, col

# Replace the two string columns with typed versions in one step:
#   Date -> DateType, parsed with the pattern 'M/d/yyyy'
#   Year -> IntegerType
# The two expressions are independent, so applying them together is equivalent
# to two successive withColumn calls.
# NOTE(review): a Date value that does not match the pattern may come back NULL
# and would then be removed by the later dropna(); worth checking how many rows.
df = df.withColumns({
    "Date": to_date(col("Date"), "M/d/yyyy"),
    "Year": col("Year").cast(IntegerType()),
})

# Preview a few converted rows.
df.show(5)

In [None]:
# Print the schema tree (column names, types, nullability) after the type conversions.
df.printSchema()

In [None]:
# Width of the DataFrame: one entry per field in its schema.
num_cols = len(df.schema.fields)
print(f"Number of columns in the DataFrame: {num_cols}")


# Analyzing the Data

In [None]:
# Total number of loaded rows, taken before the NULL and duplicate clean-up cells.
df.count()

In [None]:
# Remove every row that has a NULL in at least one column (how="any" is the
# default), then preview the result and report how many rows remain.
# NOTE(review): this also discards rows whose values became NULL while loading or
# converting types; compare with the earlier row count to see how much was lost.
df = df.na.drop(how="any")
df.show()
df.count()

In [None]:
# Keep a single copy of each row that is identical across all columns.
df = df.distinct()

In [None]:
from pyspark.sql import functions as F

# Count NULLs in every column with a single aggregation (one Spark job) instead of
# one filter().count() job per column. The functions module is imported as F so the
# builtin `sum` is not shadowed.
# NOTE(review): in this notebook this cell runs after df.dropna(), so every count is
# expected to be 0; run it before the dropna cell to profile the raw data.
null_row = df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]
).first()

# Map column name -> NULL count. A global aggregate always returns one row, but
# sum() over an empty DataFrame is NULL, which is reported as 0.
null_counts = {c: (n or 0) for c, n in zip(df.columns, null_row)}

# Print the count of NULL values for each column
for column, n_nulls in null_counts.items():
    print(f"Column '{column}' has {n_nulls} NULL values.")

In [None]:
from pyspark.sql.functions import col

# Cardinality of each column, computed with one distinct().count() job per column.
# A NULL, if present, counts as one distinct value.
unique_counts = {}
for col_name in df.columns:
    unique_counts[col_name] = df.select(col_name).distinct().count()

for col_name in unique_counts:
    print(f"{col_name}: {unique_counts[col_name]} unique values")

In [None]:
from pyspark.sql.functions import col, count

# Rows per (Transaction_ID, Customer_ID) pair.
duplicates_df = (
    df.groupBy(["Transaction_ID", "Customer_ID"])
    .agg(count("*").alias("count"))
)

# Pairs seen on more than one row. If the dropDuplicates() cell has already run,
# such rows must differ in at least one other column.
duplicate_transactions = duplicates_df.where(col("count") > 1)
matching_cust_ids_count = duplicate_transactions.count()

print(f"Number of duplicate Transaction_IDs with the same Customer_ID: {matching_cust_ids_count}")

In [None]:
from pyspark.sql.functions import col, count

# Number of distinct full rows (grouping on every column) that occur more than once.
# NOTE(review): this cell comes after df.dropDuplicates(), so the result is expected
# to be 0; move it before that cell to measure duplicates in the raw data.
rows_by_content = df.groupBy(df.columns).agg(count("*").alias("count"))
duplicate_count = rows_by_content.where(col("count") > 1).count()
print(f"Number of duplicate rows in the DataFrame: {duplicate_count}")


Count of Transactions Per Year

In [None]:
# Expose the current df to SQL as the temporary view `sales_data` (lifetime tied to
# this SparkSession); the %sql cells below query it.
df.createOrReplaceTempView("sales_data")

In [None]:
%sql
-- Number of transactions recorded in each year, oldest year first.
SELECT
    s.Year,
    COUNT(s.Transaction_ID) AS transaction_count
FROM sales_data AS s
GROUP BY s.Year
ORDER BY s.Year;


Total Amount Spent by the Top 10 Customers

In [None]:
%sql
-- The ten customers with the highest summed Total_Amount.
-- Ties at the cut-off are broken arbitrarily by LIMIT.
SELECT
    Customer_ID,
    SUM(Total_Amount) AS total_spent
FROM sales_data
GROUP BY Customer_ID
ORDER BY total_spent DESC
LIMIT 10;



Average Age of Customers by Customer Segment

In [None]:
%sql
-- Mean Age per Customer_Segment, highest average first.
-- NOTE(review): the average is over transaction rows, so customers with more
-- transactions weigh more; de-duplicate on Customer_ID first if a per-customer
-- average is intended.
SELECT
    Customer_Segment,
    AVG(Age) AS avg_age
FROM sales_data
GROUP BY Customer_Segment
ORDER BY avg_age DESC;

Top 10 Products (Brand and Type) by Total Sales Amount

In [None]:
%sql
-- The ten brand / product-type combinations with the largest summed Total_Amount.
SELECT
    Product_Brand,
    Product_Type,
    SUM(Total_Amount) AS total_sales
FROM sales_data
GROUP BY
    Product_Brand,
    Product_Type
ORDER BY total_sales DESC
LIMIT 10;

Monthly Sales Performance

In [None]:
%sql
-- Total sales per (Year, Month). Month is stored as a string, so ORDER BY Month
-- would sort lexicographically rather than in calendar order; each group is instead
-- ordered by the earliest parsed Date it contains.
SELECT
    Year,
    Month,
    SUM(Total_Amount) AS monthly_sales
FROM sales_data
GROUP BY Year, Month
ORDER BY Year, MIN(Date);


Distribution of Order Status

In [None]:
%sql
-- How many orders are in each Order_Status, most common status first.
SELECT Order_Status, COUNT(*) AS order_count
FROM sales_data
GROUP BY Order_Status
ORDER BY order_count DESC;


Busiest Hours

In [None]:
%sql
-- Order count per hour of day, busiest hour first.
-- Time is a time-of-day string (HH:mm:ss). Parse it with an explicit pattern instead
-- of a bare CAST, which attaches the current date and so makes the result depend on
-- the day the query runs; 'H' also accepts single-digit hours.
SELECT
    HOUR(TO_TIMESTAMP(Time, 'H:mm:ss')) AS Hour,
    COUNT(*) AS order_count
FROM sales_data
GROUP BY Hour
ORDER BY order_count DESC;

Orders by Country

In [None]:
%sql
-- Order volume per country, most orders first.
SELECT Country, COUNT(*) AS order_count
FROM sales_data
GROUP BY Country
ORDER BY order_count DESC;

Orders by Region (State)

In [None]:
%sql
-- Order volume per state (used as the "region"), most orders first.
-- The column is referenced as `State`, exactly as declared in the schema, so the
-- query does not depend on case-insensitive resolution (spark.sql.caseSensitive).
SELECT
    State,
    COUNT(*) AS order_count
FROM sales_data
GROUP BY State
ORDER BY order_count DESC;



# Writing the Data to an S3 Bucket in Parquet Format

In [None]:
# Persist the processed DataFrame to S3 as Parquet. (No `result` variable exists in
# this notebook; `df` holds the processed data.) coalesce(1) produces a single part
# file, which funnels the whole write through one task, so it suits small outputs.
# With the default save mode the write fails if the target path already exists.
df.coalesce(1).write.parquet('s3a://trypiyush/try123.parquet')