# Data Ingestion

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session.
# - hadoop-aws is pulled in so that s3a:// paths can be read and written.
# - timeParserPolicy=LEGACY selects the pre-Spark-3 datetime parser.
spark = (
    SparkSession.builder
    .appName("Data Preprocessing Online Store UK")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Point the S3A connector at the AWS endpoint.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

# Plain string literal: nothing is interpolated, so no f-prefix is needed.
s3_path = "s3a://project-data-alvan/final_data_v1.csv"

try:
    df = spark.read.csv(s3_path, header=True, inferSchema=True)
    print("Dataset loaded successfully!")
except Exception as e:
    print(f"Error loading dataset: {e}")
    spark.stop()
    # Re-raise rather than calling exit(): exit() would take the whole
    # kernel/interpreter down and discard the traceback, whereas re-raising
    # stops this cell (and a "Run All") with the original error visible.
    raise

print("Schema of the dataset:")
df.printSchema()

:: loading settings :: url = jar:file:/home/ubuntu/venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f1b3bc02-247f-428e-8471-e8e689cb0f47;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 597ms :: artifacts dl 25ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.1 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	--------------------------------

Dataset loaded successfully!
Schema of the dataset:
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [3]:
# Preview the first five rows of the freshly loaded data
df.show(n=5)

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   543451|    22167| OVAL WALL MIRROR...|       1|  2/8/2011 12:13|    19.96|      NULL|United Kingdom|
|   577522|    22944|CHRISTMAS METAL P...|       6|11/20/2011 13:23|     0.39|   15988.0|United Kingdom|
|   580367|    22284|HEN HOUSE DECORATION|       1| 12/2/2011 16:39|     3.29|      NULL|United Kingdom|
|   576245|    23569|TRADTIONAL ALPHAB...|       4|11/14/2011 13:40|     4.95|   12553.0|        France|
|   578293|    22086|PAPER CHAIN KIT 5...|      12|11/23/2011 14:36|     2.95|   15640.0|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows



# Data Preprocessing

## Data Cleaning

In [4]:
# Summary statistics for the raw data, before any cleaning
raw_summary = df.describe()
raw_summary.show()

24/12/09 10:08:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 7:>                                                          (0 + 1) / 1]

+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|              4064|              4064|                4056|              4064|           4064|             4064|              3075|       4064|
|   mean| 560124.2974191932|27645.306286942505|                NULL|  9.00492125984252|           NULL|4.067093996063008|15360.709268292683|       NULL|
| stddev|13290.537207394751| 16692.57366634167|                NULL|22.930074346124094|           NULL|38.23537435510261|1705.3954473312326|       NULL|
|    min|            536367|             10002| OVAL WALL MIRROR...|              

                                                                                

In [5]:
from pyspark.sql import functions as F

# Build one aggregate per column: turn the "is null" flag into 0/1 and add it up
null_count_exprs = []
for column_name in df.columns:
    is_missing = F.col(column_name).isNull().cast("int")
    null_count_exprs.append(F.sum(is_missing).alias(column_name))

null_counts = df.select(null_count_exprs)
null_counts.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          8|       0|          0|        0|       989|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [2]:
# Keep only the rows in which every column has a value
df = df.na.drop()

In [7]:
# Preview the data after dropping rows with nulls
df.show(n=5)

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   577522|    22944|CHRISTMAS METAL P...|       6|11/20/2011 13:23|     0.39|   15988.0|United Kingdom|
|   576245|    23569|TRADTIONAL ALPHAB...|       4|11/14/2011 13:40|     4.95|   12553.0|        France|
|   578293|    22086|PAPER CHAIN KIT 5...|      12|11/23/2011 14:36|     2.95|   15640.0|United Kingdom|
|   573248|    23247|BISCUIT TIN 50'S ...|       2|10/28/2011 12:09|     2.89|   14498.0|United Kingdom|
|  C569985|    22617|BAKING SET SPACEB...|      -3| 10/6/2011 19:51|     4.95|   15365.0|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows



In [8]:
# Summary statistics again, now on the null-free data
clean_summary = df.describe()
clean_summary.show()

+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|              3075|              3075|                3075|              3075|           3075|             3075|              3075|       3075|
|   mean| 560810.9327787021| 27448.25352112676|                NULL|11.038373983739838|           NULL|3.790188617886225|15360.709268292683|       NULL|
| stddev|12995.291739278331|16305.236310429387|                NULL|24.238442717258327|           NULL|43.21421695917299|1705.3954473312326|       NULL|
|    min|            536367|             10120| OVAL WALL MIRROR...|              

## Data Transformation

