In [42]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import zipfile
from pathlib import Path

In [43]:
spark = SparkSession.builder.appName("Exercise7").enableHiveSupport().getOrCreate()

25/11/07 14:28:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [44]:
def load_csv_from_zip(spark, zip_path):
    with zipfile.ZipFile(zip_path, "r") as z:
        csv_name = [f for f in z.namelist() if f.endswith(".csv")][0]
        text = z.read(csv_name).decode("utf-8").splitlines()
    rdd = spark.sparkContext.parallelize(text, 8)
    print(csv_name)
    df = spark.read.option("header", True).csv(rdd)
    return df

In [45]:
def add_source_file_col(df, zip_path):
    file_name = Path(Path(zip_path).stem).stem
    return df.withColumn("source_file", F.lit(file_name))

In [48]:
def add_file_date_col(df):
    return df.withColumn("file_date",F.to_date(F.regexp_extract(F.col("source_file"), r"(\d{4}-\d{2}-\d{2})", 1),"yyyy-MM-dd"))

In [50]:
zip_path = "data/hard-drive-2022-01-01-failures.csv.zip"
df = load_csv_from_zip(spark, zip_path)
df = add_source_file_col(df, zip_path)
df = add_file_date_col(df)
df.show(20, truncate=False)

hard-drive-2022-01-01-failures.csv


25/11/07 14:29:07 WARN TaskSetManager: Stage 37 contains a task of very large size (8903 KiB). The maximum recommended task size is 1000 KiB.
Exception ignored in: <_io.BufferedWriter name=5>
Traceback (most recent call last):
  File "/home/developer/.pyenv/versions/3.13.7/envs/pyspark_env/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 200, in manager
BrokenPipeError: [Errno 32] Broken pipe
25/11/07 14:29:08 WARN TaskSetManager: Stage 38 contains a task of very large size (8903 KiB). The maximum recommended task size is 1000 KiB.


+----------+--------------+--------------------+--------------+-------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+---

Exception ignored in: <_io.BufferedWriter name=5>
Traceback (most recent call last):
  File "/home/developer/.pyenv/versions/3.13.7/envs/pyspark_env/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 200, in manager
BrokenPipeError: [Errno 32] Broken pipe


In [None]:

def add_file_date_col(df):
    return df.withColumn("file_date",F.to_date(F.regexp_extract("source_file", r"(\d{4}-\d{2}-\d{2})",1)))

def add_brand_col(df):
    return df.withColumn("brand", F.when(F.col("model").contains(" "),F.split("model", " ").getItem(0)).otherwise(F.lit("unknown")))

def add_storage_ranking(df):
    cap_df = (df.select("model", "brand", "capacity_bytes").groupBy("model", "brand").agg(F.min("capacity_bytes").alias("min_capacity")))

    w = Window.partitionBy("brand").orderBy(F.desc("min_capacity"))

    ranked = cap_df.withColumn("storage_ranking", F.dense_rank().over(w))

    return df.join(ranked.select("model", "storage_ranking"), on="model", how="left")

def add_primary_key(df):
    cols = ["date", "serial_number", "model"]
    return df.withColumn("primary_key", F.hash(F.concat_ws("||", *cols)))

In [None]:
df = add_file_date_col(df)
df = add_brand_col(df)
df = add_storage_ranking(df)
df = add_primary_key(df)