<a href="https://colab.research.google.com/github/AntonPaala/CPE313-CPE31S3/blob/main/Hands_on_Act_1_2_Big_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Procedure


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AdvancedDFOps").getOrCreate()


Upload the online_retail_II.csv file from Kaggle to your Google Drive.

Mount Google Drive in Colab and read the CSV:


In [None]:
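from google.colab import drive
drive.mount("/content/drive")  # makes /content/drive/MyDrive readable from Spark
retail_df = spark.read.csv("/content/drive/MyDrive/CPE032/online_retail_II.csv", header=True, inferSchema=True)
retail_df.printSchema()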

root
 |-- Invoice: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Customer ID: double (nullable = true)
 |-- Country: string (nullable = true)



Merge data from two years of sales into a single DataFrame:
1. First, split the dataset by year.
2. Then perform a full outer join on the "Customer ID" column.

In [None]:
retail_df.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00| 1.65|    13085.0|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|2009-12-01 07:45:00| 1.2

In [None]:
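from pyspark.sql.functions import year, col

# Split the transactions by invoice year
df_2009 = retail_df.filter(year(col("InvoiceDate")) == 2009)
df_2010 = retail_df.filter(year(col("InvoiceDate")) == 2010)

# Full outer join on Customer ID.
# Passing the key as a list (on=["Customer ID"]) keeps a single "Customer ID"
# column in the result; every other column still appears twice, once per year.
joined_df = df_2009.join(df_2010, on=["Customer ID"], how="full_outer")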



In [None]:
df_2009.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00| 1.65|    13085.0|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|2009-12-01 07:45:00| 1.2

In [None]:
df_2010.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 493410|  TEST001|This is a test pr...|       5|2010-01-04 09:24:00|  4.5|    12346.0|United Kingdom|
|C493411|    21539|RETRO SPOTS BUTTE...|      -1|2010-01-04 09:43:00| 4.25|    14590.0|United Kingdom|
| 493412|  TEST001|This is a test pr...|       5|2010-01-04 09:53:00|  4.5|    12346.0|United Kingdom|
| 493413|    21724|PANDA AND BUNNIES...|       1|2010-01-04 09:54:00| 0.85|       NULL|United Kingdom|
| 493413|    84578|ELEPHANT TOY WITH...|       1|2010-01-04 09:54:00| 3.75|       NULL|United Kingdom|
| 493413|    21723|ALPHABET HEARTS S...|       1|2010-01-04 09:54:00| 0.85|       NULL|United Kingdom|
| 493414|    21844|      RETRO SPOT MUG|      36|2010-01-04 10:28:00| 2.5

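Union Operations: A union stacks DataFrames vertically. DataFrame.union matches columns by position, not by name, so both inputs need the same number of columns in the same order with compatible data types; unionByName matches columns by name instead.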

In [None]:
union_df = df_2009.union(df_2010)


Advanced Column Operations: withColumn returns a new DataFrame with the added column instead of modifying the existing one, and functions such as year, month, and struct supply the values, from calculated fields like TotalPrice to nested structures like ProductInfo.

In [None]:
from pyspark.sql.functions import month, struct

retail_df = retail_df.withColumn("TotalPrice", col("Quantity") * col("Price")) \
                     .withColumn("Year", year(col("InvoiceDate"))) \
                     .withColumn("Month", month(col("InvoiceDate"))) \
                     .withColumn("ProductInfo", struct(col("StockCode"), col("Description")))


Working with Arrays & Maps: Aggregating values into arrays (collect_list) or maps lets a single DataFrame column hold multi-valued attributes, such as every product a customer has bought.



In [None]:
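from pyspark.sql.functions import collect_list

# Array column: every product description each customer bought
products_per_customer = retail_df.groupBy("Customer ID") \
    .agg(collect_list("Description").alias("ProductsBought"))

# Two array columns per customer (stock codes and quantities); despite the
# variable name, this is not yet a MapType column
maps_per_customer = retail_df.groupBy("Customer ID") \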
    .agg(collect_list("StockCode").alias("Codes"),
         collect_list("Quantity").alias("Quantities"))


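Aggregations and Analytics: Grouping with groupBy and applying aggregate functions such as sum produces summaries like the top customers by spend, while window functions compute per-group values such as running totals or ranks without collapsing rows, which suits time-based trends.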

In [None]:
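from pyspark.sql.functions import sum as _sum

# Top 5 customers by total spend; rows with a missing Customer ID are excluded
# so they do not form a single NULL "customer" group in the ranking
top_customers = retail_df.filter(col("Customer ID").isNotNull()) \
    .groupBy("Customer ID") \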
    .agg(_sum("TotalPrice").alias("TotalSpent")) \
    .orderBy(col("TotalSpent").desc()) \
    .limit(5)


In [None]:
retail_df.show()


+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+------------------+----+-----+--------------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|        TotalPrice|Year|Month|         ProductInfo|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+------------------+----+-----+--------------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|              83.4|2009|   12|{85048, 15CM CHRI...|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|              81.0|2009|   12|{79323P, PINK CHE...|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|              81.0|2009|   12|{79323W,  WHITE C...|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1| 