# LAB 0 - PYSPARK

#install Pyspark

In [14]:
!pip install -q pyspark

# SparK Session

In [15]:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.master("local[*]")
.appName("Lab0-OnlineRetail-Warmup")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()

)

#Import library

In [16]:
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType,
    TimestampType, FloatType
)


In [17]:
!unzip -o archive.zip

Archive:  archive.zip
  inflating: OnlineRetail.csv        


# organise the shema

In [18]:
online_retail_schema = StructType([
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", TimestampType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("CustomerId", IntegerType(), True),
    StructField("Country", StringType(), True),
])


 # read the data





In [19]:
df = (
    spark.read
        .option("header", "true")                 # Le fichier contient des noms de colonnes
        .option("timestampFormat", "M/d/yyyy H:m") # Format des dates dans InvoiceDate
        .schema(online_retail_schema)              # On impose un schéma
        .csv("OnlineRetail.csv")                   # Chemin du fichier
)

display 5 rows in our data

In [20]:
df.show(5)


+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



# Shema

In [21]:
df.printSchema()


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Country: string (nullable = true)



# count how many rows in data

In [22]:
print("ROWS", df.count())

ROWS 541909


In [23]:
# Show a few rows
df.show(5, truncate=False)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerId|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
+---------+-----

In [24]:
# Display columns
print("Columns:", df.columns)

Columns: ['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerId', 'Country']


# describe the data

In [25]:
# Quick statistical summary on numeric fields
df.describe(["Quantity", "UnitPrice"]).show()

+-------+------------------+-----------------+
|summary|          Quantity|        UnitPrice|
+-------+------------------+-----------------+
|  count|            541909|           541909|
|   mean|  9.55224954743324|4.611113614622466|
| stddev|218.08115785023486|96.75985330031472|
|    min|            -80995|        -11062.06|
|    max|             80995|          38970.0|
+-------+------------------+-----------------+



# SQL in Pyspark

In [26]:
from pyspark.sql.functions import col, expr
# Single column
df.select("Country").show(5, truncate=False)

+--------------+
|Country       |
+--------------+
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
+--------------+
only showing top 5 rows



In [27]:
# Multiple columns
df.select("StockCode", "Description", "UnitPrice").show(5, truncate=False)

+---------+-----------------------------------+---------+
|StockCode|Description                        |UnitPrice|
+---------+-----------------------------------+---------+
|85123A   |WHITE HANGING HEART T-LIGHT HOLDER |2.55     |
|71053    |WHITE METAL LANTERN                |3.39     |
|84406B   |CREAM CUPID HEARTS COAT HANGER     |2.75     |
|84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|3.39     |
|84029E   |RED WOOLLY HOTTIE WHITE HEART.     |3.39     |
+---------+-----------------------------------+---------+
only showing top 5 rows



In [28]:
# First 5 columns using slicing on df.columns
df.select(df.columns[0:5]).show(5, truncate=False)

+---------+---------+-----------------------------------+--------+-------------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |
+---------+---------+-----------------------------------+--------+-------------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|
+---------+---------+-----------------------------------+--------+-------------------+
only showing top 5 rows



In [29]:
# Add a simple indicator column using an expression
df_flagged = df.selectExpr(
"*",
"UnitPrice > 100 as HighValueItem"
)

In [30]:
df_flagged.select("InvoiceNo", "Description", "UnitPrice", "HighValueItem")

DataFrame[InvoiceNo: string, Description: string, UnitPrice: float, HighValueItem: boolean]

In [31]:
# Compute some global aggregates in a single selectExpr
df_agg = df.selectExpr(
"sum(Quantity) as total_quantity",
"sum(UnitPrice * Quantity) as approx_revenue"
)
df_agg.show()

+--------------+----------------+
|total_quantity|  approx_revenue|
+--------------+----------------+
|       5176450|9747747.90762934|
+--------------+----------------+



# Add and rename a column

In [32]:
from pyspark.sql.functions import col

# Add InvoiceValue column
df_with_value = df.withColumn(
    "InvoiceValue",
    col("UnitPrice") * col("Quantity")
)

df_with_value.select(
    "InvoiceNo", "Description", "UnitPrice", "Quantity", "InvoiceValue"
).show(5, truncate=False)

# Rename InvoiceValue → LineTotal
df_line_total = df_with_value.withColumnRenamed("InvoiceValue", "LineTotal")

df_line_total.select(
    "InvoiceNo", "Description", "UnitPrice", "Quantity", "LineTotal"
).show(5, truncate=False)


