In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
%sql
-- NOTE(review): customer_df is assigned as a Python DataFrame in a later cell, and no
-- temp view or table with that name is registered in the visible cells, so this query
-- presumably cannot resolve it on a fresh run. Confirm, then move or remove this cell.
select * from table customer_df

### Customers data

In [0]:
# Read the raw customers CSV from the bronze folder. The first row is treated
# as the header and column types are inferred from the data.
customer_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load('abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/Malik_Customers_Data.csv')
)

In [0]:
# Row count of the freshly loaded customers data (shown as the cell output).
customer_df.count()

In [0]:
# Count rows per customer_id and keep only the ids that occur more than once.
# The result is assigned but not displayed in this cell.
duplicates=customer_df.groupBy("customer_id").count().filter(col("count")>1)

In [0]:
#customer_df = customer_df.dropDuplicates(["customer_id"])

In [0]:
# Render the customers DataFrame for visual inspection.
display(customer_df)

In [0]:
from pyspark.sql.functions import col
# Number of customer rows whose Pincode is null.
customer_df.filter(col("Pincode").isNull()).count()

In [0]:
# Recompute the customer_id values that occur more than once; the de-duplication
# cell above is commented out, so customer_df is unchanged since the first check.
duplicates=customer_df.groupBy("customer_id").count().filter(col("count")>1)

In [0]:
# Show the duplicated customer_id values together with their occurrence counts.
display(duplicates)

In [0]:
# Replace the spaces in the customer column names with underscores before the
# Delta write.
# The original renamed "Temporary Registration Number" to itself, which left
# the space in place; it now becomes "Temporary_Registration_Number", the same
# name the sales data uses for this column.
customer_df = (
    customer_df
    .withColumnRenamed("Temporary Registration Number", "Temporary_Registration_Number")
    .withColumnRenamed("Invoice Date", "Invoice_Date")
    .withColumnRenamed("Chassis Number", "Chassis_Number")
)

In [0]:
%python
# Persist the customers data as Delta twice, replacing whatever is there:
# first to the bronze output path, then as a catalog table.
(
    customer_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/output/customer_data")
)
(
    customer_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("service_center.bronze_schema.customer_data")
)

### Sales Data

In [0]:
# Read the raw sales CSV from the bronze folder with a header row and inferred
# column types.
sales_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load('abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/Malik_Motors_Sales.csv')
)

In [0]:
# Force "Parts Price" to a double, replacing the column in place.
parts_price_as_double = col("Parts Price").cast(DoubleType())
sales_df = sales_df.withColumn("Parts Price", parts_price_as_double)

In [0]:
# First pass of space-to-underscore renames on the sales columns, applied one
# at a time in the listed order.
for old_name, new_name in [
    ("Dealer Name", "Dealer_Name"),
    ("Base Price", "Base_Price"),
    ("Parts Price", "Parts_Price"),
    ("Parts Added", "Parts_Added"),
]:
    sales_df = sales_df.withColumnRenamed(old_name, new_name)

In [0]:
# Render the sales DataFrame after the first batch of renames.
display(sales_df)

In [0]:
# Rows whose Parts_Added equals "0" get the placeholder "NA"; every other row
# keeps its existing value.
parts_added = col("Parts_Added")
parts_added_or_na = when(parts_added == "0", "NA").otherwise(parts_added)
sales_df = sales_df.withColumn("Parts_Added", parts_added_or_na)

In [0]:
# Rename the remaining space-separated sales columns to underscore form, in
# the listed order. The last four pairs repeat renames already done in an
# earlier cell.
sales_column_renames = [
    ("Branch id", "Branch_id"),
    ("Invoice Date", "Invoice_Date"),
    ("Temporary Registration Number", "Temporary_Registration_Number"),
    ("Service Cost", "Service_Cost"),
    ("Service Tax", "Service_Tax"),
    ("Final Price", "Final_Price"),
    ("Base Price Tax", "Base_Price_Tax"),
    ("Discount Price", "Discount_Price"),
    ("Chassis Number", "Chassis_Number"),
    ("Base Price", "Base_Price"),
    ("Parts Added", "Parts_Added"),
    ("Parts Price", "Parts_Price"),
    ("Parts Tax", "Parts_Tax"),
    ("Dealer Name", "Dealer_Name"),
]
for old_name, new_name in sales_column_renames:
    sales_df = sales_df.withColumnRenamed(old_name, new_name)

In [0]:
# Render the sales DataFrame after all column renames.
display(sales_df)

