In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

In [3]:
from pyspark.sql.functions import col

In [4]:
from pyspark.sql import functions as f

In [5]:
spark = SparkSession.builder.appName("EcommerceSales_DataCleaningandAggregation").getOrCreate()

In [7]:
custom_schema = StructType([
    StructField("TransactionID", StringType(), True),
    StructField("ProductID", StringType(), True),
    StructField("ProductCategory", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", FloatType(), True),
    StructField("CustomerCity", StringType(), True),
    StructField("TransactionDate", DateType(), True)
])

In [9]:
df = spark.read.csv("E-commerce Sales Data - Sheet1.csv", schema=custom_schema)

In [10]:
df.show()

+-------------+---------+---------------+--------+-----+------------+---------------+
|TransactionID|ProductID|ProductCategory|Quantity|Price|CustomerCity|TransactionDate|
+-------------+---------+---------------+--------+-----+------------+---------------+
|TransactionID|ProductID|ProductCategory|    NULL| NULL|CustomerCity|           NULL|
|        T1001|     P001|    Electronics|      20|150.0|    New York|     2023-10-01|
|        T1002|     P005|        Apparel|      10| 45.5|      London|     2023-10-01|
|        T1003|     P010|        Apparel|      30| 20.0|    New York|     2023-10-02|
|        T1004|     P002|    Electronics|    NULL|300.0|       Tokyo|     2023-10-02|
|        T1005|     P007|          Books|      10|10.99|      London|     2023-10-03|
|        T1006|     P003|    Electronics|      50|150.0|    New York|     2023-10-03|
|        T1007|     P012|          Books|      20| 12.5|       Tokyo|     2023-10-04|
|        T1008|     P008|        Apparel|      10| NUL

In [17]:
df_valid_data = df.filter((col("Quantity") > 0) | col("Quantity").isNull())

In [18]:
df_valid_data.show()

+-------------+---------+---------------+--------+-----+------------+---------------+
|TransactionID|ProductID|ProductCategory|Quantity|Price|CustomerCity|TransactionDate|
+-------------+---------+---------------+--------+-----+------------+---------------+
|TransactionID|ProductID|ProductCategory|    NULL| NULL|CustomerCity|           NULL|
|        T1001|     P001|    Electronics|      20|150.0|    New York|     2023-10-01|
|        T1002|     P005|        Apparel|      10| 45.5|      London|     2023-10-01|
|        T1003|     P010|        Apparel|      30| 20.0|    New York|     2023-10-02|
|        T1004|     P002|    Electronics|    NULL|300.0|       Tokyo|     2023-10-02|
|        T1005|     P007|          Books|      10|10.99|      London|     2023-10-03|
|        T1006|     P003|    Electronics|      50|150.0|    New York|     2023-10-03|
|        T1007|     P012|          Books|      20| 12.5|       Tokyo|     2023-10-04|
|        T1008|     P008|        Apparel|      10| NUL

In [19]:
df_updated_quantity = df_valid_data.fillna(1, subset=["Quantity"])

In [20]:
df_updated_quantity.show()

+-------------+---------+---------------+--------+-----+------------+---------------+
|TransactionID|ProductID|ProductCategory|Quantity|Price|CustomerCity|TransactionDate|
+-------------+---------+---------------+--------+-----+------------+---------------+
|TransactionID|ProductID|ProductCategory|       1| NULL|CustomerCity|           NULL|
|        T1001|     P001|    Electronics|      20|150.0|    New York|     2023-10-01|
|        T1002|     P005|        Apparel|      10| 45.5|      London|     2023-10-01|
|        T1003|     P010|        Apparel|      30| 20.0|    New York|     2023-10-02|
|        T1004|     P002|    Electronics|       1|300.0|       Tokyo|     2023-10-02|
|        T1005|     P007|          Books|      10|10.99|      London|     2023-10-03|
|        T1006|     P003|    Electronics|      50|150.0|    New York|     2023-10-03|
|        T1007|     P012|          Books|      20| 12.5|       Tokyo|     2023-10-04|
|        T1008|     P008|        Apparel|      10| NUL

In [23]:
df_updated_quantity_price = df_updated_quantity.dropna(subset=["Price"])

In [24]:
df_updated_quantity_price.show()

+-------------+---------+---------------+--------+-----+------------+---------------+
|TransactionID|ProductID|ProductCategory|Quantity|Price|CustomerCity|TransactionDate|
+-------------+---------+---------------+--------+-----+------------+---------------+
|        T1001|     P001|    Electronics|      20|150.0|    New York|     2023-10-01|
|        T1002|     P005|        Apparel|      10| 45.5|      London|     2023-10-01|
|        T1003|     P010|        Apparel|      30| 20.0|    New York|     2023-10-02|
|        T1004|     P002|    Electronics|       1|300.0|       Tokyo|     2023-10-02|
|        T1005|     P007|          Books|      10|10.99|      London|     2023-10-03|
|        T1006|     P003|    Electronics|      50|150.0|    New York|     2023-10-03|
|        T1007|     P012|          Books|      20| 12.5|       Tokyo|     2023-10-04|
|        T1009|     P004|    Electronics|      20|450.0|    New York|     2023-10-05|
|        T1010|     P006|        Apparel|      10| 50.

In [25]:
df_total_per_transaction = df_updated_quantity_price.withColumn("total_per_transaction",
 (col("Quantity")*col("Price")))

In [26]:
df_total_per_transaction.show()

+-------------+---------+---------------+--------+-----+------------+---------------+---------------------+
|TransactionID|ProductID|ProductCategory|Quantity|Price|CustomerCity|TransactionDate|total_per_transaction|
+-------------+---------+---------------+--------+-----+------------+---------------+---------------------+
|        T1001|     P001|    Electronics|      20|150.0|    New York|     2023-10-01|               3000.0|
|        T1002|     P005|        Apparel|      10| 45.5|      London|     2023-10-01|                455.0|
|        T1003|     P010|        Apparel|      30| 20.0|    New York|     2023-10-02|                600.0|
|        T1004|     P002|    Electronics|       1|300.0|       Tokyo|     2023-10-02|                300.0|
|        T1005|     P007|          Books|      10|10.99|      London|     2023-10-03|           109.899994|
|        T1006|     P003|    Electronics|      50|150.0|    New York|     2023-10-03|               7500.0|
|        T1007|     P012|   

In [27]:
from pyspark.sql.window import Window

In [32]:
from pyspark.sql.functions import sum

In [38]:
window_spec = Window.partitionBy("TransactionDate")

In [39]:
df_total_per_day = df_total_per_transaction.withColumn("total_per_day",
                                                       sum(col("total_per_transaction")).over(window_spec))

In [40]:
df_result = df_total_per_day.select("TransactionDate","total_per_day").distinct().orderBy("TransactionDate")

In [41]:
df_result.show()

+---------------+--------------------+
|TransactionDate|       total_per_day|
+---------------+--------------------+
|     2023-10-01|1.7811416446777344E7|
|     2023-10-02|1.0622557099752426E7|
|     2023-10-03|2.0533203557634354E7|
|     2023-10-04|1.9511109635772705E7|
|     2023-10-05|1.6964503536132812E7|
+---------------+--------------------+

