In [1]:
# data/jobs/bronze/bronze_services.py
import argparse
from pyspark.sql import SparkSession, functions as F

def spark(app):
    return (
        SparkSession.builder
        .appName(app)
        # Delta Lake integration
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        # HDFS endpoint (from docker-compose: hdfs-namenode on port 9000)
        .config("spark.hadoop.fs.defaultFS", "hdfs://hdfs-namenode:9000")
        # small local-friendly shuffle
        .config("spark.sql.shuffle.partitions", "8")
        .getOrCreate()
    )

def main(ingest_date: str, out_path: str):
    s = spark("bronze-delta-sanity")

    # --- 1) tiny in-memory DataFrame -----------------------------------------
    df = s.createDataFrame(
        [
            ("a001", "Alice", 29),
            ("b002", "Bob",   41),
            ("c003", "Cara",  35),
        ],
        ["customerID", "name", "age"]
    ).withColumn("ingest_date", F.lit(ingest_date))

    # --- 2) write as Delta to HDFS -------------------------------------------
    (df.write
        .format("delta")
        .mode("append")
        .partitionBy("ingest_date")
        .save(out_path))

    print(f"âœ… Wrote Delta table â†’ {out_path} (ingest_date={ingest_date})")

    # --- 3) read back & show --------------------------------------------------
    back = s.read.format("delta").load(out_path)
    print("ðŸ”Ž Read-back sample:")
    back.where(F.col("ingest_date") == ingest_date).show(truncate=False)

In [2]:
!pwd

/home/jovyan/work


In [3]:
main("2025-10-11", "hdfs://hdfs-namenode:9000/data/delta/bronze/telco/services")

âœ… Wrote Delta table â†’ hdfs://hdfs-namenode:9000/data/delta/bronze/telco/services (ingest_date=2025-10-11)
ðŸ”Ž Read-back sample:
+----------+-----+---+-----------+
|customerID|name |age|ingest_date|
+----------+-----+---+-----------+
|a001      |Alice|29 |2025-10-11 |
|c003      |Cara |35 |2025-10-11 |
|b002      |Bob  |41 |2025-10-11 |
+----------+-----+---+-----------+



In [4]:
import os
print("HADOOP_USER_NAME =", os.environ.get("HADOOP_USER_NAME"))


HADOOP_USER_NAME = spark


In [6]:
import os, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Spark
spark = (
    SparkSession.builder
    .appName("rename-and-save-1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        # HDFS endpoint (from docker-compose: hdfs-namenode on port 9000)
        .config("spark.hadoop.fs.defaultFS", "hdfs://hdfs-namenode:9000")
        # small local-friendly shuffle
        .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

RAW_DIR = "/data/raw/telco/telco_local/"
OUT_DIR = "hdfs://hdfs-namenode:9000//data/delta/bronze/"

def normalize_token(s: str) -> str:
    s = s.strip().lower()
    s = re.sub(r'[^a-z0-9]+', '_', s)      # non-alnum -> _
    s = re.sub(r'_+', '_', s)               # collapse multiple _
    s = s.strip('_')
    return s

def make_prefix_from_filename(filename: str) -> str:
    base = os.path.splitext(filename)[0]
    # remove literal 'Telco_customer' (case-insensitive), then normalize
    base = re.sub(r'(?i)telco_customer', '', base)
    base = normalize_token(base)
    return base or "file"

def uniquify(names):
    """Ensure names are unique by appending _1, _2 ... when needed."""
    seen = {}
    out = []
    for n in names:
        if n not in seen:
            seen[n] = 0
            out.append(n)
        else:
            seen[n] += 1
            out.append(f"{n}_{seen[n]}")
    return out

# Ensure output root exists (Spark will create subdirs)
os.makedirs(OUT_DIR, exist_ok=True)

files = ['Telco_customer_churn_demographics.csv']
files.sort()

for f in files:
    path = os.path.join(RAW_DIR, f)
    prefix = make_prefix_from_filename(f)
    print("="*100)
    print(f"ðŸ“‚ Processing: {f}   â†’   prefix='{prefix}'")
    print("="*100)

    # Read (lazy)
    df = (
        spark.read
        .option("header", "true")
        .csv(path)
    )

    # Build rename map
    raw_cols = df.columns
    norm_cols = [normalize_token(c) for c in raw_cols]
    # prepend prefix
    target_cols = [f"{prefix}_{c}" if c else prefix for c in norm_cols]
    # ensure uniqueness (in case different raw columns normalize to same token)
    target_cols = uniquify(target_cols)
    print(target_cols)
    rename_pairs = list(zip(raw_cols, target_cols))
    # Apply renames
    for old, new in rename_pairs:
        if old != new:
            df = df.withColumnRenamed(old, new)

    # Write to Parquet (Spark-friendly)
    out_path = os.path.join(OUT_DIR, prefix)
    (
        df.write
        .mode("overwrite")     # idempotent reruns
        .parquet(out_path)
    )

    print(f"âœ… Saved: {out_path}")

ðŸ“‚ Processing: Telco_customer_churn_demographics.csv   â†’   prefix='churn_demographics'
['churn_demographics_customer_id_count_gender_age_under_30_senior_citizen_married_dependents_number_of_dependents']
âœ… Saved: hdfs://hdfs-namenode:9000//data/delta/bronze/churn_demographics
