In [None]:
# CELL 0: Start Spark Session  
# CELL 0: 
# ------------------------------------------------

# Summary:
# - Start Spark session using shared notebook 00_ghcn2_setup.ipynb 
# - Use config: 2 executors, 1 core each, 1GB memory
#  

#  
#  
%run ./00_ghcn2_setup.ipynb
# Run this cell to start a spark session in this notebook


start_spark(
    executor_instances=2,
    executor_cores=1,
    worker_memory=1,
    master_memory=1
)


### Assignment 1 ###

The code below demonstrates how to explore and load the data provided for the assignment from Azure Blob Storage and how to save any outputs that you generate to a separate user container.

**Key points**

- The data provided for the assignment is stored in Azure Blob Storage, and any outputs that you generate will be stored in Azure Blob Storage as well. Hadoop and Spark can both interact with Azure Blob Storage in much the same way as they interact with HDFS, except that replication and distribution are handled by Azure instead. This makes it possible to read or write data in Azure over HTTPS, where the path is prefixed by `wasbs://`.
- There are two containers: one for the data, which is read-only, and one for any outputs that you generate:
  - `wasbs://campus-data@madsstorage002.blob.core.windows.net/`
  - `wasbs://campus-user@madsstorage002.blob.core.windows.net/`
- You can use variable interpolation to insert your global username variable into paths automatically.
  - This works for bash commands as well.

**Q1** First, you will investigate the `daily`, `stations`, `states`, `countries`, and `inventory` data provided in cloud storage in:
 `wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/`  
using the `hdfs` command.

**(a)** How is the data structured?

In [None]:
# Write your imports here or insert cells below

import time
import traceback

from pyspark.sql           import Row
from pyspark.sql           import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types     import *

In [None]:
# Storage locations used by the rest of the notebook.
# prefix      : root of the read-only GHCN-Daily data (container and account
#               names are variables not defined in this notebook).
# prefixDaily : the daily/ sub-folder beneath prefix.
# prefixWrite : user output location (hard-coded container, account and user).
aDaily      = '/2025.csv.gz'
prefix      = f'wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/ghcnd/'
prefixDaily = prefix + 'daily/'
prefixWrite = 'wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/'
#prefixWrite    = "./"

# Echo the resolved locations through the debug printer.
for _location in (prefix, prefixWrite, prefixDaily, "DEEBUG = TRUE"):
    dprintf(DEEBUG, _location)


In [None]:
# CELL 15 Q1: - a last minute attempt to accept James challenge
# CELL 15 Q1: – Inspect Azure HDFS layout and daily folder contents
# ------------------------------------------------
# Summary:
# - PART 1: list contents under {prefix}
# - PART 2: human-readable size info under {prefix}
# - PART 3: HDFS block and file count for prefixDaily
# - PART 4: list and format daily file entries

#  A last minute attempt to do what James had challenged the class with.

dprintf(DEEBUG,"Q1:  Azure HDFS layout  daily folder ")

# --- PART 1: directory listing ---
dprintf(DEEBUG, "--- PART 1 ---")
start = time.time()

lines_ls = !hdfs dfs -ls {prefix}
parsed_ls = []

for line in lines_ls:
    line = line.strip()
    if not line or line.startswith("INFO") or line.startswith("WARN") or "Found" in line:
        continue

    parts = line.split()
    if len(parts) >= 2:
        perms     = parts[0]
        full_path = parts[-1]

        if perms.startswith("-") or perms.startswith("d"):
            rel_path = full_path.replace(prefix, '')
            parsed_ls.append((perms, rel_path))

for perms, name in parsed_ls:
    dprintf(DEEBUG, f"{perms:<12} {name}")

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")
# --- PART 2: du -h (size info) ---
dprintf(DEEBUG, "--- PART 2 ---")
start = time.time()

lines = !hdfs dfs -du -h {prefix}
parsed_du = []

for line in lines:
    if line.startswith("INFO") or line.startswith("WARN"):
        continue

    parts = line.split()
    if len(parts) >= 5:
        size1    = f"{parts[0]} {parts[1]}"
        size2    = f"{parts[2]} {parts[3]}"
        full_path = parts[4]
    elif len(parts) >= 3:
        size1, size2, full_path = parts[0], parts[1], parts[2]
    else:
        continue

    rel_path = full_path.replace(prefix, '')
    parsed_du.append((rel_path, size1, size2))

