In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql.functions import col, regexp_replace, split, year, month, to_date

# Initialize Spark Session
spark = SparkSession.builder.appName("NexGen_ETL_Pipeline").getOrCreate()

# Define the Schema
schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("date_of_purchase", StringType(), True),  # We will convert it to Date
    StructField("amount", StringType(), True),  # We will convert ₹ to Double
    StructField("quantity", IntegerType(), True),
    StructField("location", StringType(), True)
])

# Define new file paths
input_csv_path = "dbfs:/FileStore/tables/NexGen/final_nexgen_data.csv"
cleaned_output_path = "dbfs:/FileStore/tables/NexGen_Cleaned"

#  Load CSV into a Spark DataFrame
df = spark.read.option("header", True).csv(input_csv_path)

In [0]:
# Define the correct schema and clean the data
df_cleaned = df \
    .withColumn("amount", regexp_replace(col("amount"), "₹", "")) \
    .withColumn("amount", col("amount").cast("double")) \
    .withColumn("customer_id", split(col("customer_id"), ",").getItem(0)) \
    .withColumn("year", year(col("date_of_purchase")).cast("integer")) \
    .withColumn("month", month(col("date_of_purchase")).cast("integer")) \
    .withColumn("quantity", col("quantity").cast("integer")) 

# Show cleaned data preview
df_cleaned.show(5)

+-----------+----------+------------+-----------+----------------+-------+--------+-------------+----+-----+
|customer_id|product_id|product_name|   category|date_of_purchase| amount|quantity|     location|year|month|
+-----------+----------+------------+-----------+----------------+-------+--------+-------------+----+-----+
|   CUST6597|   PROD477|     MacBook|Electronics|      2024-10-01|1581.29|    2000|San Francisco|2024|   10|
|   CUST9605|   PROD746|     MacBook|Electronics|      2024-10-30| 534.75|    4000|       Dallas|2024|   10|
|   CUST9329|   PROD278|      iPhone|Electronics|      2024-04-16|1358.19|    1000|      Chicago|2024|    4|
|   CUST6601|   PROD669|  Galaxy Tab|  Computers|      2024-12-20|1670.74|    5000|      Seattle|2024|   12|
|   CUST6616|   PROD392|        iPad|Electronics|      2024-04-15| 953.72|    1000|      Seattle|2024|    4|
+-----------+----------+------------+-----------+----------------+-------+--------+-------------+----+-----+
only showing top 5 

In [0]:
# Save as CSV in a new clean location
df_cleaned.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(cleaned_output_path)

print("Cleaned CSV saved at:", cleaned_output_path)

Cleaned CSV saved at: dbfs:/FileStore/tables/NexGen_Cleaned


In [0]:

# List files in the output directory
dbutils.fs.ls(cleaned_output_path)

