In [0]:


%python
# Read the raw bronze-layer table (catalog.schema.table) as a DataFrame.
bronze_df = spark.read.table("yogesh_catalog.pda.bronze_table")

In [0]:
%python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
from datetime import datetime, timedelta

In [0]:
%python
# Preview the first 10 raw rows.
bronze_df.show(n=10)
# Add the processing timestamp. withColumn replaces an existing column of the
# same name, so an `ingestion_time` already present in the bronze table would
# be overwritten here. current_timestamp() is fixed at the start of each query,
# so separate actions on frames derived from this one see different timestamps.
bronze_df = bronze_df.withColumn(colName="ingestion_time", col=current_timestamp())

In [0]:
%python

# Target silver-layer schema, written as (column name, Spark type) pairs.
# Every field is nullable.
# NOTE(review): the processing cell later reassigns `result_df`, so neither this
# schema nor the empty frame below is currently applied to the processed data.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Sales_Region", StringType()),
        ("Plant_Id", StringType()),
        ("Sorg", StringType()),
        ("Dch", IntegerType()),
        ("Div", IntegerType()),
        ("Item_Categ", StringType()),
        ("Customer_Grp_Id", StringType()),
        ("Customer_Grp_Name", StringType()),
        ("Order_type", StringType()),
        ("Sales_Order", IntegerType()),
        ("Invoice_No", IntegerType()),
        ("Invoice_Date", DateType()),
        ("State_Name", StringType()),
        ("Sales_Zone", StringType()),
        ("Customer_Id", IntegerType()),
        ("Customer_Name", StringType()),
        ("Address", StringType()),
        ("Product_Id", IntegerType()),
        ("Product_Name", StringType()),
        ("Quantity", IntegerType()),
        ("Basic_Rate", DoubleType()),
        ("Special_Ra", DoubleType()),
        ("Basic_Amt", DoubleType()),
        ("Discount_A", IntegerType()),
        ("Net_Amt_DC", DoubleType()),
        ("D380", StringType()),
        ("Type_Calculations", StringType()),
        ("Product_Family", StringType()),
        ("Product_Type", StringType()),
        ("Product_Category", StringType()),
        ("Quantity_2", IntegerType()),
        ("Per_Coll", DoubleType()),
        ("Number_of_Coll", DoubleType()),
        ("Andris", StringType()),
        ("EWH_Groups", StringType()),
        ("Ecom", StringType()),
        ("ingestion_time", TimestampType()),
    ]
])

# Zero-row DataFrame that carries the target schema.
result_df = spark.createDataFrame([], schema)

In [0]:
%python
# Number of columns currently in `result_df`.
print(len(result_df.schema.fields))

In [0]:
%python
 
# Function to clean column names
def clean_column_name(col_name):
    """Turn a raw header into an identifier made of [A-Za-z0-9_] only.

    Leading and trailing whitespace is discarded, and each internal run of
    whitespace collapses to one underscore. Every remaining character outside
    ASCII letters, digits and underscore is then removed
    (e.g. "Net Amt (DC)" -> "Net_Amt_DC").

    Args:
        col_name: Original column name.

    Returns:
        The cleaned name. May be empty if nothing allowed remains.
    """
    # str.split() with no separator drops edge whitespace and splits on runs.
    underscored = "_".join(col_name.split())
    return re.sub(r'[^a-zA-Z0-9_]', '', underscored)
 
# Function to replace null values
def NullValueReplace(df):
    """Replace nulls with a type-dependent default in every supported column.

    Defaults by column type:
      * IntegerType / LongType / ShortType / ByteType -> 0
      * StringType                                    -> "Other"
      * DoubleType                                    -> 0.0
    Columns of any other type (e.g. DateType, TimestampType) pass through
    unchanged.

    All replacements are emitted in a single ``select`` rather than one
    ``withColumn`` per column. Each ``withColumn`` adds a projection to the
    logical plan, and doing that in a loop over many columns can produce very
    large plans (the PySpark docs recommend ``select`` instead).

    Args:
        df: Input Spark DataFrame.

    Returns:
        A new DataFrame with the same column names in the same order.
    """
    integral_types = (IntegerType(), LongType(), ShortType(), ByteType())
    projected = []
    for field in df.schema.fields:
        column_name = field.name
        column_type = field.dataType
        if column_type in integral_types:
            fill_value = 0
        elif column_type == StringType():
            fill_value = "Other"
        elif column_type == DoubleType():
            fill_value = 0.0
        else:
            # Unsupported type: keep the column as-is.
            projected.append(df[column_name])
            continue
        # Same when/otherwise expression as before, aliased to keep the name.
        projected.append(
            when(df[column_name].isNull(), fill_value)
            .otherwise(df[column_name])
            .alias(column_name)
        )
    return df.select(*projected)
 