+---------+-----------------------------------+---------+--------+------------+
|InvoiceNo|Description                        |UnitPrice|Quantity|InvoiceValue|
+---------+-----------------------------------+---------+--------+------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |2.55     |6       |15.299999   |
|536365   |WHITE METAL LANTERN                |3.39     |6       |20.34       |
|536365   |CREAM CUPID HEARTS COAT HANGER     |2.75     |8       |22.0        |
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|3.39     |6       |20.34       |
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |3.39     |6       |20.34       |
+---------+-----------------------------------+---------+--------+------------+
only showing top 5 rows

+---------+-----------------------------------+---------+--------+---------+
|InvoiceNo|Description                        |UnitPrice|Quantity|LineTotal|
+---------+-----------------------------------+---------+--------+---------+
|536365   |WHITE HANGING

# Drop columns

In [33]:
# Drop a couple of columns (for example CustomerId and StockCode)
df_reduced = df.drop("CustomerId", "StockCode")

print("Original number of columns:", len(df.columns))
print("After drop:", len(df_reduced.columns))

df_reduced.show(5, truncate=False)


Original number of columns: 8
After drop: 6
+---------+-----------------------------------+--------+-------------------+---------+--------------+
|InvoiceNo|Description                        |Quantity|InvoiceDate        |UnitPrice|Country       |
+---------+-----------------------------------+--------+-------------------+---------+--------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |United Kingdom|
|536365   |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |United Kingdom|
|536365   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |United Kingdom|
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |United Kingdom|
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |United Kingdom|
+---------+-----------------------------------+--------+-------------------+---------+--------------+
only showing top 5 rows



# GroupBy aggregations

In [34]:
from pyspark.sql.functions import avg, stddev_pop
# Average quantity per country
avg_qty_per_country = (
df.groupBy("Country")
.agg(avg("Quantity").alias("avg_quantity"))
)
avg_qty_per_country.show(10, truncate=False)

+------------------+------------------+
|Country           |avg_quantity      |
+------------------+------------------+
|Sweden            |77.13636363636364 |
|Singapore         |22.85589519650655 |
|Germany           |12.369457609268036|
|France            |12.91106696272058 |
|Greece            |10.657534246575343|
|European Community|8.147540983606557 |
|Belgium           |11.18994683421943 |
|Finland           |15.346762589928058|
|Malta             |7.433070866141732 |
|Unspecified       |7.3991031390134525|
+------------------+------------------+
only showing top 10 rows



In [35]:
invoice_stats = (
df.groupBy("InvoiceNo")
.agg(
avg("Quantity").alias("avg_quantity"),
stddev_pop("Quantity").alias("std_quantity")
)
)
invoice_stats.show(5, truncate=False)

+---------+------------------+------------------+
|InvoiceNo|avg_quantity      |std_quantity      |
+---------+------------------+------------------+
|536596   |1.5               |1.1180339887498947|
|536938   |33.142857142857146|20.698023172885524|
|537252   |31.0              |0.0               |
|537691   |8.15              |5.597097462078001 |
|538041   |30.0              |0.0               |
+---------+------------------+------------------+
only showing top 5 rows



# Schema inference vs explicit schema timing

In [36]:
# Explicit schema timing
%%time
df_schema = (
spark.read
.option("header", "true")
.option("timestampFormat", "M/d/yyyy H:m")
.schema(online_retail_schema)
.csv("OnlineRetail.csv")
)

CPU times: user 1.39 ms, sys: 1.84 ms, total: 3.23 ms
Wall time: 37.7 ms


In [37]:
# Inferred schema timing
%%time
df_infer = (
spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("timestampFormat", "M/d/yyyy H:m")
.csv("OnlineRetail.csv")
)
print("Schema with explicit schema:")
df_schema.printSchema()
print("\nSchema with inferSchema:")
df_infer.printSchema()

Schema with explicit schema:
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Country: string (nullable = true)


Schema with inferSchema:
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

CPU times: user 5.16 ms, sys: 151 µs, total: 5.31 ms
Wall time: 4.13 s


 # Reflection
 Which load was faster?

When comparing the two approaches, the load with an explicit schema was noticeably faster.
This is because Spark did not need to scan the entire dataset to infer the types of each column.
By skipping schema inference, the reading step becomes lighter and more efficient.

Why is having an explicit schema useful in real projects?

Using an explicit schema is important for several reasons:

1. Performance

With a predefined schema, Spark does not need to analyze all rows to guess column types.
This reduces the I/O cost and speeds up the loading process, especially for large datasets.

2. Control over data types

In many datasets, some columns may look numeric but contain letters or missing values.
By defining the schema yourself, you ensure that Spark uses the correct data types right from the start.

3. Consistency across data sources

In real-world pipelines, data often comes from multiple files, days, or systems.
Manually defining the schema guarantees that all files follow the same structure, preventing errors during merges or transformations.

4. Fewer data-quality issues

Schema inference may misinterpret dates, decimals, or mixed-type columns.
An explicit schema eliminates these ambiguities and reduces the risk of nulls or incorrect parsing.