In [23]:
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "LEGACY")
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
from pyspark.sql.functions import col, when
from pyspark.sql import functions as F
from pyspark.sql.types import DataType
from datetime import datetime
import re
import os

df = spark.sql("SELECT * FROM DE_LH_RAW.erp_data")

# Replace nulls in integer/float columns with 0
df_cleaned = df.select([when(col(c).isNull(), 0).otherwise(col(c)).alias(c) 
                        if t in ["int", "double", "float"] else col(c) for c, t in df.dtypes])

# Replace nulls in string columns with 'UNKNOWN'
df_cleaned = df_cleaned.select([when(col(c).isNull(), "UNKNOWN").otherwise(col(c)).alias(c) 
                                if t == "string" else col(c) for c, t in df_cleaned.dtypes])

# Capitalize column names
df_cleaned = df_cleaned.select([F.col(c).alias(c.upper()) for c in df_cleaned.columns])

# Add "nav_" prefix and convert to lowercase
final_table_name = f"silver_erp_data"

# Add a special suffix for the Delta table
delta_table_name = f"{final_table_name}_delta"

# Save the DataFrame as a Delta table with column mapping mode 'name' and schema merge enabled
df_cleaned.write.format("delta").mode("overwrite")\
    .option("delta.columnMapping.mode", "name")\
    .option("mergeSchema", "true")\
    .saveAsTable(delta_table_name)

# # Converting the Delta table to a SQL Table
# spark.sql(f"CREATE OR REPLACE TABLE {final_table_name} AS SELECT * FROM {delta_table_name}")
# print("Conversion done for SQL table:", final_table_name)

# # Dropping Delta Table
# spark.sql(f"DROP TABLE IF EXISTS {delta_table_name}")

StatementMeta(, 266c4518-858c-4b71-9bce-45150d9abe1d, 25, Finished, Available, Finished)

Conversion done for SQL table: silver_erp_data


DataFrame[]