In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("DataCleaningPipeline")
    .getOrCreate()
)

# Location of the ingested CSV chunks in the GCS bucket
bucket_name = "on-prem-ingestion-data"
folder_path = f"gs://{bucket_name}/2022_data_ingestion/"  # change the folder segment to point at a different ingestion folder


In [None]:
# Load a single chunk for initial testing
chunk_file = folder_path + "data_page_1_20241019_155049.csv"  # Replace with actual chunk name if different

# header=True takes column names from the first row; inferSchema=True asks
# Spark to infer each column's type from the data instead of reading
# everything as strings.
df = spark.read.csv(chunk_file, header=True, inferSchema=True)

# Display the inferred schema first (so the column types the later cleaning
# steps rely on are visible), then a small sample of rows
df.printSchema()
df.show(5)


In [None]:
from pyspark.sql.functions import regexp_replace, col, expr

# Step 1: Trim leading/trailing underscores from column names.
# The loop walks the column list as it was when the loop started, while the
# collision check consults the DataFrame as it stands after earlier renames.
for original_name in df.columns:
    trimmed_name = original_name.strip('_')
    if trimmed_name == original_name:
        # No underscore at either end, so there is nothing to rename
        continue
    if trimmed_name in df.columns:
        # The trimmed name is already taken; leave this column as it is
        continue
    df = df.withColumnRenamed(original_name, trimmed_name)

# Steps 2 and 3: For each date-part column, drop the leading b' and the
# trailing ' left over from a byte-string literal, then cast what remains to
# an integer.
# NOTE(review): an int cast discards leading zeros; confirm that is acceptable
# for IDATE if it is stored as a zero-padded date string.
byte_literal_wrapper = r"^b'|'$"
for date_part in ("IMONTH", "IYEAR", "IDAY", "IDATE"):
    df = df.withColumn(
        date_part,
        regexp_replace(col(date_part), byte_literal_wrapper, "").cast("int"),
    )

# SEQNO is cast directly; no byte-literal stripping is applied to it
df = df.withColumn("SEQNO", col("SEQNO").cast("int"))

# Show the cleaned date-part columns to verify the result
df.select("IMONTH", "IYEAR", "IDAY", "IDATE").show()


In [None]:
from pyspark.sql import functions as F

# Build a mapping of old column name -> new column name for every column that
# starts or ends with an underscore. A rename is skipped when the stripped
# name would be empty or would collide with a column name that is already in
# use, so the select below never produces duplicate or blank column names.
taken_names = set(df.columns)
rename_dict = {}
for col_name in df.columns:
    stripped_name = col_name.strip('_')
    if stripped_name == col_name:
        continue  # no leading/trailing underscore
    if not stripped_name or stripped_name in taken_names:
        continue  # would yield an empty or duplicate column name
    rename_dict[col_name] = stripped_name
    taken_names.add(stripped_name)

# Apply the renaming in one go using `select` with aliasing.
# Iterate over *all* columns of the DataFrame (not just the renamed ones) so
# that columns without underscores are kept, in their original order.
df = df.select([
    F.col(col_name).alias(rename_dict.get(col_name, col_name))
    for col_name in df.columns
])

# Show the updated DataFrame
df.show(5)


In [None]:
# Final expression of the cell, so the notebook displays `df` as the cell output
df