In [47]:
!pip install findspark

import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Creating Spark Session
spark = SparkSession.builder.getOrCreate()    
#getOrCreate() – This returns a SparkSession object if already exists, creates new one if not exists.

# Creating New Session
NewSparkSession = SparkSession.newSession



In [48]:
# Method:::: gtActiveSession() – returns an active Spark session.
spark.getActiveSession()

In [49]:
import os

file_path = "/Users/abhishekkumar/Downloads/Project_Dataset/Retail/retail_store_sales.csv"

# Check if file exists
if os.path.exists(file_path):
    print(f"File '{file_path}' exists.")
else:
    print(f"File '{file_path}' does not exist.")


File '/Users/abhishekkumar/Downloads/Project_Dataset/Retail/retail_store_sales.csv' exists.


In [50]:
import pandas as pd

# Read CSV into a DataFrame
retail_data = pd.read_csv(file_path)

# Display all columns properly
pd.set_option('display.max_columns', None)  
pd.set_option('display.width', 1000)  

# Preview the dataset
df.head(1000)


Unnamed: 0,Transaction ID,Customer ID,Category,Item,Price Per Unit,Quantity,Total Spent,Payment Method,Location,Transaction Date,Discount Applied
0,TXN_6867343,CUST_09,Patisserie,Item_10_PAT,18.5,10.0,185.0,Digital Wallet,Online,2024-04-08,True
1,TXN_3731986,CUST_22,Milk Products,Item_17_MILK,29.0,9.0,261.0,Digital Wallet,Online,2023-07-23,True
2,TXN_9303719,CUST_02,Butchers,Item_12_BUT,21.5,2.0,43.0,Credit Card,Online,2022-10-05,False
3,TXN_9458126,CUST_06,Beverages,Item_16_BEV,27.5,9.0,247.5,Credit Card,Online,2022-05-07,
4,TXN_4575373,CUST_05,Food,Item_6_FOOD,12.5,7.0,87.5,Digital Wallet,Online,2022-10-02,False
...,...,...,...,...,...,...,...,...,...,...,...
995,TXN_8420316,CUST_07,Food,Item_14_FOOD,24.5,7.0,171.5,Digital Wallet,In-store,2024-02-21,
996,TXN_8847569,CUST_16,Electric household essentials,Item_21_EHE,35.0,5.0,175.0,Cash,In-store,2023-07-19,True
997,TXN_1662766,CUST_18,Milk Products,Item_16_MILK,27.5,2.0,55.0,Digital Wallet,Online,2024-05-19,False
998,TXN_6276011,CUST_15,Butchers,Item_22_BUT,36.5,4.0,146.0,Cash,Online,2022-06-10,True


In [51]:
# Define file path
file_path = "/Users/abhishekkumar/Downloads/Project_Dataset/Retail/retail_store_sales.csv"

# Read CSV into a Spark DataFrame
retail_data = spark.read.option("header", True).csv(file_path)

# Show all columns
retail_data.show(truncate=False)


+--------------+-----------+-----------------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|Category                     |Item        |Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-----------------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|TXN_6867343   |CUST_09    |Patisserie                   |Item_10_PAT |18.5          |10.0    |185.0      |Digital Wallet|Online  |2024-04-08      |True            |
|TXN_3731986   |CUST_22    |Milk Products                |Item_17_MILK|29.0          |9.0     |261.0      |Digital Wallet|Online  |2023-07-23      |True            |
|TXN_9303719   |CUST_02    |Butchers                     |Item_12_BUT |21.5          |2.0     |43.0       |Credit Card   |Online  |2022-10-05      |False           |
|TXN

In [52]:
# Dictionary of existing column names as keys and new column names as values
column_mapping = {
    'Transaction ID': 'transaction_id',
    'Customer ID': 'customer_id',
    'Category': 'category_type',
    'Item': 'item_type',
    'Price Per Unit': 'price_per_unit',
    'Quantity': 'no_of_product_purchased',
    'Total Spent': 'total_spent',
    'Payment Method': 'payment_method',
    'Location': 'address',
    'Transaction Date': 'transaction_date',
    'Discount Applied': 'discount_applied'
}

# Rename columns using the dictionary mapping
for old_col, new_col in column_mapping.items():
    spark_df = spark_df.withColumnRenamed(old_col, new_col)

# Show the updated DataFrame
spark_df.show(truncate=False)




+--------------+-----------+-----------------------------+------------+--------------+-----------------------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|category_type                |item_type   |price_per_unit|no_of_product_purchased|total_spent|payment_method|address |transaction_date|discount_applied|
+--------------+-----------+-----------------------------+------------+--------------+-----------------------+-----------+--------------+--------+----------------+----------------+
|TXN_6867343   |CUST_09    |Patisserie                   |Item_10_PAT |18.5          |10.0                   |185.0      |Digital Wallet|Online  |2024-04-08      |true            |
|TXN_3731986   |CUST_22    |Milk Products                |Item_17_MILK|29.0          |9.0                    |261.0      |Digital Wallet|Online  |2023-07-23      |true            |
|TXN_9303719   |CUST_02    |Butchers                     |Item_12_BUT |21.5          |2.0      

In [53]:
# Verify the new column names
print(spark_df.columns)