In [3]:
from pyspark.sql.functions import to_timestamp, month, year, col, round

In [4]:
# Convert 'InvoiceDate' from string to timestamp.
# The raw values are not zero-padded (e.g. "2/8/2011 12:13"), so the pattern uses
# single-letter fields, which accept both one- and two-digit months, days and hours.
# The padded form 'MM/dd/yyyy HH:mm' only parses such values under the LEGACY
# time parser policy; this pattern does not depend on that session setting.
df = df.withColumn('InvoiceDate', to_timestamp(col('InvoiceDate'), 'M/d/yyyy H:mm'))

In [5]:
# Derive numeric month and year columns from the parsed invoice timestamp
invoice_ts = col('InvoiceDate')
df = (
    df.withColumn('Month', month(invoice_ts))
      .withColumn('Year', year(invoice_ts))
)

In [7]:
# Preview the frame with the new Month and Year columns
df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----+----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|Month|Year|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----+----+
|   577522|    22944|CHRISTMAS METAL P...|       6|2011-11-20 13:23:00|     0.39|   15988.0|United Kingdom|   11|2011|
|   576245|    23569|TRADTIONAL ALPHAB...|       4|2011-11-14 13:40:00|     4.95|   12553.0|        France|   11|2011|
|   578293|    22086|PAPER CHAIN KIT 5...|      12|2011-11-23 14:36:00|     2.95|   15640.0|United Kingdom|   11|2011|
|   573248|    23247|BISCUIT TIN 50'S ...|       2|2011-10-28 12:09:00|     2.89|   14498.0|United Kingdom|   10|2011|
|  C569985|    22617|BAKING SET SPACEB...|      -3|2011-10-06 19:51:00|     4.95|   15365.0|United Kingdom|   10|2011|
+---------+---------+--------------------+------

                                                                                

In [6]:
from pyspark.sql import functions as F

# Convert numeric month to alphabetic month name.
# month_dict is the single source of truth for the number -> name mapping.
month_dict = {
    1: 'January', 2: 'February', 3: 'March', 4: 'April', 5: 'May', 6: 'June',
    7: 'July', 8: 'August', 9: 'September', 10: 'October', 11: 'November', 12: 'December'
}

# Generate the branches of
#   CASE WHEN month(InvoiceDate) = 1 THEN 'January' ... WHEN ... = 12 THEN 'December' END
# from month_dict so the month names are not spelled out a second time by hand.
month_case_branches = " ".join(
    f"WHEN month(InvoiceDate) = {month_number} THEN '{month_name}'"
    for month_number, month_name in month_dict.items()
)

# Rows whose InvoiceDate is null match no branch and therefore get a null MonthName.
df = df.withColumn("MonthName", F.expr(f"CASE {month_case_branches} END"))

df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----+----+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|Month|Year|MonthName|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----+----+---------+
|   577522|    22944|CHRISTMAS METAL P...|       6|2011-11-20 13:23:00|     0.39|   15988.0|United Kingdom|   11|2011| November|
|   576245|    23569|TRADTIONAL ALPHAB...|       4|2011-11-14 13:40:00|     4.95|   12553.0|        France|   11|2011| November|
|   578293|    22086|PAPER CHAIN KIT 5...|      12|2011-11-23 14:36:00|     2.95|   15640.0|United Kingdom|   11|2011| November|
|   573248|    23247|BISCUIT TIN 50'S ...|       2|2011-10-28 12:09:00|     2.89|   14498.0|United Kingdom|   10|2011|  October|
|  C569985|    22617|BAKING SET SPACEB...|      -3|2011-10-06 19:51:00|     4.95|   15365.0|Unite

In [7]:
# Remove the numeric month column
df = df.drop("Month")

In [8]:
# Rename the month-name column to "Month"
df = df.withColumnRenamed(existing="MonthName", new="Month")

In [9]:
# Revenue per row = Quantity x UnitPrice, rounded to two decimals
line_total = F.col("Quantity") * F.col("UnitPrice")
df = df.withColumn("Revenue", F.round(line_total, 2))

In [10]:
# Preview the frame with the textual Month and the new Revenue column
df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+--------+-------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|Year|   Month|Revenue|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+--------+-------+
|   577522|    22944|CHRISTMAS METAL P...|       6|2011-11-20 13:23:00|     0.39|   15988.0|United Kingdom|2011|November|   2.34|
|   576245|    23569|TRADTIONAL ALPHAB...|       4|2011-11-14 13:40:00|     4.95|   12553.0|        France|2011|November|   19.8|
|   578293|    22086|PAPER CHAIN KIT 5...|      12|2011-11-23 14:36:00|     2.95|   15640.0|United Kingdom|2011|November|   35.4|
|   573248|    23247|BISCUIT TIN 50'S ...|       2|2011-10-28 12:09:00|     2.89|   14498.0|United Kingdom|2011| October|   5.78|
|  C569985|    22617|BAKING SET SPACEB...|      -3|2011-10-06 19:51:00|     4.95|   15365.

