In [None]:
!pip install pyspark




In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

schema = StructType([
    StructField("Invoice ID", StringType(), True),
    StructField("Branch", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Customer type", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Product line", StringType(), True),
    StructField("Unit price", FloatType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Tax 5%", FloatType(), True),
    StructField("Total", FloatType(), True),
    StructField("Date", StringType(), True),  # Will convert to DateType later
    StructField("Time", StringType(), True),
    StructField("Payment", StringType(), True),
    StructField("cogs", FloatType(), True),
    StructField("gross margin percentage", FloatType(), True),
    StructField("gross income", FloatType(), True),
    StructField("Rating", FloatType(), True)
])


loading the data using the spark session

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RetailSales").getOrCreate()

df = spark.read.csv("/content/supermarket_sales - Sheet1.csv", header=True, schema=schema)
df.show(5)


+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|              4.7619047|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|              4.7619047|        3.82|   9.6|
|631-41-3108|     A|   Yangon|       Normal|  Male|  Ho

Drop duplcates

In [None]:
df = df.drop_duplicates()
df.show(15)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
|563-91-7120|     A|   Yangon|       Normal|Female| Fashion accessories|     61.77|       5|15.4425|324.2925| 3/8/2019|13:21|       Cash|308.85|              4.7619047|     15.4425|   6.7|
|633-09-3463|     C|Naypyitaw|       Normal|Female|Electronic access...|     47.65|       3| 7.1475|150.0975|3/28/2019|12:58|Credit card|142.95|              4.7619047|      7.1475|   9.5|
|689-16-9784|     C|Naypyitaw|       Normal|  Male|  Fo

In [None]:
df = df.drop("gross margin percentage")
df.show()

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+------------+------+
|563-91-7120|     A|   Yangon|       Normal|Female| Fashion accessories|     61.77|       5|15.4425|324.2925| 3/8/2019|13:21|       Cash|308.85|     15.4425|   6.7|
|633-09-3463|     C|Naypyitaw|       Normal|Female|Electronic access...|     47.65|       3| 7.1475|150.0975|3/28/2019|12:58|Credit card|142.95|      7.1475|   9.5|
|689-16-9784|     C|Naypyitaw|       Normal|  Male|  Food and beverages|     46.77|       6| 14.031| 294.651|3/11/2019|13:37|       Cash|280.62|      14.031|   6.0|
|676-39-60

converting date time datatype to date time from string and extracting month and day, date from the date

In [None]:
from pyspark.sql.functions import col, to_date, dayofweek, hour, to_timestamp

# Set Spark to use the legacy time parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Convert "date" column to proper date format
df = df.withColumn("date", to_date(col("date"), "M/d/yyyy"))

# Extract day of the week
df = df.withColumn("day_of_week", dayofweek(col("date")))

# Convert "time" column to timestamp and extract hour
df = df.withColumn("hour", hour(to_timestamp(col("time"), "HH:mm")))

df.show()



+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|      date| Time|    Payment|  cogs|gross income|Rating|day_of_week|hour|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+
|563-91-7120|     A|   Yangon|       Normal|Female| Fashion accessories|     61.77|       5|15.4425|324.2925|2019-03-08|13:21|       Cash|308.85|     15.4425|   6.7|          6|  13|
|633-09-3463|     C|Naypyitaw|       Normal|Female|Electronic access...|     47.65|       3| 7.1475|150.0975|2019-03-28|12:58|Credit card|142.95|      7.1475|   9.5|          5|  12|
|689-16-9784|     C|Naypyitaw|       Normal|  Male|  Food and beverages|     46.77|  

Remove the leading and tailing spaces and convert the columns to lower cases

In [None]:
from pyspark.sql.functions import col, lower, trim

text_columns = ["Branch", "City", "Gender"]

# Apply trimming and lowercase conversion using PySpark functions
df = df.select([trim(lower(col(c))).alias(c) if c in text_columns else col(c) for c in df.columns])
df.show()



+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|      date| Time|    Payment|  cogs|gross income|Rating|day_of_week|hour|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+
|563-91-7120|     a|   yangon|       Normal|female| Fashion accessories|     61.77|       5|15.4425|324.2925|2019-03-08|13:21|       Cash|308.85|     15.4425|   6.7|          6|  13|
|633-09-3463|     c|naypyitaw|       Normal|female|Electronic access...|     47.65|       3| 7.1475|150.0975|2019-03-28|12:58|Credit card|142.95|      7.1475|   9.5|          5|  12|
|689-16-9784|     c|naypyitaw|       Normal|  male|  Food and beverages|     46.77|  

fixing numerical values

In [None]:
from pyspark.sql.functions import col

# Define numeric columns
num_columns = ["Unit price", "Tax 5%", "Total", "cogs", "gross margin percentage", "Rating"]

# Convert columns to float type
df = df.select([col(c).cast("float").alias(c) if c in num_columns else col(c) for c in df.columns])

# Verify schema to ensure columns are correctly cast to float
df.printSchema()

# Show results
df.show()


root
 |-- Invoice ID: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Customer type: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- Unit price: float (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Tax 5%: float (nullable = true)
 |-- Total: float (nullable = true)
 |-- date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- Payment: string (nullable = true)
 |-- cogs: float (nullable = true)
 |-- gross income: float (nullable = true)
 |-- Rating: float (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour: integer (nullable = true)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total

Handling outliers

In [None]:
df = df[(df["rating"] >= 1) & (df["rating"] <= 10)]
df = df[(df["quantity"] >= 1) & (df["quantity"] <= 50)]
df = df[df["total"] > 0]  # Remove rows with total = 0
df.show(10)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|      date| Time|    Payment|  cogs|gross income|Rating|day_of_week|hour|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+
|563-91-7120|     a|   yangon|       Normal|female| Fashion accessories|     61.77|       5|15.4425|324.2925|2019-03-08|13:21|       Cash|308.85|     15.4425|   6.7|          6|  13|
|633-09-3463|     c|naypyitaw|       Normal|female|Electronic access...|     47.65|       3| 7.1475|150.0975|2019-03-28|12:58|Credit card|142.95|      7.1475|   9.5|          5|  12|
|689-16-9784|     c|naypyitaw|       Normal|  male|  Food and beverages|     46.77|  

caluclating profit margin

In [None]:
from pyspark.sql.functions import when

# Avoid division by zero
df = df.withColumn(
    "profit_margin",
    when(col("Total") != 0, col("gross income") / col("Total")).otherwise(None)
)

df.show(10)


+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+--------------------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|      date| Time|    Payment|  cogs|gross income|Rating|day_of_week|hour|       profit_margin|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+--------------------+
|563-91-7120|     a|   yangon|       Normal|female| Fashion accessories|     61.77|       5|15.4425|324.2925|2019-03-08|13:21|       Cash|308.85|     15.4425|   6.7|          6|  13|  0.0476190463587115|
|633-09-3463|     c|naypyitaw|       Normal|female|Electronic access...|     47.65|       3| 7.1475|150.0975|2019-03-28|12:58|Credit card|142.95|      7.1475|   9.5|          5|  12| 0

identify the rush hours

In [None]:
from pyspark.sql.functions import col, when

# Create 'rush_hour' column based on conditions
df = df.withColumn(
    "rush_hour",
    when(col("hour").isin([12, 13, 17, 18]), "yes").otherwise("no")
)

# Show the first 10 rows
df.show(10)


+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+--------------------+---------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|      date| Time|    Payment|  cogs|gross income|Rating|day_of_week|hour|       profit_margin|rush_hour|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+--------------------+---------+
|563-91-7120|     a|   yangon|       Normal|female| Fashion accessories|     61.77|       5|15.4425|324.2925|2019-03-08|13:21|       Cash|308.85|     15.4425|   6.7|          6|  13|  0.0476190463587115|      yes|
|633-09-3463|     c|naypyitaw|       Normal|female|Electronic access...|     47.65|       3| 7.1475|150.0975|2019-03-28|12:58|Credit card|142.95

Categorise customer spending

In [None]:
from pyspark.sql.functions import col, when

# Create 'spending_category' column based on the 'total' column
df = df.withColumn(
    "spending_category",
    when(col("total") <= 100, "low")
    .when((col("total") > 100) & (col("total") <= 500), "medium")
    .otherwise("high")
)

# Show first 10 rows
df.show(10)




+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+--------------------+---------+-----------------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|      date| Time|    Payment|  cogs|gross income|Rating|day_of_week|hour|       profit_margin|rush_hour|spending_category|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-----+-----------+------+------------+------+-----------+----+--------------------+---------+-----------------+
|563-91-7120|     a|   yangon|       Normal|female| Fashion accessories|     61.77|       5|15.4425|324.2925|2019-03-08|13:21|       Cash|308.85|     15.4425|   6.7|          6|  13|  0.0476190463587115|      yes|           medium|
|633-09-3463|     c|naypyitaw|       Normal|female|Electronic access...|

  Remove the special characters from the specific column

removing the special charcaters

In [None]:
from pyspark.sql.functions import regexp_replace

# Define a function to clean special characters from all string columns
def remove_special_chars(df):
    for col_name in df.columns:
        df = df.withColumn(col_name, regexp_replace(col(col_name), r"[^a-zA-Z0-9\s]", ""))
    return df

# Apply function to clean dataset
df = remove_special_chars(df)

# Show cleaned dataset
df.show(truncate=False)


+----------+------+---------+-------------+------+----------------------+----------+--------+------+-------+--------+----+-----------+-----+------------+------+-----------+----+-------------------+---------+-----------------+
|Invoice ID|Branch|City     |Customer type|Gender|Product line          |Unit price|Quantity|Tax 5%|Total  |date    |Time|Payment    |cogs |gross income|Rating|day_of_week|hour|profit_margin      |rush_hour|spending_category|
+----------+------+---------+-------------+------+----------------------+----------+--------+------+-------+--------+----+-----------+-----+------------+------+-----------+----+-------------------+---------+-----------------+
|563917120 |a     |yangon   |Normal       |female|Fashion accessories   |6177      |5       |154425|3242925|20190308|1321|Cash       |30885|154425      |67    |6          |13  |00476190463587115  |yes      |medium           |
|633093463 |c     |naypyitaw|Normal       |female|Electronic accessories|4765      |3       |714

checking the data types and null values

In [None]:
from pyspark.sql.functions import col, sum

# Check for missing values in each column
print("Missing Values:")
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

# Check data types of each column
print("\nData Types:")
df.printSchema()

# Show first few rows
df.show(5)


Missing Values:
+----------+------+----+-------------+------+------------+----------+--------+------+-----+----+----+-------+----+------------+------+-----------+----+-------------+---------+-----------------+
|Invoice ID|Branch|City|Customer type|Gender|Product line|Unit price|Quantity|Tax 5%|Total|date|Time|Payment|cogs|gross income|Rating|day_of_week|hour|profit_margin|rush_hour|spending_category|
+----------+------+----+-------------+------+------------+----------+--------+------+-----+----+----+-------+----+------------+------+-----------+----+-------------+---------+-----------------+
|         0|     0|   0|            0|     0|           0|         0|       0|     0|    0|   0|   0|      0|   0|           0|     0|          0|   0|            0|        0|                0|
+----------+------+----+-------------+------+------------+----------+--------+------+-----+----+----+-------+----+------------+------+-----------+----+-------------+---------+-----------------+


Data Types:


Monhly sales trend

In [None]:
from pyspark.sql.functions import year, month, sum

monthly_sales = df.groupBy((df["spending_category"]).alias("spending_Range"))\
                  .agg(sum("Total").alias("Total Revenue"))
monthly_sales.show()


+--------------+-------------+
|spending_Range|Total Revenue|
+--------------+-------------+
|           low|  4.9841379E7|
|          high| 3.96386655E8|
|        medium| 4.30843812E8|
+--------------+-------------+



top selling product categories

In [None]:
top_products = df.groupBy("Product line")\
                 .agg(sum("Quantity").alias("Total Sold"))\
                 .orderBy("Total Sold", ascending=False)

top_products.show()


+--------------------+----------+
|        Product line|Total Sold|
+--------------------+----------+
|Electronic access...|     971.0|
|  Food and beverages|     952.0|
|   Sports and travel|     920.0|
|  Home and lifestyle|     911.0|
| Fashion accessories|     902.0|
|   Health and beauty|     854.0|
+--------------------+----------+



best performance store loactions

In [None]:
store_performance = df.groupBy("City")\
                      .agg(sum("Total").alias("Total Sales"))\
                      .orderBy("Total Sales", ascending=False)

store_performance.show()


+---------+------------+
|     City| Total Sales|
+---------+------------+
|naypyitaw|3.03677094E8|
| mandalay|2.91910206E8|
|   yangon|2.81484546E8|
+---------+------------+



Average rating by each store location

In [None]:
from pyspark.sql.functions import avg

avg_ratings = df.groupBy("City")\
                .agg(avg("Rating").alias("Avg Rating"))\
                .orderBy("Avg Rating", ascending=False)

avg_ratings.show()


+---------+-----------------+
|     City|       Avg Rating|
+---------+-----------------+
|naypyitaw|70.72865853658537|
|   yangon|70.27058823529411|
| mandalay|68.18072289156626|
+---------+-----------------+



categorise rating

In [None]:
from pyspark.sql.functions import col, when, avg

# Categorize rating into sentiment
df = df.withColumn(
    "rating_sentiment",
    when(col("rating") <= 40, "negative")
    .when(col("rating") <= 70, "neutral")
    .otherwise("positive")
)

# Compute the average rating per product line
rating_avg = df.groupBy("product line").agg(avg("rating").alias("avg_product_rating"))

# Merge the average rating back to the original DataFrame
df = df.join(rating_avg, on="product line", how="left")

# Show results
df.show()



+--------------------+----------+------+---------+-------------+------+----------+--------+------+-------+--------+----+-----------+-----+------------+------+-----------+----+-------------------+---------+-----------------+----------------+------------------+
|        Product line|Invoice ID|Branch|     City|Customer type|Gender|Unit price|Quantity|Tax 5%|  Total|    date|Time|    Payment| cogs|gross income|Rating|day_of_week|hour|      profit_margin|rush_hour|spending_category|rating_sentiment|avg_product_rating|
+--------------------+----------+------+---------+-------------+------+----------+--------+------+-------+--------+----+-----------+-----+------------+------+-----------+----+-------------------+---------+-----------------+----------------+------------------+
| Fashion accessories| 563917120|     a|   yangon|       Normal|female|      6177|       5|154425|3242925|20190308|1321|       Cash|30885|      154425|    67|          6|  13|  00476190463587115|      yes|           medi

In [None]:
from pyspark.sql.functions import sum, avg, count

# Compute payment summary
payment_summary = df.groupBy("payment").agg(
    sum("total").alias("total_revenue"),
    avg("total").alias("avg_transaction"),
    count("payment").alias("num_transactions")
)

# Join the summary back to the original dataframe
df = df.join(payment_summary, on="payment", how="left")

# Show the results
df.show()


+-----------+--------------------+----------+------+---------+-------------+------+----------+--------+------+-------+--------+----+-----+------------+------+-----------+----+-------------------+---------+-----------------+----------------+------------------+-------------+-----------------+----------------+
|    Payment|        Product line|Invoice ID|Branch|     City|Customer type|Gender|Unit price|Quantity|Tax 5%|  Total|    date|Time| cogs|gross income|Rating|day_of_week|hour|      profit_margin|rush_hour|spending_category|rating_sentiment|avg_product_rating|total_revenue|  avg_transaction|num_transactions|
+-----------+--------------------+----------+------+---------+-------------+------+----------+--------+------+-------+--------+----+-----+------------+------+-----------+----+-------------------+---------+-----------------+----------------+------------------+-------------+-----------------+----------------+
|       Cash| Fashion accessories| 563917120|     a|   yangon|       Norm

In [None]:


from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
df.write.csv("/content/drive/My Drive/transformed_supermarket_sales", header=True, mode="overwrite")