Out[4]: [FileInfo(path='dbfs:/FileStore/tables/NexGen_Cleaned/_committed_1762984584231884728', name='_committed_1762984584231884728', size=212, modificationTime=1739624165000),
 FileInfo(path='dbfs:/FileStore/tables/NexGen_Cleaned/_committed_3948208819167917375', name='_committed_3948208819167917375', size=113, modificationTime=1739623835000),
 FileInfo(path='dbfs:/FileStore/tables/NexGen_Cleaned/_committed_4891935975649277998', name='_committed_4891935975649277998', size=197, modificationTime=1739640953000),
 FileInfo(path='dbfs:/FileStore/tables/NexGen_Cleaned/_committed_7164279139343481000', name='_committed_7164279139343481000', size=199, modificationTime=1739639938000),
 FileInfo(path='dbfs:/FileStore/tables/NexGen_Cleaned/_committed_vacuum7218355034752194416', name='_committed_vacuum7218355034752194416', size=96, modificationTime=1739639939000),
 FileInfo(path='dbfs:/FileStore/tables/NexGen_Cleaned/_started_4891935975649277998', name='_started_4891935975649277998', size=0, modifi

In [0]:
dbutils.fs.cp(cleaned_output_path, "dbfs:/FileStore/NexGen_Cleaned", recurse=True)

print("Download your cleaned CSV from:")
print("https://community.cloud.databricks.com/files/NexGen_Cleaned/")

Download your cleaned CSV from:
https://community.cloud.databricks.com/files/NexGen_Cleaned/


In [0]:
# Save as a SINGLE CSV file
df_cleaned.coalesce(1).write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save("dbfs:/FileStore/NexGen_Cleaned_Single")

print(" Cleaned CSV saved at: dbfs:/FileStore/NexGen_Cleaned_Single")


 Cleaned CSV saved at: dbfs:/FileStore/NexGen_Cleaned_Single


In [0]:
files = dbutils.fs.ls("dbfs:/FileStore/NexGen_Cleaned_Single")
csv_file = [f.path for f in files if f.name.endswith(".csv")][0]

# Move to a proper download location
dbutils.fs.cp(csv_file, "dbfs:/FileStore/final_nexgen_cleaned.csv")

print(" Download your cleaned CSV from:")
print("https://community.cloud.databricks.com/files/final_nexgen_cleaned.csv")

 Download your cleaned CSV from:
https://community.cloud.databricks.com/files/final_nexgen_cleaned.csv


Below 
Partitioning of the dataset so to have ease in analysis 

In [0]:
from pyspark.sql.functions import col
import shutil

# Define Paths
partitioned_output_path = "dbfs:/FileStore/NexGen_Partitioned_CSV"

# Filter Data for 2024 and 2025
df_filtered = df_cleaned.filter(col("year").isin([2024, 2025]))

# Repartition Data by Location, Year, and Month
df_partitioned = df_filtered.repartition("location", "year", "month")

# Save the Partitioned CSVs
df_partitioned.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .partitionBy("location", "year", "month") \
    .save(partitioned_output_path)

print(" Partitioned CSVs saved at:", partitioned_output_path)

 Partitioned CSVs saved at: dbfs:/FileStore/NexGen_Partitioned_CSV


In [0]:
# Function to Merge CSVs for Easy Download
def merge_csv_files(src_folder, dest_file):
    """Merges multiple part CSVs into a single file for download."""
    files = dbutils.fs.ls(src_folder)
    csv_files = [f.path for f in files if f.name.endswith(".csv")]
    
    # Combine all part files into one
    with open("/tmp/temp_combined.csv", "wb") as outfile:
        for file in csv_files:
            with open("/dbfs" + file[5:], "rb") as infile:
                outfile.write(infile.read())
    
    # Move merged file to FileStore for download
    dbutils.fs.cp("file:/tmp/temp_combined.csv", dest_file)
    print(f"✅ Downloadable CSV saved: {dest_file}")

# Merge & Move CSVs to FileStore for Download
download_links = []
locations = [f.name.split("=")[1] for f in dbutils.fs.ls(partitioned_output_path) if "location=" in f.name]

for location in locations:
    src_path = f"{partitioned_output_path}/location={location}"
    dest_path = f"dbfs:/FileStore/final_nexgen_{location}.csv"
    
    merge_csv_files(src_path, dest_path)
    
    download_links.append(f"https://community.cloud.databricks.com/files/final_nexgen_{location}.csv")

print("✅ Download Links for Each Location:")
for link in download_links:
    print(link)


✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Boston/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Chicago/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Dallas/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Denver/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Houston/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Los Angeles/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Miami/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_New York/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_San Francisco/.csv
✅ Downloadable CSV saved: dbfs:/FileStore/final_nexgen_Seattle/.csv
✅ Download Links for Each Location:
https://community.cloud.databricks.com/files/final_nexgen_Boston/.csv
https://community.cloud.databricks.com/files/final_nexgen_Chicago/.csv
https://community.cloud.databricks.com/files/final_nexgen_Dallas/.csv
https://community.cloud.databricks.com/files/final_nexgen_Denver/.c