In [12]:
# Store the preprocessed data locally as CSV (with a header row).
# mode("overwrite") replaces the output of an earlier run; with the default save
# mode the write raises when "processed_data" already exists, so re-running the
# notebook would fail at this cell.
df.write.mode("overwrite").csv("processed_data", header=True)

                                                                                

## Data Aggregation

### 1. Total Revenue by Country

In [10]:
from pyspark.sql.functions import sum

# Sum revenue per country, then list the countries from highest to lowest total
revenue_per_country = df.groupBy("Country").agg(
    F.sum("Revenue").alias("TotalRevenue")
)
total_revenue_by_region = revenue_per_country.orderBy(F.desc("TotalRevenue"))

total_revenue_by_region.show()

[Stage 3:>                                                          (0 + 1) / 1]

+---------------+------------------+
|        Country|      TotalRevenue|
+---------------+------------------+
| United Kingdom| 51720.96000000006|
|           EIRE|2108.0899999999997|
|        Germany|1674.2599999999998|
|         France|1625.3500000000006|
|    Netherlands|1615.0799999999997|
|      Australia|           1277.53|
|    Switzerland|360.97999999999996|
|          Spain|286.65999999999997|
|Channel Islands|            205.75|
|        Belgium|            173.62|
|          Italy|138.29000000000002|
|         Norway|            136.89|
|       Portugal|            129.38|
|        Denmark|             123.0|
|         Sweden|             92.59|
|        Finland|             83.58|
|         Cyprus|              53.6|
|         Poland|              51.4|
|          Japan|48.650000000000006|
| Czech Republic|              45.9|
+---------------+------------------+
only showing top 20 rows



                                                                                

### 2. Revenue Growth Over the Years

In [11]:
# Total revenue per calendar year, in chronological order
yearly_totals = df.groupBy("Year").agg(F.sum("Revenue").alias("TotalRevenue"))
revenue_growth = yearly_totals.orderBy("Year")

revenue_growth.show()

+----+-----------------+
|Year|     TotalRevenue|
+----+-----------------+
|2010|3955.940000000002|
|2011|55840.30000000006|
+----+-----------------+



### 3. Average Monthly Revenue by Country

In [12]:
from pyspark.sql.functions import avg

# Step 1: total revenue for every (country, year, month) combination
monthly_revenue = df.groupBy("Country", "Year", "Month").agg(
    F.sum("Revenue").alias("MonthlyRevenue")
)

# Step 2: average those monthly totals per country, highest average first
avg_monthly_revenue_by_region = (
    monthly_revenue.groupBy("Country")
    .agg(F.avg("MonthlyRevenue").alias("AvgMonthlyRevenue"))
    .orderBy(F.desc("AvgMonthlyRevenue"))
)

avg_monthly_revenue_by_region.show()

+---------------+------------------+
|        Country| AvgMonthlyRevenue|
+---------------+------------------+
| United Kingdom|3978.5353846153853|
|    Netherlands|230.72571428571428|
|           EIRE| 162.1607692307692|
|      Australia|159.69125000000003|
|        Germany|128.78923076923078|
|         France|125.02692307692307|
|    Switzerland|51.568571428571424|
| Czech Republic|              45.9|
|Channel Islands|             41.15|
|        Denmark|              41.0|
|      Lithuania|              35.4|
|            USA|              32.4|
|       Portugal|            32.345|
|        Belgium|28.936666666666667|
|          Spain|28.666000000000004|
|          Italy|27.658000000000005|
|         Norway|27.377999999999997|
|         Canada|             26.52|
|         Poland|              25.7|
|         Sweden|           23.1475|
+---------------+------------------+
only showing top 20 rows



### 4. Average Transaction Value per Customer

In [13]:
from pyspark.sql.functions import count
# Average transaction value per customer = total revenue / number of invoices.
# A row carries a single StockCode, so one InvoiceNo can appear on several rows;
# the divisor must therefore be the number of DISTINCT invoices. A plain
# count("InvoiceNo") counts rows and would yield the average line value instead.
# NOTE(review): cancellation invoices (InvoiceNo starting with "C", negative
# Quantity) are still included here - confirm whether they should be.
avg_transaction_value = df.groupBy("CustomerID") \
    .agg((F.sum("Revenue") / F.countDistinct("InvoiceNo")).alias("AvgTransactionValue")) \
    .orderBy(col("AvgTransactionValue").desc())