for name, size1, size2 in parsed_du:
    dprintf(DEEBUG, f"{name:<25} {size1:<7} {size2:<7}")

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")

# --- PART 3: HDFS file count ---
dprintf(DEEBUG, "--- PART 3 ---")
start = time.time()

!hdfs dfs -count {prefixDaily}

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")

# --- PART 4: list daily files ---
dprintf(DEEBUG, "--- PART 4 ---")
start = time.time()

lines_daily = !hdfs dfs -ls {prefixDaily}
parsed_daily = []

for line in lines_daily:
    line = line.strip()
    if not line or "INFO" in line or "WARN" in line or "Found" in line:
        continue

    parts = line.split()
    if len(parts) == 6:
        size      = parts[2]
        full_path = parts[5]
        file_name = full_path.rsplit('/', 1)[-1]
        parsed_daily.append((size, file_name))
    else:
        dprintf(DEEBUG, f"(wrong format): {line}")

if parsed_daily:
    for size, name in parsed_daily:
        dprintf(DEEBUG, f"{name:<15} {size}")
else:
    dprintf(DEEBUG, "none found.")

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")


In [None]:
# CELL 2 Q1(A): Define Schema for Daily
# ------------------------------------------------
#
# Summary:
# - Schema for daily based on GHCN Daily README
# - Built with pyspark.sql.types.StructType; every field is nullable
#
# Columns, in file order:
#   ID, DATE, ELEMENT            -> String
#   VALUE                        -> Double
#   MFLAG, QFLAG, SFLAG          -> String
#   OBS_TIME                     -> String
#
# The DATE field is formatted as YYYYMMDD as loaded

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# (column name, Spark type) pairs in the order they appear in the CSV.
dailyColumns = [
    ("ID",       StringType()),
    ("DATE",     StringType()),
    ("ELEMENT",  StringType()),
    ("VALUE",    DoubleType()),
    ("MFLAG",    StringType()),
    ("QFLAG",    StringType()),
    ("SFLAG",    StringType()),
    ("OBS_TIME", StringType()),
]

schemaDaily = StructType(
    [StructField(colName, colType, True) for colName, colType in dailyColumns]
)


In [None]:
# CELL 3 Q2(B): Load Single Year of Daily
# ------------------------------------------------
#
# Summary:
# - Load this year's daily observations using schemaDaily
#
# The source data is stored as CSV with no header and read using the schema
# defined in CELL 2. We load only one file (2025.csv.gz) to keep execution fast.
#
# Layout observed from the HDFS listing (daily files sit under daily/, the
# metadata text files sit directly under the ghcnd/ root):
#   2023.csv.gz     174726829
#   2024.csv.gz     168485088
#   2025.csv.gz     17061071
#   drwxrwxrwx   daily
#   -rwxrwxrwx   ghcnd-countries.txt
#   -rwxrwxrwx   ghcnd-inventory.txt
#   -rwxrwxrwx   ghcnd-states.txt
#   -rwxrwxrwx   ghcnd-stations.txt

filePath          = f"{prefixDaily}2025.csv.gz"
# Metadata files live beside daily/, not inside it, so they hang off `prefix`.
filePathCountries = f"{prefix}ghcnd-countries.txt"
filePathInventory = f"{prefix}ghcnd-inventory.txt"
filePathStates    = f"{prefix}ghcnd-states.txt"
filePathStations  = f"{prefix}ghcnd-stations.txt"
# Output location for the enriched station metadata written later on.
pqtEnrichedStations = f"{prefixWrite}enhanced-STATION.parquet"

try:
    dfDailyYear = dReadCSV(
        b = 1,
        path = filePath,
        bHeader = False,
        schema = schemaDaily
    )

except Exception as e:
    # Report and carry on so the metadata cells can still run;
    # dfDailyYear stays undefined if the read failed.
    print("❌ Error caught:", type(e), e)
    traceback.print_exc()


