In [0]:
# Direct ADLS Gen2 (OAuth service principal) access -- kept disabled; the cells
# below presumably rely on the /mnt mounts instead.
# NOTE(security): a client secret used to be hardcoded here and remains in version
# history -- rotate it in Azure AD. Always read credentials from a Databricks secret
# scope (scope/key names below are placeholders to fill in).
# spark.conf.set("fs.azure.account.auth.type.productdemandgen2.dfs.core.windows.net", "OAuth")
# spark.conf.set("fs.azure.account.oauth.provider.type.productdemandgen2.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
# spark.conf.set("fs.azure.account.oauth2.client.id.productdemandgen2.dfs.core.windows.net", dbutils.secrets.get(scope="<secret-scope>", key="adls-client-id"))
# spark.conf.set("fs.azure.account.oauth2.client.secret.productdemandgen2.dfs.core.windows.net", dbutils.secrets.get(scope="<secret-scope>", key="adls-client-secret"))
# spark.conf.set("fs.azure.account.oauth2.client.endpoint.productdemandgen2.dfs.core.windows.net", "https://login.microsoftonline.com/ba9d3701-1fca-44e6-b474-474f610b0e97/oauth2/token")


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
from datetime import datetime, timedelta

In [0]:
# List the bronze landing folder; the listing is displayed as the cell output.
bronze_listing = dbutils.fs.ls("/mnt/bronze-gaurav")
bronze_listing

In [0]:
# Target column layout for the sales data, as (column name, Spark type) pairs in
# output order. Every field is declared nullable.
schema = StructType([
    StructField(name, data_type, True)
    for name, data_type in (
        ("Sales_Region", StringType()),
        ("Plant_Id", StringType()),
        ("Sorg", StringType()),
        ("Dch", IntegerType()),
        ("Div", IntegerType()),
        ("Item_Categ", StringType()),
        ("Customer_Grp_Id", StringType()),
        ("Customer_Grp_Name", StringType()),
        ("Order_type", StringType()),
        ("Sales_Order", IntegerType()),
        ("Invoice_No", IntegerType()),
        ("Invoice_Date", DateType()),
        ("State_Name", StringType()),
        ("Sales_Zone", StringType()),
        ("Customer_Id", IntegerType()),
        ("Customer_Name", StringType()),
        ("Address", StringType()),
        ("Product_Id", IntegerType()),
        ("Product_Name", StringType()),
        ("Quantity", IntegerType()),
        ("Basic_Rate", DoubleType()),
        ("Special_Ra", DoubleType()),
        ("Basic_Amt", DoubleType()),
        ("Discount_A", IntegerType()),
        ("Net_Amt_DC", DoubleType()),
        ("D380", StringType()),
        ("Type_Calculations", StringType()),
        ("Product_Family", StringType()),
        ("Product_Type", StringType()),
        ("Product_Category", StringType()),
        ("Quantity_2", IntegerType()),
        ("Per_Coll", DoubleType()),
        ("Number_of_Coll", DoubleType()),
        ("Andris", StringType()),
        ("EWH_Groups", StringType()),
        ("Ecom", StringType()),
    )
])

# An empty DataFrame that carries the target schema.
result_df = spark.createDataFrame([], schema)

In [0]:
# Sanity check: how many columns the current result_df defines.
column_count = len(result_df.columns)
print(column_count)

In [0]:
 
# Function to clean column names
def clean_column_name(col_name):
    """Normalise a raw CSV header into a safe column identifier.

    Surrounding whitespace is trimmed. Each internal run of whitespace becomes a
    single underscore. Every character other than an ASCII letter, digit or
    underscore is then removed.

    Example: "  Net Amt (DC) " -> "Net_Amt_DC"
    """
    underscored = re.sub(r'\s+', '_', col_name.strip())
    return re.sub(r'[^a-zA-Z0-9_]', '', underscored)
 
# Function to replace null values
def NullValueReplace(df):
    """Replace nulls with a default value chosen by column type.

    Defaults by column type:
      * integral columns (int/long/short/byte) get 0;
      * string columns get "Other";
      * double columns get 0.0;
      * columns of any other type are passed through unchanged.

    Column names, order and data types are preserved.

    All columns are rebuilt in a single ``select`` rather than one
    ``withColumn`` call per column. Each ``withColumn`` adds a projection to the
    logical plan, and Spark documents that calling it in a loop can produce very
    large plans (slow analysis, even stack overflows).

    Args:
        df: Spark DataFrame to clean.

    Returns:
        A new DataFrame with nulls replaced.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import (
        ByteType, DoubleType, IntegerType, LongType, ShortType, StringType,
    )

    integral_types = (IntegerType, LongType, ShortType, ByteType)

    projected = []
    for field in df.schema.fields:
        column = df[field.name]
        if isinstance(field.dataType, integral_types):
            default = 0
        elif isinstance(field.dataType, StringType):
            default = "Other"
        elif isinstance(field.dataType, DoubleType):
            default = 0.0
        else:
            projected.append(column)
            continue
        # Cast the literal to the column's own type. Otherwise a ShortType or
        # ByteType column would be widened to IntegerType by the int literal.
        filled = F.when(column.isNull(), F.lit(default).cast(field.dataType)).otherwise(column)
        projected.append(filled.alias(field.name))
    return df.select(projected)
 
# Function to drop unwanted columns
def DropUnwantedColumns(df):
    """Remove auxiliary columns that only some source sheets carry.

    Only names present in ``df.columns`` (exact, case-sensitive match) are
    dropped. Names that are absent are skipped, and if none are present the
    input DataFrame is returned as-is.

    Args:
        df: DataFrame whose column names have already been cleaned.

    Returns:
        The DataFrame without the unwanted columns.
    """
    unwanted = (
        'Product_Fa', 'Material_Description_2', 'EWH_Region_Classification', 'ES_State',
        'State_with_Haryana_1_2', 'Solar_Group', 'For_Split', 'Commercial_vs_Domestic',
        'Source', 'PF01', 'Weeks',
    )
    present = [name for name in unwanted if name in df.columns]
    if not present:
        return df
    return df.drop(*present)
 
# Load and process data
# Absolute DBFS mount path of the raw (bronze) CSV files.
BRONZE_PATH = "/mnt/bronze-gaurav"
# Only files modified within this many hours are picked up on each run.
LOOKBACK_HOURS = 24

# Cleaned source column name -> standardized column name. Renaming a column that
# is absent from a file is a no-op.
COLUMN_RENAMES = {
    'REF': 'Sales_Region',
    'Sales_Orde': 'Sales_Order',
    'Invoice_Da': 'Invoice_Date',
    'Plant': 'Plant_Id',
    'Cust_Grp': 'Customer_Grp_Id',
    'Customer_G': 'Customer_Grp_Name',
    'Sold_To': 'Customer_Id',
    'Sold_to_Party_Name': 'Customer_Name',
    'Sold_To_City': 'Address',
    'Material': 'Product_Id',
    'Material_Description': 'Product_Name',
    'Type': 'Product_Type',
    'Solar_Type': 'Product_Category',
}

threshold_time = datetime.now() - timedelta(hours=LOOKBACK_HOURS)
result_df = None  # union of all recently modified files; stays None if none qualify

# NOTE(review): the `schema` built earlier is not applied here. Column types come
# from inferSchema on each file, so they can differ between files and runs.
for file_info in dbutils.fs.ls(BRONZE_PATH):
    # modificationTime is in epoch milliseconds; both sides compare in local time.
    if datetime.fromtimestamp(file_info.modificationTime / 1000) <= threshold_time:
        continue
    path = file_info.path
    print(f"Processing file: {path}")
    df = spark.read.format("csv").options(header=True, inferSchema=True).load(path)

    df = df.toDF(*[clean_column_name(name) for name in df.columns])
    df = DropUnwantedColumns(df)
    df = NullValueReplace(df)
    for old_name, new_name in COLUMN_RENAMES.items():
        df = df.withColumnRenamed(old_name, new_name)

    result_df = df if result_df is None else result_df.unionByName(df)

if result_df is None:
    # Nothing new to load. Stop the run cleanly instead of letting the following
    # cells fail on result_df being None.
    dbutils.notebook.exit(f"No files modified in the last {LOOKBACK_HOURS} hours under {BRONZE_PATH}")
else:
    # Show the final DataFrame
    result_df.show()
 
 

In [0]:
# Total rows in the combined DataFrame, displayed as the cell output.
total_rows = result_df.count()
total_rows

In [0]:
# Collapse to one partition so the write below emits a single part file.
df_single_partition = result_df.repartition(numPartitions=1)

In [0]:
# Silver-layer output location.
opt_path = '/mnt/silver-gaurav'

# NOTE(review): mode='overwrite' replaces everything under opt_path with only the
# files selected by the recent-modification filter in the load cell. Confirm
# whether earlier loads should be kept (e.g. append, or a partitioned layout).
# The "header" option is a CSV option and is presumably ignored by the Parquet writer.
silver_writer = df_single_partition.write.option("header", "true")
silver_writer.parquet(opt_path, mode='overwrite')

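The active pipeline ends here. The cells below are an earlier, step-by-step version of the same processing, kept commented out for reference.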

In [0]:
# from datetime import datetime, timedelta
# # Define a threshold for recent modification time (here: within the last 48 hours)
# threshold_time = datetime.now() - timedelta(hours=48)

# # Filter the list to select only the recently added files
# recent_files = list([file_info for file_info in dbutils.fs.ls('mnt/bronze') if datetime.fromtimestamp(file_info.modificationTime / 1000) > threshold_time])

# # Print the list of recently added files
# for file_path in recent_files:
#     print(file_path) 

In [0]:
# path1=recent_files[0][0]
# print(path1)

In [0]:
# df=spark.read.format("csv").options(header=True,inferschema=True).load(path1)
# df.printSchema()

In [0]:
# print(len(df.columns))

Column name standardization

In [0]:
# def clean_column_name(col_name):
#     # Remove leading and trailing spaces
#     cleaned_name = col_name.strip()
#     # Replace spaces between words with underscores
#     cleaned_name = re.sub(r'\s+', '_', cleaned_name)
#     # Replace special characters with empty string
#     cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', cleaned_name)
#     return cleaned_name

# # Get the current column names
# current_columns = df.columns

# # Clean column names
# cleaned_columns = [clean_column_name(col_name) for col_name in current_columns]

# # Rename columns
# df = df.toDF(*cleaned_columns)
# # print(cleaned_columns)

In [0]:
# df.printSchema()

Dropping Unwanted Columns

In [0]:
# df=df.drop('Product_Fa','Material_Description_2','EWH_Region_Classification','ES_State',
#             'State_with_Haryana_1_2','Solar_Group','For_Split','Commercial_vs_Domestic')

# #Dropping Additional column which is present only in first sheet
# for column_name in df.columns:
#     if column_name=='Source':
#         df=df.drop('Source')
#     if column_name=='PF01':
#         df=df.drop('PF01')


In [0]:
# print(len(df.columns))

In [0]:
# df.printSchema()

Checking for null values and replacing them according to data type


In [0]:
# # Iterate over each column
# for column_name in df.columns:
#     # Check if the column contains null values
#     if df.filter(df[column_name].isNull()).count() > 0:
#         # Get the data type of the column
#         column_type = df.schema[column_name].dataType
#         # Replace null values based on the data type
#         if column_type in (IntegerType(), LongType(), ShortType(), ByteType()):
#             df = df.withColumn(column_name, when(df[column_name].isNull(), 0).otherwise(df[column_name]))
#         elif column_type == StringType():
#             df = df.withColumn(column_name, when(df[column_name].isNull(), "Other").otherwise(df[column_name]))
#         elif column_type == DoubleType():
#             df = df.withColumn(column_name, when(df[column_name].isNull(), 0.0).otherwise(df[column_name]))

Column renaming
 

In [0]:
# clm=df.columns
# print(clm)

In [0]:
# df=df.withColumnRenamed('REF','Sales_Region')\
#     .withColumnRenamed('Sales_Orde','Sales_Order')\
#     .withColumnRenamed('Invoice_Da','Invoice_Date')\
#     .withColumnRenamed('Plant','Plant_Id')\
#     .withColumnRenamed('Cust_Grp','Customer_Grp_Id')\
#     .withColumnRenamed('Customer_G','Customer_Grp_Name')\
#     .withColumnRenamed('Sold_To','Customer_Id')\
#     .withColumnRenamed('Sold_to_Party_Name','Customer_Name')\
#     .withColumnRenamed('Sold_To_City','Address')\
#     .withColumnRenamed('Material','Product_Id')\
#     .withColumnRenamed('Material_Description','Product_Name')\
#     .withColumnRenamed('Type','Product_Type')\
#     .withColumnRenamed('Solar_Type','Product_Category')

In [0]:
# print(df.columns)

In [0]:
# clm= ['Sales_Region', 'Plant_Id', 'Sorg', 'Dch', 'Div', 'Item_Categ', 'Customer_Grp_Id', 'Customer_Grp_Name', 'Order_type', 'Sales_Order', 'Invoice_No', 'Invoice_Date', 'Weeks', 'State_Name', 'Sales_Zone', 'Customer_Id', 'Customer_Name', 'Address', 'Product_Id', 'Product_Name', 'Quantity', 'Basic_Rate', 'Special_Ra', 'Basic_Amt', 'Discount_A', 'Net_Amt_DC', 'D380', 'Type_Calculations', 'Product_Family', 'Product_Type', 'Product_Category', 'Quantity_2', 'Per_Coll', 'Number_of_Coll', 'Andris', 'EWH_Groups', 'Ecom']
# print(len(clm))

Before this step, merge the data from all sheets into a single DataFrame.

Creating fact and dimension tables

In [0]:
# def cust_dim(df):
#   customer_df=df.select('Customer_Id','Customer_Name','Address')
#   df=df.drop('Customer_Name','Address') #dropping columns from main table which are taken in customer df
#   customer_df=customer_df.dropDuplicates(['Customer_Id'])
#   # customer_df.show()
#   return customer_df, df

# def plant_dim(df):
#   plant_df=df.select('Plant_Id','Sorg','Dch','Div')
#   #dropping columns from main table which are taken in plant df
#   df=df.drop('Sorg','Dch','Div')
#   plant_df=plant_df.dropDuplicates(['Plant_Id','Dch'])
#   return plant_df, df

# def custgrp_dim(df):
#   custgrp_df=df.select('Customer_Grp_Id','Customer_Grp_Name')
#   print("Before",custgrp_df.count())
#   #dropping columns from main table which are taken in customer group df
#   df=df.drop('Customer_Grp_Name')
#   custgrp_df=custgrp_df.dropDuplicates(['Customer_Grp_Id'])
#   print("After",custgrp_df.count())
#   return custgrp_df, df

# def product_dim(df):
#   material_df=df.select('Product_Id','Product_Name','Product_Category','Product_Type','Andris','EWH_Groups')
#   print("Before",material_df.count())
#   #dropping columns from main table which are taken in product df
#   df=df.drop('Product_Name','Product_Category','Product_Type','Andris','EWH_Groups')
#   material_df=material_df.dropDuplicates(['Product_Id'])
#   print("After",material_df.count())
#   return material_df, df

# def location_dim(df):
#   location_df=df.select(col('State_Name').alias('StateName'),col('Sales_Zone').alias('SalesZone'),col('Sales_Region').alias('SalesRegion'))
#   location_df=location_df.dropDuplicates(['StateName','SalesZone','SalesRegion'])
#   # Add a unique id column to the existing dataframe
#   location_df = location_df.withColumn("StateId", monotonically_increasing_id())
#   # Convert the id column to a string
#   location_df = location_df.withColumn("StateId", concat(lit("S"), col("StateId").cast("string")))
#   return location_df, df

Creating Fact and Dimension Tables

In [0]:
# #Creating Customer Dimension
# customer_df,df=cust_dim(df)
# #Creating Plant Dimension
# plant_df,df=plant_dim(df)
# #Creating Customer_group Dimension
# custgrp_df,df=custgrp_dim(df)
# #Creating product Dimension
# product_df,df=product_dim(df)
# #Creating Location Dimension
# location_df,df=location_dim(df)
# location_df.dropDuplicates(['StateId'])
# #adding the location id (StateId) to the fact data frame
# df = df.join(location_df,
#                     (df.State_Name == location_df.StateName) &
#                     (df.Sales_Zone == location_df.SalesZone) &
#                     (df.Sales_Region == location_df.SalesRegion),
#                     "left")
# df=df.drop("State_Name","Sales_Zone","Sales_Region","StateName","SalesZone","SalesRegion")
# #Df is fact data frame

In [0]:
# demo_df.write.option("header","true").csv(opt_path+'/'+'demodf',mode='ignore')
# location_df.write.option("header","true").csv(opt_path+'/'+'locationDf',mode='ignore')

In [0]:

# df.write.option("header","true").csv(opt_path+'/'+'salesfactData',mode='ignore')

## GIT changes

In [0]:
# df.write.option("header","true").csv(opt_path+'/'+'salesfactData',mode='ignore')