avg_transaction_value.show()

+----------+-------------------+
|CustomerID|AvgTransactionValue|
+----------+-------------------+
|   16041.0|              675.0|
|   14145.0|              594.0|
|   18102.0|             562.66|
|   12931.0|              507.5|
|   17389.0|             414.48|
|   15769.0|              358.0|
|   14031.0|             343.76|
|   13629.0|              330.0|
|   12939.0|             325.44|
|   17396.0|              246.1|
|   14154.0|              195.0|
|   16684.0| 186.42399999999998|
|   18092.0|              180.0|
|   18064.0|              179.0|
|   14607.0|              179.0|
|   13093.0|              175.2|
|   13798.0|              172.5|
|   16553.0|              169.5|
|   16029.0|              163.2|
|   14680.0|            154.375|
+----------+-------------------+
only showing top 20 rows



### 5. Top 10 Products by Total Quantity Sold

In [14]:
# Add up the units sold per product description and keep the ten largest totals
quantity_per_product = df.groupBy("Description").agg(
    F.sum("Quantity").alias("TotalQuantitySold")
)
top_products = quantity_per_product.orderBy(F.desc("TotalQuantitySold")).limit(10)

top_products.show()

+--------------------+-----------------+
|         Description|TotalQuantitySold|
+--------------------+-----------------+
|SET OF 60 I LOVE ...|              537|
|JUMBO BAG RED RET...|              448|
|LUNCH BAG ALPHABE...|              392|
|WOODLAND CHARLOTT...|              310|
|DOORMAT KEEP CALM...|              306|
|DOORMAT UNION JAC...|              304|
|MEDIUM CERAMIC TO...|              300|
|PACK OF 72 RETROS...|              277|
|PLASTERS IN TIN S...|              263|
|JAZZ HEARTS ADDRE...|              248|
+--------------------+-----------------+



## Store Data Back to S3

### Storing preprocessed data in S3

In [21]:
# Write the preprocessed data to S3 as CSV with a header row.
# mode("overwrite") replaces the output of an earlier run; the default save mode
# raises when the target path already exists, which breaks a notebook re-run.
df.write.mode("overwrite").csv("s3a://project-data-alvan/processed_data", header=True)

24/12/14 21:36:21 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/14 21:36:22 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

### Storing aggregated data in S3

In [16]:
# Write the top-products aggregate to S3 as CSV with a header row.
# mode("overwrite") replaces the output of an earlier run; the default save mode
# raises when the target path already exists, which breaks a notebook re-run.
top_products.write.mode("overwrite").csv(
    "s3a://project-data-alvan/aggregated_data/top_products", header=True
)

24/12/14 21:30:50 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/14 21:30:51 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [17]:
# Write the per-customer average transaction value to S3 as CSV with a header row.
# mode("overwrite") replaces the output of an earlier run; the default save mode
# raises when the target path already exists, which breaks a notebook re-run.
avg_transaction_value.write.mode("overwrite").csv(
    "s3a://project-data-alvan/aggregated_data/avg_transaction_value", header=True
)

24/12/14 21:31:30 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/14 21:31:30 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [18]:
# Write the per-country average monthly revenue to S3 as CSV with a header row.
# mode("overwrite") replaces the output of an earlier run; the default save mode
# raises when the target path already exists, which breaks a notebook re-run.
avg_monthly_revenue_by_region.write.mode("overwrite").csv(
    "s3a://project-data-alvan/aggregated_data/avg_monthly_revenue_by_region", header=True
)

24/12/14 21:32:05 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/14 21:32:05 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [19]:
# Write the yearly revenue totals to S3 as CSV with a header row.
# mode("overwrite") replaces the output of an earlier run; the default save mode
# raises when the target path already exists, which breaks a notebook re-run.
revenue_growth.write.mode("overwrite").csv(
    "s3a://project-data-alvan/aggregated_data/revenue_growth", header=True
)

24/12/14 21:32:54 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/14 21:33:00 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [20]:
# Write the per-country revenue totals to S3 as CSV with a header row.
# mode("overwrite") replaces the output of an earlier run; the default save mode
# raises when the target path already exists, which breaks a notebook re-run.
total_revenue_by_region.write.mode("overwrite").csv(
    "s3a://project-data-alvan/aggregated_data/total_revenue_by_region", header=True
)

24/12/14 21:33:42 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/12/14 21:33:55 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                