<a href="https://colab.research.google.com/github/RachelNderitu/Big-Data-Analytics/blob/main/Retail_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. Installation and Setup

In [None]:
# Install PySpark and Java
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and extract Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -xzf spark-3.5.1-bin-hadoop3.tgz

# Install PySpark
!pip install -q pyspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

# Initialize SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("ColabSpark") \
    .master("local[*]") \
    .getOrCreate()

spark


# 2. Data Extraction and Loading

In [None]:
import zipfile
import pandas as pd

# Extract the ZIP file
zip_path = "//content/online+retail.zip"
extract_path = "/content/"

try:
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("Extraction complete. Files extracted:")
    !ls /content/
except FileNotFoundError:
    print("Zip file not found. Please ensure 'online retail.zip' is uploaded.")

# Load data into DataFrame
try:
    df = pd.read_excel('/content/Online Retail.xlsx', engine='openpyxl')  # 'openpyxl' is needed for .xlsx files
    print(" File loaded successfully!")
    print(f"Shape: {df.shape} (rows, columns)")
    print("\nFirst 5 rows:")
    display(df.head())
except Exception as e:
    print(f" Error: {e}")

Extraction complete. Files extracted:
'Online Retail.xlsx'   rdd_output    spark-3.5.1-bin-hadoop3
 online+retail.zip     sample_data   spark-3.5.1-bin-hadoop3.tgz
 File loaded successfully!
Shape: (541909, 8) (rows, columns)

First 5 rows:


Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom


# 3. Create Spark RDD

In [None]:
# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Convert Spark DataFrame to RDD
rdd = spark_df.rdd

# Check RDD contents
print(" RDD created successfully!")
print(f"Number of partitions: {rdd.getNumPartitions()}")
print(f"First row: {rdd.first()}")

 RDD created successfully!
Number of partitions: 2
First row: Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.55, CustomerID=17850.0, Country='United Kingdom')


## The RDD Descriptive Statistics

In [None]:
record_count = rdd.count()
print(f"Total number of records: {record_count}")

unique_items = rdd.map(lambda row: row["Description"]).distinct()
print("\nFirst 10 unique items:")
for item in unique_items.take(10):
    print(item)

Total number of records: 541909

First 10 unique items:
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
RED WOOLLY HOTTIE WHITE HEART.
GLASS STAR FROSTED T-LIGHT HOLDER
HAND WARMER RED POLKA DOT
POPPY'S PLAYHOUSE KITCHEN
FELTCRAFT PRINCESS CHARLOTTE DOLL
LOVE BUILDING BLOCK WORD
RECIPE BOX WITH METAL HEART
RED COAT RACK PARIS FASHION


## 4. Low-level: Transformations

In [None]:
# map(): Transform to (Description, Quantity) pairs
map_example = rdd.map(lambda row: (row["Description"], row["Quantity"]))
print("map() - First 5 results:")
for result in map_example.take(5):
    print(result)

map() - First 5 results:
('WHITE HANGING HEART T-LIGHT HOLDER', 6)
('WHITE METAL LANTERN', 6)
('CREAM CUPID HEARTS COAT HANGER', 8)
('KNITTED UNION FLAG HOT WATER BOTTLE', 6)
('RED WOOLLY HOTTIE WHITE HEART.', 6)


In [None]:
# reduceByKey(): Count items per invoice
invoice_counts = rdd.map(lambda row: (row["InvoiceNo"], 1))\
                   .reduceByKey(lambda a, b: a + b)\
                   .sortByKey()
print("\nItems per Invoice (first 5):")
for invoice, count in invoice_counts.take(5):
    print(f"Invoice {invoice}: {count} items")


Items per Invoice (first 5):
Invoice 536365: 7 items
Invoice 536366: 2 items
Invoice 536367: 12 items
Invoice 536368: 4 items
Invoice 536369: 1 items


In [None]:
# FlatMap(): Split description words
# Each word in the Description field becomes a separate record in the new words_rdd.
words_rdd = rdd.flatMap(lambda row: row.Description.split() if row.Description else [])
print(words_rdd.take(10))

['WHITE', 'HANGING', 'HEART', 'T-LIGHT', 'HOLDER', 'WHITE', 'METAL', 'LANTERN', 'CREAM', 'CUPID']


In [None]:
# Distinct(): Removes any duplicate words
unique_words_rdd = words_rdd.distinct()
unique_words = unique_words_rdd.take(10)  # Take first 10 unique words
print(unique_words)

['HOLDER', 'CUPID', 'HANGER', 'FLAG', 'HOT', 'WATER', 'WOOLLY', 'SET', '7', 'BABUSHKA']