['transaction_id', 'customer_id', 'category_type', 'item_type', 'price_per_unit', 'no_of_product_purchased', 'total_spent', 'payment_method', 'address', 'transaction_date', 'discount_applied']


In [54]:
# Check for duplicate rows based on all columns
duplicate_count = spark_df.count() - spark_df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count}")


Number of duplicate rows: 0


In [55]:
from pyspark.sql import functions as F

# Count null values in each column
null_counts = spark_df.select([F.count(F.when(F.col(col).isNull(), col)).alias(col) for col in spark_df.columns])

# Show null counts per column
null_counts.show(truncate=False)


+--------------+-----------+-------------+---------+--------------+-----------------------+-----------+--------------+-------+----------------+----------------+
|transaction_id|customer_id|category_type|item_type|price_per_unit|no_of_product_purchased|total_spent|payment_method|address|transaction_date|discount_applied|
+--------------+-----------+-------------+---------+--------------+-----------------------+-----------+--------------+-------+----------------+----------------+
|0             |0          |0            |1213     |609           |604                    |604        |0             |0      |0               |4199            |
+--------------+-----------+-------------+---------+--------------+-----------------------+-----------+--------------+-------+----------------+----------------+



In [56]:
# Replace null values in specific columns
spark_df = spark_df.fillna({               
    'category_type': 'Uncategorized',        
    'item_type': 'Not Specified',                  
    'price_per_unit': 0.0,                   
    'no_of_product_purchased': 0,            
    'total_spent': 0.0,                      
    'payment_method': 'Unknown',             
    'address': 'Unknown',                    
    'transaction_date': 'Unknown',           
    'discount_applied': 'No'                 
})

spark_df.show(truncate=False)


+--------------+-----------+-----------------------------+-------------+--------------+-----------------------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|category_type                |item_type    |price_per_unit|no_of_product_purchased|total_spent|payment_method|address |transaction_date|discount_applied|
+--------------+-----------+-----------------------------+-------------+--------------+-----------------------+-----------+--------------+--------+----------------+----------------+
|TXN_6867343   |CUST_09    |Patisserie                   |Item_10_PAT  |18.5          |10.0                   |185.0      |Digital Wallet|Online  |2024-04-08      |true            |
|TXN_3731986   |CUST_22    |Milk Products                |Item_17_MILK |29.0          |9.0                    |261.0      |Digital Wallet|Online  |2023-07-23      |true            |
|TXN_9303719   |CUST_02    |Butchers                     |Item_12_BUT  |21.5          |2.0

In [57]:
# Filter out customers who haven't purchased any products (i.e., no_of_product_purchased = 0 or isNull)
spark_df = spark_df.filter((F.col('no_of_product_purchased') > 0) & (F.col('no_of_product_purchased').isNotNull()))

# Show the filtered DataFrame
spark_df.show(truncate=False)


+--------------+-----------+-----------------------------+-------------+--------------+-----------------------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|category_type                |item_type    |price_per_unit|no_of_product_purchased|total_spent|payment_method|address |transaction_date|discount_applied|
+--------------+-----------+-----------------------------+-------------+--------------+-----------------------+-----------+--------------+--------+----------------+----------------+
|TXN_6867343   |CUST_09    |Patisserie                   |Item_10_PAT  |18.5          |10.0                   |185.0      |Digital Wallet|Online  |2024-04-08      |true            |
|TXN_3731986   |CUST_22    |Milk Products                |Item_17_MILK |29.0          |9.0                    |261.0      |Digital Wallet|Online  |2023-07-23      |true            |
|TXN_9303719   |CUST_02    |Butchers                     |Item_12_BUT  |21.5          |2.0

In [58]:
# Save the filtered DataFrame to CSV
#output_path = "/Users/abhishekkumar/Downloads/Project_Dataset/Retail/filtered_customers.csv"

#spark_df.write.option("header", "true").csv(output_path)


[Stage 72:>                                                         (0 + 1) / 1]                                                                                

In [59]:
# Save the DataFrame as Parquet file
#output_path_parquet = "/Users/abhishekkumar/Downloads/Project_Dataset/Retail/filtered_customers.parquet"
#spark_df.write.option("header", "true").parquet(output_path_parquet)


                                                                                

In [61]:
import os
from pyspark.sql import functions as F


# Specify output file path
output_path = "/Users/abhishekkumar/Downloads/Project_Dataset/Retail/filtered_customers.csv"
output_path = "/Users/abhishekkumar/Downloads/Project_Dataset/Retail/filtered_customers.parquet"
try:
    # Check if the directory exists, if not raise an exception
    if not os.path.exists(os.path.dirname(output_path)):
        raise FileNotFoundError(f"The directory {os.path.dirname(output_path)} does not exist.")

    # Save the DataFrame as CSV
    spark_df.write.option("header", "true").csv(output_path)

    # If everything goes fine, print success message
    print(f"File has been successfully saved to {output_path}")

except FileNotFoundError as fnf_error:
    print(f"Error: {fnf_error}")

except Exception as e:
    # Catch any other exceptions that may arise (e.g., file system issues, permissions, etc.)
    print(f"An error occurred while saving the file: {e}")


File has been successfully saved to /Users/abhishekkumar/Downloads/Project_Dataset/Retail/filtered_customers.parquet