In [None]:
# CELL 4 Q2(C): Load Metadata Tables
# ------------------------------------------------
#
# Summary:
# - Load and parse metadata tables: stations, inventory, countries, and states
# - Each dataset is fixed-width format and will be read as text then parsed with substring
#
# Layout observed from the HDFS listing:
#   2023.csv.gz     174726829
#   2024.csv.gz     168485088
#   2025.csv.gz     17061071
#   drwxrwxrwx   daily
#   -rwxrwxrwx   ghcnd-countries.txt
#   -rwxrwxrwx   ghcnd-inventory.txt
#   -rwxrwxrwx   ghcnd-states.txt
#   -rwxrwxrwx   ghcnd-stations.txt

# Daily files live under daily/; the metadata text files live directly under prefix.
filePath          = f"{prefixDaily}2025.csv.gz"
filePathCountries = f"{prefix}ghcnd-countries.txt"
filePathInventory = f"{prefix}ghcnd-inventory.txt"
filePathStates    = f"{prefix}ghcnd-states.txt"
filePathStations  = f"{prefix}ghcnd-stations.txt"

# stations

dprintf(1,"stationsRawdf")
stationsRawdf = dReadTEXT(1,filePathStations)

# Slice each fixed-width line (column "value") into typed columns.
# F.substring takes a 1-based start position and a length.
stationsdf = stationsRawdf.select(
    F.substring("value", 1, 11).alias("ID"),
    F.substring("value", 13, 8).cast("double").alias("LATITUDE"),
    F.substring("value", 22, 9).cast("double").alias("LONGITUDE"),
    F.substring("value", 32, 6).cast("double").alias("ELEVATION"),
    F.substring("value", 39, 2).alias("STATE"),
    F.substring("value", 42, 30).alias("STATIONNAME"),
    F.substring("value", 73, 3).alias("GSNFLAG"),
    F.substring("value", 77, 3).alias("HCNFLAG"),
    F.substring("value", 81, 5).alias("WMOID")
)

# The first two characters of the station ID are the country code.
# (The previous trailing .drop("CODE") was removed: stationsdf has no CODE
# column at this point, so it did nothing.)
stationsdf = stationsdf.withColumn("COUNTRYCODE", F.substring("ID", 1, 2))


dprintf(1,"stationsdf")
hprintf(1,stationsdf)

In [None]:
# inventory
#
# Read the fixed-width inventory file as raw text, then carve each line
# (column "value") into typed columns by 1-based position and length.

dprintf(1,"inventoryRawdf")
inventoryRawdf = dReadTEXT(1, filePathInventory)

inventoryLine = F.col("value")

inventorydf = inventoryRawdf.select(
    inventoryLine.substr(1, 11).alias("ID"),
    inventoryLine.substr(13, 8).cast("double").alias("LATITUDE"),
    inventoryLine.substr(22, 9).cast("double").alias("LONGITUDE"),
    inventoryLine.substr(32, 4).alias("ELEMENT"),
    inventoryLine.substr(37, 4).cast("int").alias("FIRSTYEAR"),
    inventoryLine.substr(42, 4).cast("int").alias("LASTYEAR"),
)

dprintf(1,"inventorydf")
hprintf(1, inventorydf)

In [None]:
# countries
#
# Read the fixed-width countries file as raw text, then split each line
# (column "value") into a 2-character CODE and the NAME that follows it.

# Label the raw read the same way the stations/inventory/states cells do.
dprintf(1,"countriesRawdf")
countriesRawdf = dReadTEXT(1, filePathCountries)

countriesdf = countriesRawdf.select(
    F.substring("value", 1, 2).alias("CODE"),
    F.substring("value", 4, 61).alias("NAME")
)
dprintf(1,"countriesdf")
hprintf(1, countriesdf)

In [None]:
# states
#
# Read the fixed-width states file as raw text, then split each line
# (column "value") into a 2-character CODE and the NAME that follows it.

dprintf(1,"statesRawdf")
statesRawdf = dReadTEXT(1, filePathStates)

stateLine = F.col("value")

statesdf = statesRawdf.select(
    stateLine.substr(1, 2).alias("CODE"),
    stateLine.substr(4, 47).alias("NAME"),
)

dprintf(1,"statesdf")
hprintf(1, statesdf)

In [None]:
# CELL 5 Q2(D): Row Counts for Metadata Tables
# ------------------------------------------------
#
# Summary:
# - Count number of rows in each metadata table
# - Print each count for reference