In [None]:
unique_countries = rdd.map(lambda row: row.Country).distinct()
print(unique_countries.take(10))

['France', 'Australia', 'Germany', 'EIRE', 'Italy', 'Lithuania', 'Japan', 'Iceland', 'Channel Islands', 'Cyprus']


In [None]:
# Filter(): Filters elements based on a condition.
# Filter expensive items (UnitPrice > 10)
expensive_items_10 = rdd.filter(lambda row: row["UnitPrice"] > 10)\
                       .map(lambda row: (row["Description"], row["UnitPrice"]))
print("Items with UnitPrice > 10 (first 5):")
for desc, price in expensive_items_10.take(5):
    print(f"{desc}: ${price}")

Items with UnitPrice > 10 (first 5):
POSTAGE: $18.0
VICTORIAN SEWING BOX LARGE: $10.95
Discount: $27.5
3 TIER CAKE TIN GREEN AND CREAM: $14.95
3 TIER CAKE TIN RED AND CREAM: $14.95


In [None]:
# Filter very expensive items (UnitPrice > 5000)
expensive_items_5000 = rdd.filter(lambda row: row["UnitPrice"] > 5000)\
                         .map(lambda row: (row["Description"], row["UnitPrice"]))
print("\nItems with UnitPrice > 5000:")
for desc, price in expensive_items_5000.collect():
    print(f"{desc}: ${price}")


Items with UnitPrice > 5000:
AMAZON FEE: $13541.33
AMAZON FEE: $13541.33
AMAZON FEE: $13474.79
AMAZON FEE: $5519.25
AMAZON FEE: $13541.33
AMAZON FEE: $6706.71
AMAZON FEE: $16888.02
AMAZON FEE: $16453.71
AMAZON FEE: $5575.28
AMAZON FEE: $5258.77
AMAZON FEE: $5693.05
AMAZON FEE: $5225.03
POSTAGE: $8142.75
POSTAGE: $8142.75
Manual: $6930.0
AMAZON FEE: $5876.4
AMAZON FEE: $7006.83
Manual: $38970.0
AMAZON FEE: $5791.18
AMAZON FEE: $6497.47
AMAZON FEE: $6721.37
Adjust bad debt: $11062.06
AMAZON FEE: $6662.51
AMAZON FEE: $5522.14
AMAZON FEE: $7427.97
AMAZON FEE: $5942.57
AMAZON FEE: $5942.57
AMAZON FEE: $5877.18
AMAZON FEE: $8286.22
AMAZON FEE: $11586.5
AMAZON FEE: $17836.46


##  Low-level: Actions

In [None]:
# takeOrdered(): Returns the top n elements sorted by the key
# takeOrdered(): Top 5 highest quantity items
top_quantities = rdd.takeOrdered(5, key=lambda row: -row["Quantity"])
print("Top 5 highest quantity items:")
for result in top_quantities:
    print(f"Description: {result['Description']}, Quantity: {result['Quantity']}")

Top 5 highest quantity items:
Description: PAPER CRAFT , LITTLE BIRDIE, Quantity: 80995
Description: MEDIUM CERAMIC TOP STORAGE JAR, Quantity: 74215
Description: ASSTD DESIGN 3D PAPER STICKERS, Quantity: 12540
Description: NaN, Quantity: 5568
Description: WORLD WAR 2 GLIDERS ASSTD DESIGNS, Quantity: 4800


In [None]:
# count(): Count the number of elements in the RDD
# Count the total number of elements in the RDD
total_count = rdd.count()
print(f"Total number of records: {total_count}")

Total number of records: 541909


In [None]:
# collect(): Return all elements as a list to the driver
# Collect all elements in the RDD and print them
all_records = rdd.collect()
print(f"Total records collected: {len(all_records)}")
for record in all_records[:5]:  # Show the first 5 records
    print(record)

Total records collected: 541909
Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.55, CustomerID=17850.0, Country='United Kingdom')
Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=3.39, CustomerID=17850.0, Country='United Kingdom')
Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.75, CustomerID=17850.0, Country='United Kingdom')
Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=3.39, CustomerID=17850.0, Country='United Kingdom')
Row(InvoiceNo='536365', StockCode='84029E', Description='RED WOOLLY HOTTIE WHITE HEART.', Quantity=6, InvoiceDa

In [None]:
# first(): Return the first element of the RDD
# Get the first record in the RDD
first_record = rdd.first()
print("First record in RDD:")
print(first_record)

First record in RDD:
Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.55, CustomerID=17850.0, Country='United Kingdom')


