- # Bronze layer
    - ## Import to Parquet or Delta Lake
        - Import from the API to CSV (overwriting any previous copy)

In [None]:
import os
import shutil
import requests
import zipfile
import io

# Download the latest OpenPowerlifting ZIP and extract it under base_dir,
# keeping only the folder that came out of the most recent archive.

# Paths
base_dir = "../data/openpowerlifting-latest"
os.makedirs(base_dir, exist_ok=True)

# Download ZIP file.
# The (connect, read) timeout stops the cell from hanging forever when the
# server stops responding; without it requests waits indefinitely.
url = "https://openpowerlifting.gitlab.io/opl-csv/files/openpowerlifting-latest.zip"
response = requests.get(url, timeout=(10, 60))
response.raise_for_status()

# Open the ZIP from memory (nothing touches disk until extraction)
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
    # Identify the actual top-level folder name inside the ZIP.
    # Sorting makes the choice deterministic if the archive ever contains
    # more than one top-level folder (set iteration order is arbitrary).
    folder_names = sorted({name.split('/')[0] for name in z.namelist() if '/' in name})
    if not folder_names:
        raise ValueError("‚ùå No folder found inside ZIP.")

    # Date-stamped names sort chronologically, so the last one is the newest.
    inner_folder_name = folder_names[-1]
    target_dir = os.path.join(base_dir, inner_folder_name)

    # Skip extraction if this folder already exists.
    # NOTE: the archive has already been downloaded at this point; its folder
    # name is only known after opening it, so only the extraction is skipped.
    if os.path.exists(target_dir):
        print(f"‚è© Skipping: {target_dir} already exists.")
    else:
        print(f"üì¶ Extracting: {inner_folder_name} ‚Üí {base_dir}")
        try:
            z.extractall(base_dir)
        except BaseException:
            # A half-extracted folder would make every later run take the
            # "already exists" branch above, so remove it before re-raising.
            shutil.rmtree(target_dir, ignore_errors=True)
            raise

        # Optional: delete other folders to keep only the latest
        for f in os.listdir(base_dir):
            f_path = os.path.join(base_dir, f)
            if f != inner_folder_name and os.path.isdir(f_path):
                print(f"üóëÔ∏è Deleting old folder: {f_path}")
                shutil.rmtree(f_path)

print("‚úÖ Finished.")


In [None]:
import os
from pyspark.sql import SparkSession

# Read the extracted OpenPowerlifting CSV with Spark and persist it as Parquet
# in the bronze zone.

# Set up Spark
spark = (
    SparkSession.builder
    .appName("Bronze Transformation")
    .getOrCreate()
)

# Base directory where ZIP was extracted
base_dir = "../data/openpowerlifting-latest"

# Find the data subfolder (e.g., openpowerlifting-2025-05-31).
# os.listdir returns entries in arbitrary order, so sort and take the last
# one: date-stamped folder names sort chronologically, making this the newest.
subfolders = sorted(
    f for f in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, f))
)
if not subfolders:
    raise FileNotFoundError("‚ùå No data folder found in openpowerlifting-latest/")
latest_folder = subfolders[-1]
latest_dir = os.path.join(base_dir, latest_folder)

# Expected CSV path: derived from the folder name instead of a hard-coded
# date, so the fast path keeps working for future releases.
csv_path = os.path.join(latest_dir, f"{latest_folder}.csv")

# In case the CSV filename varies slightly, search for it
if not os.path.exists(csv_path):
    # Sorted so the pick is reproducible if several CSV files are present.
    csv_files = sorted(f for f in os.listdir(latest_dir) if f.endswith(".csv"))
    if not csv_files:
        raise FileNotFoundError("‚ùå No CSV file found in the latest folder.")
    csv_path = os.path.join(latest_dir, csv_files[0])

print(f"üìÇ Reading: {csv_path}")

# Read CSV into Spark DataFrame (first row is the header; column types are
# inferred from the data, which costs an extra pass over the file)
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# Save to Parquet in the bronze zone, replacing any previous output
bronze_dir = "../data/bronze"
os.makedirs(bronze_dir, exist_ok=True)
df.write.mode("overwrite").parquet(os.path.join(bronze_dir, "openpowerlifting_bronze.parquet"))

print("‚úÖ Saved Parquet to bronze zone.")


In [None]:
# Count the rows of the DataFrame read from the CSV and display the total
bronze_row_count = df.count()
print(bronze_row_count)

In [None]:
# Print the DataFrame's schema (column names and their types) so the result
# of the CSV read can be inspected.
df.printSchema()