In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType
from pyspark.sql.functions import expr, countDistinct, col, month, max, sum as spark_sum

In [None]:
spark = (
SparkSession.builder
.master("local[*]")
.appName("Lab0-OnlineRetail-Warmup")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()

)
print("Spark version:", spark.version)
print("Master:", spark.sparkContext.master)

online_retail_schema = StructType([
StructField("InvoiceNo", IntegerType(), True),
StructField("StockCode", StringType(), True),
StructField("Description", StringType(), True),
StructField("Quantity", IntegerType(), True),
StructField("InvoiceDate", TimestampType(), True),
StructField("UnitPrice", FloatType(), True),
StructField("CustomerId", IntegerType(), True),
StructField("Country", StringType(), True),
])

# Load CSV
df = (
spark.read
.option("header", "true")
.option("timestampFormat", "M/d/yyyy H:m")
.schema(online_retail_schema)
.csv("OnlineRetail.csv")
)
print("Rows:", df.count())
df.show(3, truncate=False)

Spark version: 3.5.1
Master: local[*]
Rows: 541909
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate        |UnitPrice|CustomerId|Country       |
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|6       |2010-12-01 08:26:00|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN               |6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER    |8       |2010-12-01 08:26:00|2.75     |17850     |United Kingdom|
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows



In [None]:
# Exercise 1: show 5 descriptions
df.select("Description").show(5, truncate=False)

+-----------------------------------+
|Description                        |
+-----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER |
|WHITE METAL LANTERN                |
|CREAM CUPID HEARTS COAT HANGER     |
|KNITTED UNION FLAG HOT WATER BOTTLE|
|RED WOOLLY HOTTIE WHITE HEART.     |
+-----------------------------------+
only showing top 5 rows



In [None]:
# Exercise 2: count distinct invoices
df.select("InvoiceNo").distinct().count()
df.select(countDistinct("InvoiceNo")).show()

+-------------------------+
|count(DISTINCT InvoiceNo)|
+-------------------------+
|                    22061|
+-------------------------+



In [None]:
# Exercise 3: month with most invoices

df.groupBy(month("InvoiceDate")).count().orderBy(col("count").desc()).show(1)

+------------------+-----+
|month(InvoiceDate)|count|
+------------------+-----+
|                11|84711|
+------------------+-----+
only showing top 1 row



In [None]:
# Exercise 4: high quantity lines
df.select("*").where(col("Quantity") > 30).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536367|    84879|ASSORTED COLOUR B...|      32|2010-12-01 08:34:00|     1.69|     13047|United Kingdom|
|   536370|    10002|INFLATABLE POLITI...|      48|2010-12-01 08:45:00|     0.85|     12583|        France|
|   536370|    22492|MINI PAINT SET VI...|      36|2010-12-01 08:45:00|     0.65|     12583|        France|
|   536371|    22086|PAPER CHAIN KIT 5...|      80|2010-12-01 09:00:00|     2.55|     13748|United Kingdom|
|   536374|    21258|VICTORIAN SEWING ...|      32|2010-12-01 09:09:00|    10.95|     15100|United Kingdom|
|   536376|    22114|HOT WATER BOTTLE ...|      48|2010-12-01 09:32:00|     3.45|     15291|United Kingdom|
|   536376|    21733|RED HAN

In [None]:
# Exercise 5: top 4 most sold items


df.groupBy("Description", "StockCode").agg(spark_sum("Quantity").alias("TotalQuantity")).orderBy(col("TotalQuantity").desc()).show(4)

+--------------------+---------+-------------+
|         Description|StockCode|TotalQuantity|
+--------------------+---------+-------------+
|WORLD WAR 2 GLIDE...|    84077|        53847|
|JUMBO BAG RED RET...|   85099B|        47363|
|ASSORTED COLOUR B...|    84879|        36381|
|      POPCORN HOLDER|    22197|        36334|
+--------------------+---------+-------------+
only showing top 4 rows



In [91]:
(
  df.select("InvoiceNo").where(col("InvoiceNo").isNull()).distinct().count(),
  df.where(col("InvoiceNo").isNull()).select(countDistinct("InvoiceNo"))
)


(1, DataFrame[count(DISTINCT InvoiceNo): bigint])

# Extra challenge: count vs countDistinct with null
The above examples of code implements the compared methods accompagned with the output, where we notice the first difference is in the **return type** of the 1st is an **int** and the 2nd is a **bigint in a DataFrame** that could be shown using show method.

where the first value is equal to 1 indicating that it **considers** the null value **when filtering/comparing** whereas the 2nd one's result is 0 indicating that the null values are **not considered**.

In [None]:
spark.stop()