# Function to drop unwanted columns
def DropUnwantedColumns(df):
    """Drop a fixed list of columns that are not carried into the silver data.

    Only names found verbatim (case-sensitive list membership) in
    ``df.columns`` are passed to ``drop``. Names that are absent are skipped.

    Args:
        df: Input Spark DataFrame, with column names already cleaned.

    Returns:
        The DataFrame without those columns, or ``df`` itself when none match.
    """
    unwanted = (
        'Product_Fa', 'Material_Description_2', 'EWH_Region_Classification', 'ES_State',
        'State_with_Haryana_1_2', 'Solar_Group', 'For_Split', 'Commercial_vs_Domestic',
        'Source', 'PF01', 'Weeks',
    )
    existing = set(df.columns)
    present = [name for name in unwanted if name in existing]
    if not present:
        return df
    return df.drop(*present)

 
# Load and process data
# Cut-off for the "recent rows" filter below (driver-local wall-clock time).
threshold_time = datetime.now() - timedelta(hours=24)

# Mapping from cleaned bronze header names to silver schema names, applied in
# insertion order. withColumnRenamed is a no-op when the old name is absent.
COLUMN_RENAMES = {
    'REF': 'Sales_Region',
    'Sales_Orde': 'Sales_Order',
    'Invoice_Da': 'Invoice_Date',
    'Plant': 'Plant_Id',
    'Cust_Grp': 'Customer_Grp_Id',
    'Customer_G': 'Customer_Grp_Name',
    'Sold_To': 'Customer_Id',
    'Sold_to_Party_Name': 'Customer_Name',
    'Sold_To_City': 'Address',
    'Material': 'Product_Id',
    'Material_Description': 'Product_Name',
    'Type': 'Product_Type',
    'Solar_Type': 'Product_Category',
}

df = bronze_df

# NOTE(review): `recent_df` is not used below; the pipeline processes all of
# `bronze_df`. Because `ingestion_time` is set with current_timestamp() when
# bronze_df is prepared, this filter would currently keep every row anyway.
# Confirm whether an incremental (last-24h) load was intended before wiring it
# in. The silver write uses mode("overwrite"), so feeding it only recent rows
# would replace the table with that subset.
recent_df = df.filter(col("ingestion_time") > threshold_time)

# Clean column names
df = df.toDF(*[clean_column_name(col_name) for col_name in df.columns])

# Drop unwanted columns
df = DropUnwantedColumns(df)

# Replace null values
df = NullValueReplace(df)

# Standardize schema
for old_name, new_name in COLUMN_RENAMES.items():
    df = df.withColumnRenamed(old_name, new_name)

# A single source frame is processed, so it is the result; no union is needed.
result_df = df

# Show the final DataFrame
result_df.show()
 
 

In [0]:
%python
# Number of rows in the processed frame (runs a Spark job).
row_count = result_df.count()
row_count

In [0]:
%python
# NOTE(review): `df_single_partition` is not used by the write cells below,
# which write `result_df` directly. repartition is a lazy transformation.
df_single_partition = result_df.repartition(numPartitions=1)

In [0]:
%python
# Persist the silver data as a Delta table, replacing its previous contents.
(
    result_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("yogesh_catalog.pda.silver_table")
)


In [0]:
-- Show the silver table's columns together with its detailed table information.
DESC EXTENDED yogesh_catalog.pda.silver_table;

In [0]:
%python
# Export the silver data to the ADLS Gen2 (abfss) location as a Delta directory.
# Read back the table saved by the saveAsTable cell instead of writing
# `result_df` again. A second write of `result_df` would re-run the whole
# bronze -> silver plan. Because `ingestion_time` comes from current_timestamp()
# (fixed once per query), that second write would also carry different
# timestamps than the table received.
# Requires the saveAsTable cell to have run earlier in this session.
# NOTE(review): the path contains "csv" but the data is written in Delta format.
silver_export_df = spark.table("yogesh_catalog.pda.silver_table")
silver_export_df.write.format("delta").mode("overwrite").save("abfss://pda@yogeshdatalakegen2.dfs.core.windows.net/silver/csv/jan19/")