In [None]:
# saveAsTextFile(path): Save the RDD to a text file on disk
# Save the RDD as a text file in the specified path
rdd.saveAsTextFile("/content/rdd_output")

In [None]:
# reduce(func): Aggregate the elements of the RDD using the specified associative function. It takes a binary function and combines elements in the RDD. It’s commonly used for aggregation tasks like summing values.
# Reduce to calculate the total quantity of all items in the RDD
total_quantity = rdd.map(lambda row: row["Quantity"]).reduce(lambda a, b: a + b)
print(f"Total quantity of all items: {total_quantity}")

Total quantity of all items: 5176450


In [None]:
# countByKey(): Count the occurrences of each key in an RDD of key-value pairs (only works on RDDs with key-value pairs)
# Assuming we have an RDD of key-value pairs, like (item, quantity)
rdd_kv = rdd.map(lambda row: (row["Description"], row["Quantity"]))

# Count occurrences of each description
item_count = rdd_kv.countByKey()
print("Item counts:")
for item, count in item_count.items():
    print(f"Description: {item}, Count: {count}")

Item counts:
Description: WHITE HANGING HEART T-LIGHT HOLDER, Count: 2369
Description: WHITE METAL LANTERN, Count: 328
Description: CREAM CUPID HEARTS COAT HANGER, Count: 293
Description: KNITTED UNION FLAG HOT WATER BOTTLE, Count: 473
Description: RED WOOLLY HOTTIE WHITE HEART., Count: 449
Description: SET 7 BABUSHKA NESTING BOXES, Count: 389
Description: GLASS STAR FROSTED T-LIGHT HOLDER, Count: 141
Description: HAND WARMER UNION JACK, Count: 515
Description: HAND WARMER RED POLKA DOT, Count: 18
Description: ASSORTED COLOUR BIRD ORNAMENT, Count: 1501
Description: POPPY'S PLAYHOUSE BEDROOM , Count: 436
Description: POPPY'S PLAYHOUSE KITCHEN, Count: 451
Description: FELTCRAFT PRINCESS CHARLOTTE DOLL, Count: 469
Description: IVORY KNITTED MUG COSY , Count: 111
Description: BOX OF 6 ASSORTED COLOUR TEASPOONS, Count: 213
Description: BOX OF VINTAGE JIGSAW BLOCKS , Count: 243
Description: BOX OF VINTAGE ALPHABET BLOCKS, Count: 272
Description: HOME BUILDING BLOCK WORD, Count: 806
Descripti

In [None]:
# takeSample(withReplacement, num, seed): Return a random sample of the RDD (can sample with or without replacement)
# Take a random sample of 5 elements without replacement
sampled_data = rdd.takeSample(withReplacement=False, num=5, seed=42)
print("Sampled Data:")
for record in sampled_data:
    print(record)