In [0]:
# Count rows per Chassis_Number and keep the values that occur more than once.
duplicates=sales_df.groupBy("Chassis_Number").count().filter(col("count")>1)

In [0]:
# Show the repeated Chassis_Number values with their occurrence counts.
display(duplicates)

In [0]:
# Final look at the sales DataFrame before it is written out.
display(sales_df)

In [0]:
%python
# Persist the sales data as Delta, replacing existing data: first to the bronze
# output path, then as a catalog table. Only the table write sets mergeSchema.
(
    sales_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/output/sales_data")
)
(
    sales_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("service_center.bronze_schema.sales_data")
)

### Vehicle Data

In [0]:
# Read the raw vehicle CSV from the bronze folder with a header row and
# inferred column types.
vehicle_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load('abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/Malik_Motors_vehicle_data.csv')
)

In [0]:
# Remove the ChassisNumber column from the vehicle data.
vehicle_df = vehicle_df.drop("ChassisNumber")

In [0]:
# Drop rows containing nulls; called with no arguments, so Spark's defaults
# apply (a row is removed if any column is null).
vehicle_df=vehicle_df.na.drop()

In [0]:
# Render the cleaned vehicle DataFrame.
display(vehicle_df)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import round as spark_round
from pyspark.sql.types import FloatType

# Define UDF
def extract_avg_price(price_str):
    """Return a single price figure parsed from a free-text price string.

    The text is lower-cased and the word "lakh" removed, then every number
    (with or without a decimal part) is extracted.

    Args:
        price_str: Price text such as "5.50 - 7.50 Lakh" or "8 Lakh". May be
            None.

    Returns:
        The mean of the two numbers when exactly two are found, the number
        itself when exactly one is found, and None when the input is None or
        contains any other count of numbers.
    """
    import re

    # A null cell would otherwise raise AttributeError on .lower().
    if price_str is None:
        return None

    cleaned = str(price_str).lower().replace("lakh", "").strip()
    # The decimal part is optional so that whole-number prices ("5 - 7 Lakh",
    # "8.5 - 12 Lakh") are captured instead of being silently skipped.
    numbers = [float(token) for token in re.findall(r"\d+(?:\.\d+)?", cleaned)]

    if len(numbers) == 2:
        return (numbers[0] + numbers[1]) / 2
    if len(numbers) == 1:
        return numbers[0]
    return None

# Register UDF
# Wrap the Python parser as a Spark UDF that returns a float.
avg_price_udf = udf(extract_avg_price, FloatType())

# Derive avg_price_lakh from the "price" column, rounded to two decimals.
raw_avg_price = avg_price_udf(col("price"))
vehicle_df = vehicle_df.withColumn("avg_price_lakh", spark_round(raw_avg_price, 2))

# Show the result.
vehicle_df.display()


In [0]:
%python
# Turn on Delta schema auto-merge for the whole session.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Overwrite the Delta files at the bronze output path, with mergeSchema set.
(
    vehicle_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/output/vehicle_data")
)

# Overwrite the catalog table with the same data and options.
(
    vehicle_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("service_center.bronze_schema.vehicle_data")
)

### Branch Convenience Score

In [0]:
# Read the branch convenience score CSV from the bronze folder with a header
# row and inferred column types.
branch_conveniance_df = (
    spark.read
    .format("csv")
    .options(header='true', inferSchema='true')
    .load("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/branch_convience_score.csv")
)

In [0]:
# Number of branch rows whose Branch_Name is null.
branch_conveniance_df.filter(col('Branch_Name').isNull()).count()

In [0]:
# Render the branch convenience DataFrame as loaded.
display(branch_conveniance_df)

In [0]:
# Drop rows containing nulls; called with no arguments, so Spark's defaults
# apply (a row is removed if any column is null).
branch_conveniance_df=branch_conveniance_df.na.drop()

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
 
# Cast the three numeric branch metrics to double, one column at a time.
for metric_column in (
    "Workshop_Area_sqft",
    "Monthly_Revenue",
    "normalization_service_day",
):
    branch_conveniance_df = branch_conveniance_df.withColumn(
        metric_column, col(metric_column).cast(DoubleType())
    )

In [0]:
# Replace the spaces in the two remaining branch column names with underscores.
for old_name, new_name in [
    ("Number of Employees", "Number_of_Employees"),
    ("Service days", "Service_days"),
]:
    branch_conveniance_df = branch_conveniance_df.withColumnRenamed(old_name, new_name)