# (label, DataFrame) pairs, reported in this order.
metadataTables = (
    ("stationsdf",  stationsdf),
    ("inventorydf", inventorydf),
    ("countriesdf", countriesdf),
    ("statesdf",    statesdf),
)

for tableLabel, tableFrame in metadataTables:
    dprintf(DEEBUG, f"{tableLabel}.count() = {tableFrame.count()}")


In [None]:
# CELL 6 Q3(A): Extract Country Code from Station ID
# ------------------------------------------------

# Summary:
# - The country code is the first two characters of the station ID
# - It is stored as the COUNTRYCODE column of stationsdf
#
# NOTE: this cell performs no extraction itself. COUNTRYCODE was already added
# when stationsdf was built in the metadata-loading cell; here the frame is
# only displayed so the new column can be inspected.

dprintf(DEEBUG, "stationsdf")
hprintf(DEEBUG, stationsdf)



In [None]:
# Join 1: Add COUNTRYNAME from countriesdf
#
# Show both inputs of the upcoming join before it is performed.

joinOneInputs = (
    ("stationsdf (before join with countriesdf)", stationsdf),
    ("countriesdf (used for join)",               countriesdf),
)

for inputLabel, inputFrame in joinOneInputs:
    dprintf(1, inputLabel)
    hprintf(DEEBUG, inputFrame)

In [None]:
# Left-join stations to countries on the 2-letter code so every station is
# kept, then drop the duplicate key and rename NAME to COUNTRYNAME.
countryMatch = stationsdf["COUNTRYCODE"] == countriesdf["CODE"]

temp1MD = (
    stationsdf
    .join(countriesdf, on=countryMatch, how="left")
    .drop("CODE")
    .withColumnRenamed("NAME", "COUNTRYNAME")
)

dprintf(1, "temp1MD (after join with countriesdf)")
hprintf(DEEBUG, temp1MD)

In [None]:
# Join 2: Add STATENAME from statesdf
#
# Show both inputs of the upcoming join before it is performed.

joinTwoInputs = (
    ("temp1MD (before join with statesdf)", temp1MD),
    ("statesdf (used for join)",            statesdf),
)

for inputLabel, inputFrame in joinTwoInputs:
    dprintf(1, inputLabel)
    hprintf(DEEBUG, inputFrame)

In [None]:
dprintf(1, "temp1MD (before join with statesdf)")
hprintf(DEEBUG, temp1MD)

# Left-join on the state code so stations without a matching state are kept,
# then drop the duplicate key and rename NAME to STATENAME.
stateMatch = temp1MD["STATE"] == statesdf["CODE"]

stationMetadata = (
    temp1MD
    .join(statesdf, on=stateMatch, how="left")
    .drop("CODE")
    .withColumnRenamed("NAME", "STATENAME")
)

dprintf(1, "stationMetadata (after join with statesdf)")
hprintf(DEEBUG, stationMetadata)

In [None]:

dprintf(1, "stationMetadata (after join with statesdf)")
hprintf(DEEBUG, stationMetadata)

# Columns of the enriched metadata to summarise, in display order.
cols = [
    "ID", "LATITUDE", "LONGITUDE", "ELEVATION", "STATE", "STATIONNAME",
    "GSNFLAG", "HCNFLAG", "WMOID", "COUNTRYCODE", "COUNTRYNAME", "STATENAME",
]

# One Row per column holding the value returned by countNonEmpty for it.
results = [
    Row(COLUMN=colname, COUNT=countNonEmpty(stationMetadata, colname))
    for colname in cols
]

# Convert list of Rows to DataFrame and show it
summaryCounts = spark.createDataFrame(results)
hprintf(1,summaryCounts,20)

# Persist the enriched station metadata to the user container.
dWritePQ(1, pqtEnrichedStations, stationMetadata)

In [None]:
# CELL 8 Q3(B): Extract Country Name from Enriched Metadata
# ------------------------------------------------
#
# Summary:
# - From the enriched meta-data:
# - Display station ID, country code, and country name
#
# dropDuplicates keeps one row per COUNTRYCODE, so the ID shown is just one
# example station for that country.

countryColumns = ["ID", "COUNTRYCODE", "COUNTRYNAME"]

displaydf = (
    stationMetadata
    .select(*countryColumns)
    .dropDuplicates(["COUNTRYCODE"])
)