Sampled Data:
Row(InvoiceNo='575477', StockCode='84997D', Description='CHILDRENS CUTLERY POLKADOT PINK', Quantity=1, InvoiceDate=datetime.datetime(2011, 11, 9, 16, 14), UnitPrice=8.29, CustomerID=nan, Country='United Kingdom')
Row(InvoiceNo='548191', StockCode='21430', Description='SET/3 RED GINGHAM ROSE STORAGE BOX', Quantity=1, InvoiceDate=datetime.datetime(2011, 3, 29, 15, 20), UnitPrice=7.46, CustomerID=nan, Country='United Kingdom')
Row(InvoiceNo='558999', StockCode='84077', Description='WORLD WAR 2 GLIDERS ASSTD DESIGNS', Quantity=48, InvoiceDate=datetime.datetime(2011, 7, 5, 12, 15), UnitPrice=0.29, CustomerID=15806.0, Country='United Kingdom')
Row(InvoiceNo='564712', StockCode='23319', Description="BOX OF 6 MINI 50'S CRACKERS", Quantity=6, InvoiceDate=datetime.datetime(2011, 8, 28, 10, 40), UnitPrice=2.49, CustomerID=15611.0, Country='United Kingdom')
Row(InvoiceNo='574025', StockCode='22197', Description='POPCORN HOLDER', Quantity=4, InvoiceDate=datetime.datetime(2011, 11, 2, 

# 5. Mid-Level: DataFrame Operations

In [None]:
from pyspark.sql.functions import col

# Convert Pandas DataFrame to Spark DataFrame
df_spark = spark.createDataFrame(df)

# Select specific columns and filter
filtered_df = df_spark.select("InvoiceNo", "Description", "Quantity", "UnitPrice")\
                      .filter(col("Quantity") > 0)

print("Filtered DataFrame (Quantity > 0, first 5):")
filtered_df.show(5, truncate=False)


Filtered DataFrame (Quantity > 0, first 5):
+---------+-----------------------------------+--------+---------+
|InvoiceNo|Description                        |Quantity|UnitPrice|
+---------+-----------------------------------+--------+---------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2.55     |
|536365   |WHITE METAL LANTERN                |6       |3.39     |
|536365   |CREAM CUPID HEARTS COAT HANGER     |8       |2.75     |
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |3.39     |
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |6       |3.39     |
+---------+-----------------------------------+--------+---------+
only showing top 5 rows



In [None]:
# Show Schema of the DataFrame
df_spark.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
# Select Specific Columns
df_spark.select("Description", "Quantity").show(5, truncate=False)

+-----------------------------------+--------+
|Description                        |Quantity|
+-----------------------------------+--------+
|WHITE HANGING HEART T-LIGHT HOLDER |6       |
|WHITE METAL LANTERN                |6       |
|CREAM CUPID HEARTS COAT HANGER     |8       |
|KNITTED UNION FLAG HOT WATER BOTTLE|6       |
|RED WOOLLY HOTTIE WHITE HEART.     |6       |
+-----------------------------------+--------+
only showing top 5 rows



In [None]:
# Filter Rows Based on a Condition
from pyspark.sql.functions import col

df_spark.filter(col("Quantity") > 5).show(5, truncate=False)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
+---------+-----

In [None]:
# Add a New Column (e.g., Total Price)
df_spark = df_spark.withColumn("TotalPrice", col("Quantity") * col("UnitPrice"))
df_spark.show(5, truncate=False)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |TotalPrice        |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+------------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|15.299999999999999|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|20.34             |
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|22.0              |
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|20.34             |
|53636

In [None]:
#  Group By and Aggregate
df_spark.groupBy("Country").sum("TotalPrice").show(truncate=False)

+------------------+------------------+
|Country           |sum(TotalPrice)   |
+------------------+------------------+
|Sweden            |36595.90999999999 |
|Singapore         |9120.390000000001 |
|Germany           |221698.2100000001 |
|France            |197403.90000000026|
|Greece            |4710.5199999999995|
|European Community|1291.7499999999998|
|Belgium           |40910.95999999998 |
|Finland           |22326.74          |
|Malta             |2505.470000000001 |
|Unspecified       |4749.789999999998 |
|Italy             |16890.509999999987|
|EIRE              |263276.81999999884|
|Lithuania         |1661.06           |
|Norway            |35163.46000000001 |
|Spain             |54774.5800000002  |
|Denmark           |18768.139999999992|
|Hong Kong         |10117.039999999997|
|Iceland           |4310.000000000001 |
|Israel            |7907.82           |
|Channel Islands   |20086.289999999983|
+------------------+------------------+
only showing top 20 rows



In [None]:
# Group by and aggregate
from pyspark.sql.functions import sum, avg, count, desc

# Group by and aggregate
sales_by_country = df_spark.groupBy("Country")\
    .agg(
        sum("Quantity").alias("TotalQuantity"),
        avg("UnitPrice").alias("AvgUnitPrice"),
        count("InvoiceNo").alias("InvoiceCount")
    )\
    .orderBy(desc("TotalQuantity"))

print("Sales by Country (top 5):")
sales_by_country.show(5, truncate=False)


Sales by Country (top 5):
+--------------+-------------+------------------+------------+
|Country       |TotalQuantity|AvgUnitPrice      |InvoiceCount|
+--------------+-------------+------------------+------------+
|United Kingdom|4263829      |4.532422174138713 |495478      |
|Netherlands   |200128       |2.738317165752836 |2371        |
|EIRE          |142637       |5.9110773548073405|8196        |
|Germany       |117448       |3.966929963138557 |9495        |
|France        |110480       |5.028864087881326 |8557        |
+--------------+-------------+------------------+------------+
only showing top 5 rows



##  Mid-level: SQL Operations

In [None]:
# Register the Spark dataframe as a temporary SQL view
df_spark.createOrReplaceTempView("sales")

In [None]:
# SELECT Specific Columns
spark.sql("SELECT InvoiceNo, Description, Quantity FROM sales").show(5, truncate=False)

+---------+-----------------------------------+--------+
|InvoiceNo|Description                        |Quantity|
+---------+-----------------------------------+--------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER |6       |
|536365   |WHITE METAL LANTERN                |6       |
|536365   |CREAM CUPID HEARTS COAT HANGER     |8       |
|536365   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |
|536365   |RED WOOLLY HOTTIE WHITE HEART.     |6       |
+---------+-----------------------------------+--------+
only showing top 5 rows



In [None]:
# WHERE Clause (Filtering Rows)
spark.sql("SELECT * FROM sales WHERE Quantity > 5").show(5, truncate=False)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |TotalPrice        |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+------------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|15.299999999999999|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|20.34             |
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|22.0              |
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|20.34             |
|53636

In [None]:
# ORDER BY (Sorting Results)
spark.sql("SELECT Description, Quantity FROM sales ORDER BY Quantity DESC").show(5, truncate=False)

+---------------------------------+--------+
|Description                      |Quantity|
+---------------------------------+--------+
|PAPER CRAFT , LITTLE BIRDIE      |80995   |
|MEDIUM CERAMIC TOP STORAGE JAR   |74215   |
|ASSTD DESIGN 3D PAPER STICKERS   |12540   |
|NaN                              |5568    |
|WORLD WAR 2 GLIDERS ASSTD DESIGNS|4800    |
+---------------------------------+--------+
only showing top 5 rows



In [None]:
# GROUP BY with Aggregation
spark.sql("""
SELECT Country, SUM(Quantity) AS TotalQuantity
FROM sales
GROUP BY Country
ORDER BY TotalQuantity DESC
""").show(truncate=False)

+---------------+-------------+
|Country        |TotalQuantity|
+---------------+-------------+
|United Kingdom |4263829      |
|Netherlands    |200128       |
|EIRE           |142637       |
|Germany        |117448       |
|France         |110480       |
|Australia      |83653        |
|Sweden         |35637        |
|Switzerland    |30325        |
|Spain          |26824        |
|Japan          |25218        |
|Belgium        |23152        |
|Norway         |19247        |
|Portugal       |16180        |
|Finland        |10666        |
|Channel Islands|9479         |
|Denmark        |8188         |
|Italy          |7999         |
|Cyprus         |6317         |
|Singapore      |5234         |
|Austria        |4827         |
+---------------+-------------+
only showing top 20 rows



In [None]:
# COUNT DISTINCT
spark.sql("SELECT COUNT(DISTINCT Description) AS UniqueItems FROM sales").show()

+-----------+
|UniqueItems|
+-----------+
|       4224|
+-----------+



# 6. High-Level: Machine Learning


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# 1. Prepare Data (simulate a binary target)
from pyspark.sql.functions import when

# Add a label column: 1 if TotalPrice > 50, else 0
df_ml = df_spark.withColumn("TotalPrice", col("Quantity") * col("UnitPrice"))
df_ml = df_ml.withColumn("label", when(col("TotalPrice") > 50, 1).otherwise(0))


In [None]:
# 2. Vector Assembler
assembler = VectorAssembler(
    inputCols=["Quantity", "UnitPrice"],
    outputCol="features"
)

In [None]:
# 3. Split and Train Model
# Split data
train_data, test_data = df_ml.randomSplit([0.8, 0.2], seed=42)

# Logistic Regression Model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Pipeline
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(train_data)

In [None]:
#  4. Evaluate
predictions = model.transform(test_data)
predictions.select("features", "label", "prediction").show(20)

evaluator = BinaryClassificationEvaluator()
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy:.4f}")

+-----------+-----+----------+
|   features|label|prediction|
+-----------+-----+----------+
| [6.0,3.39]|    0|       0.0|
| [6.0,2.55]|    0|       0.0|
| [6.0,1.85]|    0|       0.0|
| [2.0,9.95]|    0|       0.0|
|[32.0,1.69]|    1|       0.0|
| [3.0,4.95]|    0|       0.0|
|[24.0,1.65]|    0|       0.0|
|[24.0,0.42]|    0|       0.0|
| [3.0,18.0]|    1|       0.0|
|[80.0,2.55]|    1|       1.0|
| [6.0,1.85]|    0|       0.0|
| [6.0,4.95]|    0|       0.0|
| [6.0,1.06]|    0|       0.0|
| [6.0,1.06]|    0|       0.0|
| [6.0,3.39]|    0|       0.0|
| [6.0,4.25]|    0|       0.0|
| [6.0,2.55]|    0|       0.0|
|[12.0,3.75]|    0|       0.0|
| [1.0,4.95]|    0|       0.0|
|[10.0,1.65]|    0|       0.0|
+-----------+-----+----------+
only showing top 20 rows

Model Accuracy: 0.9257