In [0]:
# Count rows per Branch_Type and keep the types that occur more than once.
# The result is assigned but not displayed in this cell.
# NOTE(review): grouping on Branch_Type measures how often each type occurs, not
# duplicate branches; confirm this is the intended key.
duplicates=branch_conveniance_df.groupBy("Branch_Type").count().filter(col("count")>1)

In [0]:
# Persist the branch convenience data as Delta, replacing existing data: first
# to the bronze output path, then as a catalog table (with mergeSchema set).
(
    branch_conveniance_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/output/branch_conveniance_score")
)

(
    branch_conveniance_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("service_center.bronze_schema.branch_conveniance_score")
)

### Branch-wise Model Score


In [0]:
# Read the model convenience score CSV from the bronze folder with a header
# row and inferred column types.
model_convenience_df = (
    spark.read
    .format("csv")
    .options(header='true', inferSchema='true')
    .load("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/Model_Convenience_Score.csv")
)

In [0]:
# Cast the three date columns to timestamps.
#
# The original built "Actual Date" from col("Start Date"), so every actual date
# was overwritten with a copy of the start date. It is now read from the
# actual-date column itself. A later cell drops a column named "Actual Date "
# (trailing space), so the source column is located by its stripped name.
# NOTE(review): if no such column exists, this falls back to "Start Date" to
# keep the previous behaviour instead of failing; confirm the CSV header.
actual_date_source = next(
    (name for name in model_convenience_df.columns if name.strip() == "Actual Date"),
    "Start Date",
)
model_convenience_df = (
    model_convenience_df
    .withColumn("Start Date", col("Start Date").cast("timestamp"))
    .withColumn("Expected Date", col("Expected Date").cast("timestamp"))
    # Backticks keep the name intact even if it carries trailing whitespace.
    .withColumn("Actual Date", col(f"`{actual_date_source}`").cast("timestamp"))
)

In [0]:
# Rename the space-separated model score columns to underscore form, one at a
# time in the listed order. The ("Branch_ID", "Branch_ID") pair maps the name
# to itself and is kept as in the original chain.
model_column_renames = [
    ("Dealer Name", "Dealer_Name"),
    ("Branch_ID", "Branch_ID"),
    ("Branch Convenience Score", "Branch_Convenience_Score"),
    ("Number of Visits", "Number_of_Visits"),
    ("Start Date", "Start_Date"),
    ("Expected Date", "Expected_Date"),
    ("Actual Date", "Actual_Date"),
    ("Total Price", "Total_Price"),
    ("Average Cost", "Average_Cost"),
    ("Average Delays", "Average_Delays"),
    ("Normalized Average Delays", "Normalized_Average_Delays"),
    ("Normalized Visit Count", "Normalized_Visit_Count"),
    ("Normalized Average Cost", "Normalized_Average_Cost"),
    ("Delay Days", "Delay_Days"),
    ("Visit Count", "Visit_Count"),
]
for old_name, new_name in model_column_renames:
    model_convenience_df = model_convenience_df.withColumnRenamed(old_name, new_name)

In [0]:
# Drop the "_c0" column and the column named "Actual Date " (note the trailing
# space inside the name, which differs from "Actual Date").
# NOTE(review): "_c0" is presumably an unnamed index column from the CSV export; confirm.
model_convenience_df=model_convenience_df.drop("Actual Date ","_c0")

In [0]:
%python
# Build one aggregate per column that totals its null flags (1 = null), then
# evaluate them all in a single select and show the one-row result.
# `sum` here is the Spark aggregate brought in by the wildcard import in the
# first cell, not the Python builtin.
null_count_exprs = []
for column_name in model_convenience_df.columns:
    null_flag = col(column_name).isNull().cast("int")
    null_count_exprs.append(sum(null_flag).alias(column_name))
null_counts = model_convenience_df.select(null_count_exprs)
display(null_counts)

In [0]:
# Count rows per branch id and keep the ids that occur more than once.
# This cell spells the column "Branch_Id"; the next cell uses "Branch_ID".
duplicates=model_convenience_df.groupBy("Branch_Id").count().filter(col("count")>1)

In [0]:
# Recompute the Branch_ID values that occur more than once and display them
# with their occurrence counts.
duplicates=model_convenience_df.groupBy("Branch_ID").count().filter(col("count")>1)
display(duplicates)

In [0]:
# Persist the model convenience data as Delta under the name "success_score",
# replacing existing data: first to the bronze output path, then as a catalog
# table (with mergeSchema set).
(
    model_convenience_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/output/success_score")
)