dprintf(1, "Display of station ID, country code, and country name")
hprintf(1, displaydf)


In [None]:
# CELL 9 Q3(C): Extract State Name from Enriched Metadata
# ------------------------------------------------
#
# Summary:
# - Select and inspect STATE and STATENAME columns from enriched metadata
# - Validate correctness of state-level join from earlier enrichment step

# Postal codes of the fifty U.S. states.
fiftyStates = (
    "AL AK AZ AR CA CO CT DE FL GA "
    "HI ID IL IN IA KS KY LA ME MD "
    "MA MI MN MS MO MT NE NV NH NJ "
    "NM NY NC ND OH OK OR PA RI SC "
    "SD TN TX UT VT VA WA WV WI WY"
).split()

# One row per distinct STATE code among US stations.
usStationsdf = (
    stationMetadata
    .filter(F.col("COUNTRYCODE") == "US")
    .select("ID", "STATE", "STATENAME")
    .dropDuplicates(["STATE"])
)

hprintf(1,usStationsdf,100)  # find out if there are more than 50 states coded.

# Split into recognised and unrecognised state codes
isRecognisedState = F.col("STATE").isin(fiftyStates)
df_50states  = usStationsdf.filter(isRecognisedState)
df_nonStates = usStationsdf.filter(~isRecognisedState)

dprintf(1, "✅ 50 Recognised U.S. States")
hprintf(1, df_50states,100)

dprintf(1, "⚠️ Extra STATE Codes")
hprintf(1, df_nonStates,100)


In [None]:
# CELL 10: Display Station Count by Country (Exploratory Only)
# ------------------------------------------------
#
# Summary:
# - Display-only: Count number of stations per country
# - Included for contextual exploration; not required by Q3

# The enriched station metadata built in this notebook is `stationMetadata`;
# the previously referenced `stationsEnricheddf` is never defined here.
summaryByCountrydf = (
    stationMetadata
    .groupBy("COUNTRYNAME")
    .count()
    .orderBy(F.desc("count"))
)

dprintf(DEEBUG, "summaryByCountrydf")
# Pass the DataFrame itself: DataFrame.show() returns None. The third argument
# follows the hprintf(flag, df, n) usage elsewhere in this notebook
# (presumably the number of rows to display).
hprintf(DEEBUG, summaryByCountrydf, 5)
dprintf(DEEBUG, f"summaryByCountrydf.count() = {summaryByCountrydf.count()}")


In [None]:
# CELL 11 Q3(D): Count Unique Elements per Station
# ------------------------------------------------
#
# Summary:
# - Join stationMetadata : inventorydf by ID
# - For each station ID, count number of ELEMENTs
# - Make human readable

# Inner join: keep only stations present in both frames.
stationInventorydf = stationMetadata.join(inventorydf, on="ID", how="inner")

# Count distinct ELEMENT codes per station; the extra grouping columns are
# carried along so the output is readable.
stationElementsdf = (
    stationInventorydf
    .groupBy("ID", "STATIONNAME", "COUNTRYNAME", "STATE")
    .agg(F.countDistinct("ELEMENT").alias("nElements"))
)

dprintf(1, "✅ Unique ELEMENT counts per station (with COUNTRYNAME and STATE)")
hprintf(1, stationElementsdf)


In [None]:
# CELL 12 Q3(E): Classify Stations by Core Element Coverage
# ------------------------------------------------
#
# Summary:
# - For each station ID in inventorydf, collect all distinct ELEMENT values
# - Intersect with five core elements [PRCP, SNOW, SNWD, TMAX, TMIN]
# - Count number of matching core elements → CORE_COUNT
# - Classify each station:
#     CORE        = all core elements
#     PARTIAL     = some but not all core elements
#     SPECIALISED = none of the core elements
# - Join stationMetadata on ID to attach station names
# - Output ID, STATIONNAME, CORE_COUNT, CLASSIFICATION
# - Summarise station counts per classification, plus a TOTAL row

# Single source of truth for the core set: it drives both the intersection and
# the classification thresholds below. The order matches the literal array
# used previously, so CORE_MATCH keeps the same element ordering.
coreElements = ["PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]
nCoreElements = len(coreElements)

# For each station, collect the set of element names it reports
stationElementsdf = (
    inventorydf
    .groupBy("ID")                   # stationID
    .agg(F.collect_set("ELEMENT")    # get element names
    .alias("ELEMENTS"))
)

dprintf(DEEBUG, "stationElementsdf - distinct ELEMENT values per station (from inventorydf)")
hprintf(DEEBUG,stationElementsdf)

# Core elements that each station actually reports
coreIntersectdf = stationElementsdf.withColumn(
    "CORE_MATCH",
    F.array_intersect(
        F.array(*[F.lit(element) for element in coreElements]),
        F.col("ELEMENTS")
    )
)

dprintf(DEEBUG, "coreIntersectdf – matched core elements per station")
hprintf(DEEBUG,coreIntersectdf)

coreCountdf = coreIntersectdf.withColumn(
    "CORE_COUNT",
    F.size(F.col("CORE_MATCH"))
)

dprintf(1, "coreCountdf – number of core elements matched")

hprintf(DEEBUG,coreCountdf)

clasifydf = coreCountdf.withColumn(
    "CLASSIFICATION",
    F.when(F.col("CORE_COUNT") == nCoreElements, "CORE")
     .when((F.col("CORE_COUNT") > 0) & (F.col("CORE_COUNT") < nCoreElements), "PARTIAL")
     .otherwise("SPECIALISED")
)


# Left join keeps every classified station even if it has no metadata row.
resultdf = clasifydf.join(
    stationMetadata.select("ID", "STATIONNAME"),
    on="ID",
    how="left"
).select("ID", "STATIONNAME", "CORE_COUNT", "CLASSIFICATION")


hprintf(1, clasifydf)

dprintf(1, "resultdf – final output: ID, STATIONNAME, CORE_COUNT, CLASSIFICATION")
hprintf(1, resultdf)

# Step 1:  summary
groupSummarydf = resultdf.groupBy("CLASSIFICATION") \
    .count() \
    .withColumnRenamed("count", "nStations") \
    .orderBy("CLASSIFICATION")


dprintf(1, "groupSummarydf – number of stations per CLASSIFICATION category")
hprintf(1, groupSummarydf)

# Step 2:  total row
totalCount = groupSummarydf.agg(F.sum("nStations")).first()[0]
totalRow = spark.createDataFrame([("TOTAL", totalCount)], ["CLASSIFICATION", "nStations"])

# Step 3: Union the total row with the summary
summarydf = groupSummarydf.unionByName(totalRow)

dprintf(1, "summarydf"  )
hprintf(DEEBUG,summarydf)



In [None]:
# CELL 13 Q3(E): Join Classification Back to Enriched Metadata
# ------------------------------------------------
#
# Summary:
# - Join classification labels (CORE / PARTIAL / SPECIALISED) back to enriched metadata
# - Resulting dataframe can be exported or visualised

# `stationsEnricheddf` and `classifiedStationsdf` are never defined in this
# notebook. The enriched metadata is `stationMetadata`, and the per-station
# classification produced in CELL 12 is `resultdf`.
# NOTE(review): confirm these are the intended frames.
stationsClassifieddf = stationMetadata.join(
    resultdf.select("ID", "CLASSIFICATION"),
    on="ID",
    how="left"  # keep every station, classified or not
)

dprintf(DEEBUG, "stationsClassifieddf – enriched metadata with classification")
# Pass the DataFrame itself: DataFrame.show() returns None. The third argument
# follows the hprintf(flag, df, n) usage elsewhere in this notebook
# (presumably the number of rows to display).
hprintf(DEEBUG, stationsClassifieddf.select("ID", "COUNTRYNAME", "CLASSIFICATION"), 5)
dprintf(DEEBUG, f"stationsClassifieddf.count() = {stationsClassifieddf.count()}")


In [None]:
# CELL 14: Write Classified Metadata for Analysis
# ------------------------------------------------
#
# Summary:
# - Write only classifiedStationsdf for later analysis steps
# - The enriched station metadata was already written earlier (pqtEnrichedStations)

# `classifiedStationsdf` is not defined anywhere else in this notebook; the
# per-station classification produced in CELL 12 is `resultdf`
# (ID, STATIONNAME, CORE_COUNT, CLASSIFICATION).
# NOTE(review): confirm this is the frame meant to be persisted.
classifiedStationsdf = resultdf

# prefixWrite already ends with "/", so no extra separator is inserted here.
pqtStationsClassified = f"{prefixWrite}stations_classified.parquet"

# Set to False to skip the write while experimenting.
bWriteClassified = True

dprintf(DEEBUG, f"classifiedStationsdf.count() = {classifiedStationsdf.count()}")
dprintf(DEEBUG, f"write to : {pqtStationsClassified}")

if bWriteClassified:
    start = time.time()
    classifiedStationsdf.write.mode("overwrite").parquet(pqtStationsClassified)
    stop = time.time()
    dprintf(1, f"complete in {stop - start:.2f} seconds")
    dprintf(1, f"classifiedStationsdf.count() = {classifiedStationsdf.count()}")
else:
    dprintf(1, f"write disabled, no write to : {pqtStationsClassified}")


In [None]:
# CELL 15 Q1: – Inspect Azure HDFS layout and daily folder contents
# CELL 15 Q1: 
# ------------------------------------------------
# Summary:
# - PART 1: list contents under {prefix}
# - PART 2: human-readable size info under {prefix}
# - PART 3: HDFS block and file count for prefixDaily
# - PART 4: list and format daily file entries

#  A last minute attempt to do what James had challenged the class with.

hprintf("Q1:  Azure HDFS layout  daily folder ")

# --- PART 1: directory listing ---
dprintf(DEEBUG, "--- PART 1 ---")
start = time.time()

lines_ls = !hdfs dfs -ls {prefix}
parsed_ls = []

for line in lines_ls:
    line = line.strip()
    if not line or line.startswith("INFO") or line.startswith("WARN") or "Found" in line:
        continue

    parts = line.split()
    if len(parts) >= 2:
        perms     = parts[0]
        full_path = parts[-1]

        if perms.startswith("-") or perms.startswith("d"):
            rel_path = full_path.replace(prefix, '')
            parsed_ls.append((perms, rel_path))

for perms, name in parsed_ls:
    dprintf(DEEBUG, f"{perms:<12} {name}")

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")

# --- PART 2: du -h (size info) ---
dprintf(DEEBUG, "--- PART 2 ---")
start = time.time()

lines = !hdfs dfs -du -h {prefix}
parsed_du = []

for line in lines:
    if line.startswith("INFO") or line.startswith("WARN"):
        continue

    parts = line.split()
    if len(parts) >= 5:
        size1    = f"{parts[0]} {parts[1]}"
        size2    = f"{parts[2]} {parts[3]}"
        full_path = parts[4]
    elif len(parts) >= 3:
        size1, size2, full_path = parts[0], parts[1], parts[2]
    else:
        continue

    rel_path = full_path.replace(prefix, '')
    parsed_du.append((rel_path, size1, size2))

for name, size1, size2 in parsed_du:
    dprintf(DEEBUG, f"{name:<25} {size1:<7} {size2:<7}")

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")

# --- PART 3: HDFS file count ---
dprintf(DEEBUG, "--- PART 3 ---")
start = time.time()

!hdfs dfs -count {prefixDaily}

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")

# --- PART 4: list daily files ---
dprintf(DEEBUG, "--- PART 4 ---")
start = time.time()

lines_daily = !hdfs dfs -ls {prefixDaily}
parsed_daily = []

for line in lines_daily:
    line = line.strip()
    if not line or "INFO" in line or "WARN" in line or "Found" in line:
        continue

    parts = line.split()
    if len(parts) == 6:
        size      = parts[2]
        full_path = parts[5]
        file_name = full_path.rsplit('/', 1)[-1]
        parsed_daily.append((size, file_name))
    else:
        dprintf(DEEBUG, f"(wrong format): {line}")

if parsed_daily:
    for size, name in parsed_daily:
        dprintf(DEEBUG, f"{name:<15} {size}")
else:
    dprintf(DEEBUG, "none found.")

stop = time.time()
dprintf(DEEBUG, f"complete in {stop - start:.2f} seconds")


In [None]:
# Shut down the Spark session started in CELL 0. stop_spark() is not defined in
# this notebook; presumably it comes from 00_ghcn2_setup.ipynb — TODO confirm.
stop_spark()