(
    model_convenience_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("service_center.bronze_schema.success_score")
)

### Service Data


In [0]:
# Read the raw service CSV from the bronze folder with a header row and
# inferred column types.
# The option key was misspelled "infeSchema", which Spark does not recognise,
# so inference was never enabled and every column loaded as a string. It is
# now "inferSchema", matching the other loads in this notebook.
# NOTE(review): columns not explicitly cast in a later cell will now carry
# inferred types; if the target Delta table was previously written with string
# columns, the overwrite may need its schema replaced. Confirm before running.
service_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/Malik_motors_Service.csv")
)

In [0]:
# Rename the space-separated service columns to underscore form, one at a time
# in the listed order. The ("Speciality", "Speciality") pair maps the name to
# itself and is kept as in the original chain.
service_column_renames = [
    ("Chassis Number", "Chassis_Number"),
    ("Number of Visits", "Number_of_Visits"),
    ("Start Date", "Start_Date"),
    ("Expected Date", "Expected_Date"),
    ("Actual Date", "Actual_Date"),
    ("Delay Days", "Delay_Days"),
    ("Speciality", "Speciality"),
    ("Parts Added", "Parts_Added"),
    ("Individual Parts Tax", "Individual_Parts_Tax"),
    ("Service Cost", "Service_Cost"),
    ("Service Tax", "Service_Tax"),
    ("Total Price", "Total_Price"),
    ("Customer Feedback", "Customer_Feedback"),
    ("Delayed Reason", "Delayed_Reason"),
    ("Previous Service Date", "Previous_Service_Date"),
    ("Individual Parts Cost", "Individual_Parts_Cost"),
]
for old_name, new_name in service_column_renames:
    service_df = service_df.withColumnRenamed(old_name, new_name)

In [0]:
from pyspark.sql.functions import col
# Cast the renamed service columns to their target types, one column at a time
# in the listed order.
service_column_types = [
    ("Service_Tax", "double"),
    ("Service_Cost", "double"),
    ("Total_Price", "double"),
    ("Individual_Parts_Cost", "double"),
    ("Number_of_Visits", "long"),
    ("Start_Date", "timestamp"),
    ("Expected_Date", "timestamp"),
    ("Actual_Date", "timestamp"),
    ("Delay_Days", "int"),
    ("Individual_Parts_Tax", "double"),
    ("Previous_Service_Date", "timestamp"),
]
for column_name, target_type in service_column_types:
    service_df = service_df.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Render the service DataFrame after the renames and type casts.
display(service_df)

In [0]:
# One aggregate per column: add 1 for each null value and 0 otherwise, which
# yields a single row of per-column null counts.
# `sum` here is the Spark aggregate brought in by the wildcard import in the
# first cell, not the Python builtin.
null_flag_sums = []
for column_name in service_df.columns:
    is_missing = when(col(column_name).isNull(), 1).otherwise(0)
    null_flag_sums.append(sum(is_missing).alias(column_name))
null_counts = service_df.select(null_flag_sums)

In [0]:
# Show the per-column null counts computed in the previous cell.
display(null_counts)

In [0]:
# service_df = service_df.fillna({"Start_Date": "2022-02-11"})
# service_df = service_df.fillna({"Expected_Date": "2022-02-14"})
# service_df = service_df.fillna({"Previous_Service_Date": "NaT"})
# Only this fill is active: null Delayed_Reason values become the string "NA".
# The date fills above are commented out and do not run.
service_df=service_df.fillna({"Delayed_Reason": "NA"})

In [0]:
# service_df=service_df.withColumn("Previous_Service_Date",col("Previous_Service_Date").cast("timestamp"))

In [0]:
# Render the service DataFrame after the Delayed_Reason fill.
display(service_df)

In [0]:
from pyspark.sql import functions as F

# Backfill missing Previous_Service_Date values with the row's Start_Date;
# rows that already have a value keep it unchanged.
service_df = service_df.withColumn(
    'Previous_Service_Date',
    F.coalesce(F.col('Previous_Service_Date'), F.col('Start_Date')),
)


In [0]:
# Persist the service data as Delta, replacing existing data: first to the
# bronze output path, then as a catalog table (with mergeSchema set).
(
    service_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://servicecenterdata@servicecentreadlsgen2.dfs.core.windows.net/project/bronze/output/service_centre")
)

(
    service_df.write
    .format("delta")
    .option("mergeSchema", True)
    .mode("overwrite")
    .saveAsTable("service_center.bronze_schema.service_centre")
)