LOAD CSV INTO DATAFRAME

In [0]:
# Load customers.csv from the FileStore. The first row supplies the column
# names and the column types are inferred from the data.
customer_df = spark.read.csv(
    "/FileStore/tables/customers.csv",
    header=True,
    inferSchema=True,
)

# Inspect the inferred schema, then preview a single record.
customer_df.printSchema()
customer_df.show(1)


root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- city: string (nullable = true)

+-----------+--------+----------------+----------+------+
|customer_id|    name|           email|     phone|  city|
+-----------+--------+----------------+----------+------+
|        101|John Doe|john@example.com|1234567890|Mumbai|
+-----------+--------+----------------+----------+------+
only showing top 1 row



In [0]:
# Drop only the rows in which every column is null; partially filled rows are kept.
customer_df_clean = customer_df.dropna(how="all")

# Inspect the cleaned frame (schema and rows), not the raw input frame.
customer_df_clean.printSchema()
customer_df_clean.show()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- city: string (nullable = true)

+-----------+-----------+----------------+----------+------+
|customer_id|       name|           email|     phone|  city|
+-----------+-----------+----------------+----------+------+
|        101|   John Doe|john@example.com|1234567890|Mumbai|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|
+-----------+-----------+----------------+----------+------+



In [0]:
# Spark's `sum` is imported under an alias so it does not shadow Python's
# built-in sum() for the rest of the notebook.
from pyspark.sql.functions import col, when, sum as spark_sum

# One output column per input column, each holding that column's null count:
# every row contributes 1 when the value is null and 0 otherwise.
null_count = customer_df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in customer_df.columns
])
null_count.show()

+-----------+----+-----+-----+----+
|customer_id|name|email|phone|city|
+-----------+----+-----+-----+----+
|          0|   0|    0|    0|   0|
+-----------+----+-----+-----+----+



In [0]:
from pyspark.sql.functions import col

# Build the cleaned frame in one explicit chain from the raw frame, so that
# the all-null-row removal is not lost when customer_id is cast to int.
# Re-running this cell always yields the same result.
customer_df_clean = (
    customer_df
    .dropna(how="all")
    .withColumn("customer_id", col("customer_id").cast("int"))
)
customer_df_clean.show()
customer_df_clean.printSchema()


+-----------+-----------+----------------+----------+------+
|customer_id|       name|           email|     phone|  city|
+-----------+-----------+----------------+----------+------+
|        101|   John Doe|john@example.com|1234567890|Mumbai|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|
+-----------+-----------+----------------+----------+------+

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- city: string (nullable = true)



In [0]:
# Persist the cleaned customers as a Delta table, replacing any existing data at the path.
(
    customer_df_clean.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/raw/customers_delta")
)

In [0]:
# Print the schema of the cleaned customer frame.
customer_df_clean.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- city: string (nullable = true)



In [0]:
# Write the cleaned customers as a Delta table partitioned by city,
# replacing any existing data at the path.
(
    customer_df_clean.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("city")
    .save("/mnt/raw/customers_delta_partition")
)
customer_df_clean.show()

+-----------+-----------+----------------+----------+------+
|customer_id|       name|           email|     phone|  city|
+-----------+-----------+----------------+----------+------+
|        101|   John Doe|john@example.com|1234567890|Mumbai|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|
+-----------+-----------+----------------+----------+------+



In [0]:
# List the contents of the partitioned Delta table directory.
display(dbutils.fs.ls("/mnt/raw/customers_delta_partition"))

path,name,size,modificationTime
dbfs:/mnt/raw/customers_delta_partition/_delta_log/,_delta_log/,0,0
dbfs:/mnt/raw/customers_delta_partition/city=Mumbai/,city=Mumbai/,0,0
dbfs:/mnt/raw/customers_delta_partition/city=Nagpur/,city=Nagpur/,0,0
dbfs:/mnt/raw/customers_delta_partition/city=Pune/,city=Pune/,0,0


In [0]:
from delta.tables import DeltaTable
# Open the Delta table stored at the partitioned customers path.
dt=DeltaTable.forPath(spark,"/mnt/raw/customers_delta_partition")
# Print the table's history without truncating long column values.
dt.history().show(truncate=False)

+-------+-------------------+----------------+--------------------------+---------+--------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp          |userId          |userName                  |operation|operationParameters                         |job |notebook          |clusterId           |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-------------------+----------------+--------------------------+---------+--------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|8      |2025-06-25

In [0]:
# Read back the two partitioned Delta tables that feed the customer-360 join.
customers_df_clean = spark.read.load(
    "/mnt/raw/customers_delta_partition",
    format="delta",
)
vehicles_df = spark.read.load(
    "/mnt/raw/vehicles_delta_partitioned",
    format="delta",
)

In [0]:

# Inner join on customer_id: only customers that have a matching vehicle row are kept.
customer_360_df = customers_df_clean.join(
    vehicles_df,
    on="customer_id",
    how="inner",
)

In [0]:
# Persist the joined customer/vehicle frame as a Delta table, replacing any existing data.
(
    customer_360_df.write
    .format('delta')
    .mode("overwrite")
    .save('/mnt/curated/customer_360')
)

In [0]:
# List the files under the curated customer_360 Delta table path.
display(dbutils.fs.ls('/mnt/curated/customer_360'))

path,name,size,modificationTime
dbfs:/mnt/curated/customer_360/_delta_log/,_delta_log/,0,0
dbfs:/mnt/curated/customer_360/part-00000-017f0ce5-4f96-4739-abbd-d33a392be53f-c000.snappy.parquet,part-00000-017f0ce5-4f96-4739-abbd-d33a392be53f-c000.snappy.parquet,2639,1750661886000
dbfs:/mnt/curated/customer_360/part-00000-05fd63a5-dc33-42f9-ad4c-1d5c9308a4b3-c000.snappy.parquet,part-00000-05fd63a5-dc33-42f9-ad4c-1d5c9308a4b3-c000.snappy.parquet,2639,1750674869000
dbfs:/mnt/curated/customer_360/part-00000-0fbfe2ed-caad-4cb4-9bbb-457fd2065d25-c000.snappy.parquet,part-00000-0fbfe2ed-caad-4cb4-9bbb-457fd2065d25-c000.snappy.parquet,2639,1750756709000
dbfs:/mnt/curated/customer_360/part-00000-2882a408-6c91-410e-8168-89143983e2ef-c000.snappy.parquet,part-00000-2882a408-6c91-410e-8168-89143983e2ef-c000.snappy.parquet,2639,1750844218000
dbfs:/mnt/curated/customer_360/part-00000-32daf787-39c8-4ceb-a28c-179d125363fe-c000.snappy.parquet,part-00000-32daf787-39c8-4ceb-a28c-179d125363fe-c000.snappy.parquet,2639,1750675402000
dbfs:/mnt/curated/customer_360/part-00000-a8a84aa1-786b-4e54-89d3-ff644ccac3ee-c000.snappy.parquet,part-00000-a8a84aa1-786b-4e54-89d3-ff644ccac3ee-c000.snappy.parquet,2639,1750600140000
dbfs:/mnt/curated/customer_360/part-00000-ce049315-512a-4be9-a3b5-92ac27fca8c6-c000.snappy.parquet,part-00000-ce049315-512a-4be9-a3b5-92ac27fca8c6-c000.snappy.parquet,2639,1750683738000
dbfs:/mnt/curated/customer_360/part-00001-282a24a5-2434-4751-8361-a7d5544e4c24-c000.snappy.parquet,part-00001-282a24a5-2434-4751-8361-a7d5544e4c24-c000.snappy.parquet,2639,1750675402000
dbfs:/mnt/curated/customer_360/part-00001-807153d0-885a-4cae-8241-d3c2da31f01c-c000.snappy.parquet,part-00001-807153d0-885a-4cae-8241-d3c2da31f01c-c000.snappy.parquet,2639,1750844218000


In [0]:
# Reload the curated customer_360 table from Delta.
# The CSV-only reader options `header` and `inferSchema` were removed: a Delta
# table carries its own schema, so those options have no meaning here.
customer_360_df = (
    spark.read
    .format('delta')
    .load('/mnt/curated/customer_360')
)

In [0]:
# Preview the rows of the reloaded customer_360 frame, then print its schema.
customer_360_df.show()
customer_360_df.printSchema()

+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|customer_id|       name|           email|     phone|  city|vehicle_id|  make|model|year|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|        101|   John Doe|john@example.com|1234567890|Mumbai|         1|Toyota|Camry|2020|
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|         3|  Ford|Focus|2019|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|         2| Honda|Civic|2021|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- city: string (nullable = true)
 |-- vehicle_id: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: integer (nullable = true)



In [0]:
from pyspark.sql.functions import count

# Count the non-null vehicle_id values for each (customer_id, name) pair.
vehicalPerCustomer_df = (
    customer_360_df
    .groupBy("customer_id", "name")
    .agg(count("vehicle_id").alias("vehicle_count"))
)
vehicalPerCustomer_df.show()

+-----------+-----------+-------------+
|customer_id|       name|vehicle_count|
+-----------+-----------+-------------+
|        103|Bob Johnson|            1|
|        101|   John Doe|            1|
|        102| Jane Smith|            1|
+-----------+-----------+-------------+



In [0]:
# Show the per-customer counts for the customer named 'John Doe' with exactly one vehicle.
vehicalPerCustomer_df.filter(" name='John Doe' AND vehicle_count=1").show()

+-----------+--------+-------------+
|customer_id|    name|vehicle_count|
+-----------+--------+-------------+
|        101|John Doe|            1|
+-----------+--------+-------------+



In [0]:
# Count the non-null vehicle_id values per city and display the result.
(
    customer_360_df
    .groupBy("city")
    .agg(count('vehicle_id').alias('vehicle_per_city'))
    .show()
)

+------+----------------+
|  city|vehicle_per_city|
+------+----------------+
|Nagpur|               1|
|Mumbai|               1|
|  Pune|               1|
+------+----------------+



GOLD LAYER CREATION
****

Step 1: Create the Aggregated DataFrame

In [0]:
from pyspark.sql.functions import count

# Gold aggregate: number of non-null vehicle_id values per customer,
# keeping the customer's name and city alongside the id.
vehicle_count_df = (
    customer_360_df
    .groupBy('customer_id', 'name', "city")
    .agg(count('vehicle_id').alias('vehicle_count'))
)
vehicle_count_df.show()

+-----------+-----------+------+-------------+
|customer_id|       name|  city|vehicle_count|
+-----------+-----------+------+-------------+
|        103|Bob Johnson|Nagpur|            1|
|        101|   John Doe|Mumbai|            1|
|        102| Jane Smith|  Pune|            1|
+-----------+-----------+------+-------------+



In [0]:
# List the gold customer_vehicle_count path.
# NOTE(review): this cell runs before the cell that writes this path in this
# notebook, so it relies on the path already existing from an earlier run;
# confirm that this ordering is intended.
display(dbutils.fs.ls("/mnt/gold/customer_vehicle_count"))

path,name,size,modificationTime
dbfs:/mnt/gold/customer_vehicle_count/_delta_log/,_delta_log/,0,0
dbfs:/mnt/gold/customer_vehicle_count/part-00000-01758f6e-ee21-4952-9406-40fc84f9d3d8-c000.snappy.parquet,part-00000-01758f6e-ee21-4952-9406-40fc84f9d3d8-c000.snappy.parquet,1198,1750684198000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-07b80a7c-21ee-4e79-9b50-e1ffc0347ca8-c000.snappy.parquet,part-00000-07b80a7c-21ee-4e79-9b50-e1ffc0347ca8-c000.snappy.parquet,1198,1750684173000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-14976840-006b-4dc0-a467-0b95e2e64d9e-c000.snappy.parquet,part-00000-14976840-006b-4dc0-a467-0b95e2e64d9e-c000.snappy.parquet,913,1750684858000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-185898f4-b079-47b1-87c3-9e017f7e1425-c000.snappy.parquet,part-00000-185898f4-b079-47b1-87c3-9e017f7e1425-c000.snappy.parquet,1198,1750675366000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-31ba8584-22ed-44bc-8a75-1d90d464081e-c000.snappy.parquet,part-00000-31ba8584-22ed-44bc-8a75-1d90d464081e-c000.snappy.parquet,922,1750668399000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-3f559176-910a-47c3-81c1-ddd3b426d5c1-c000.snappy.parquet,part-00000-3f559176-910a-47c3-81c1-ddd3b426d5c1-c000.snappy.parquet,1198,1750685597000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-9edfac4f-10f5-44fd-803a-f4737a735aef-c000.snappy.parquet,part-00000-9edfac4f-10f5-44fd-803a-f4737a735aef-c000.snappy.parquet,913,1750684419000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-a3418094-9d67-4f5b-a9c1-20a924abe678-c000.snappy.parquet,part-00000-a3418094-9d67-4f5b-a9c1-20a924abe678-c000.snappy.parquet,1198,1750675418000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-a5499ae2-3048-4b4a-8b36-ab283ddd3845-c000.snappy.parquet,part-00000-a5499ae2-3048-4b4a-8b36-ab283ddd3845-c000.snappy.parquet,913,1750684404000


 Step 2: Write to Gold Delta Table

In [0]:
# Persist the per-customer vehicle counts as a gold Delta table, replacing any existing data.
(
    vehicle_count_df.write
    .format('delta')
    .mode("overwrite")
    .save("/mnt/gold/customer_vehicle_count")
)


Step 3: Verify the Write

In [0]:
# List the files under the gold customer_vehicle_count path.
display(dbutils.fs.ls("/mnt/gold/customer_vehicle_count"))

path,name,size,modificationTime
dbfs:/mnt/gold/customer_vehicle_count/_delta_log/,_delta_log/,0,0
dbfs:/mnt/gold/customer_vehicle_count/part-00000-01758f6e-ee21-4952-9406-40fc84f9d3d8-c000.snappy.parquet,part-00000-01758f6e-ee21-4952-9406-40fc84f9d3d8-c000.snappy.parquet,1198,1750684198000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-07b80a7c-21ee-4e79-9b50-e1ffc0347ca8-c000.snappy.parquet,part-00000-07b80a7c-21ee-4e79-9b50-e1ffc0347ca8-c000.snappy.parquet,1198,1750684173000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-14976840-006b-4dc0-a467-0b95e2e64d9e-c000.snappy.parquet,part-00000-14976840-006b-4dc0-a467-0b95e2e64d9e-c000.snappy.parquet,913,1750684858000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-185898f4-b079-47b1-87c3-9e017f7e1425-c000.snappy.parquet,part-00000-185898f4-b079-47b1-87c3-9e017f7e1425-c000.snappy.parquet,1198,1750675366000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-31ba8584-22ed-44bc-8a75-1d90d464081e-c000.snappy.parquet,part-00000-31ba8584-22ed-44bc-8a75-1d90d464081e-c000.snappy.parquet,922,1750668399000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-3f559176-910a-47c3-81c1-ddd3b426d5c1-c000.snappy.parquet,part-00000-3f559176-910a-47c3-81c1-ddd3b426d5c1-c000.snappy.parquet,1198,1750685597000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-9ada0545-eeb4-4d76-b7ef-f2d91216335f-c000.snappy.parquet,part-00000-9ada0545-eeb4-4d76-b7ef-f2d91216335f-c000.snappy.parquet,1443,1750844239000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-9edfac4f-10f5-44fd-803a-f4737a735aef-c000.snappy.parquet,part-00000-9edfac4f-10f5-44fd-803a-f4737a735aef-c000.snappy.parquet,913,1750684419000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-a3418094-9d67-4f5b-a9c1-20a924abe678-c000.snappy.parquet,part-00000-a3418094-9d67-4f5b-a9c1-20a924abe678-c000.snappy.parquet,1198,1750675418000


In [0]:
# Read the gold customer_vehicle_count Delta table back and display its rows.
(
    spark.read
    .format('delta')
    .load("/mnt/gold/customer_vehicle_count")
    .show()
)

+-----------+-------------+-----------+------+
|customer_id|vehicle_count|       name|  city|
+-----------+-------------+-----------+------+
|        103|            1|Bob Johnson|Nagpur|
|        101|            1|   John Doe|Mumbai|
|        102|            1| Jane Smith|  Pune|
+-----------+-------------+-----------+------+



**GOLD TABLE 2: Vehicle Count Per city**

Step 1: Group and Aggregate

In [0]:
# Count the non-null vehicle_id values per city, largest count first.
vehicle_count_per_city = (
    customer_360_df
    .groupBy('city')
    .agg(count('vehicle_id').alias('vehicle_count'))
    .orderBy("vehicle_count", ascending=False)
)


Step 2: Show Results

In [0]:
# Display the per-city vehicle counts.
vehicle_count_per_city.show()

+------+-------------+
|  city|vehicle_count|
+------+-------------+
|Nagpur|            1|
|Mumbai|            1|
|  Pune|            1|
+------+-------------+



In [0]:
# Persist the per-city vehicle counts as a gold Delta table, replacing any existing data.
(
    vehicle_count_per_city.write
    .format('delta')
    .mode("overwrite")
    .save("/mnt/gold/vehicle_count_per_city")
)


In [0]:
# Read the gold vehicle_count_per_city Delta table back and display its rows.
(
    spark.read
    .format('delta')
    .load("/mnt/gold/vehicle_count_per_city")
    .show()
)

+------+-------------+
|  city|vehicle_count|
+------+-------------+
|Nagpur|            1|
|Mumbai|            1|
|  Pune|            1|
+------+-------------+



In [0]:
# List the files under the gold customer_vehicle_count path.
# NOTE(review): the preceding cells wrote and read /mnt/gold/vehicle_count_per_city,
# but this cell lists customer_vehicle_count; confirm which path was intended.
display(dbutils.fs.ls("/mnt/gold/customer_vehicle_count"))

path,name,size,modificationTime
dbfs:/mnt/gold/customer_vehicle_count/_delta_log/,_delta_log/,0,0
dbfs:/mnt/gold/customer_vehicle_count/part-00000-01758f6e-ee21-4952-9406-40fc84f9d3d8-c000.snappy.parquet,part-00000-01758f6e-ee21-4952-9406-40fc84f9d3d8-c000.snappy.parquet,1198,1750684198000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-07b80a7c-21ee-4e79-9b50-e1ffc0347ca8-c000.snappy.parquet,part-00000-07b80a7c-21ee-4e79-9b50-e1ffc0347ca8-c000.snappy.parquet,1198,1750684173000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-14976840-006b-4dc0-a467-0b95e2e64d9e-c000.snappy.parquet,part-00000-14976840-006b-4dc0-a467-0b95e2e64d9e-c000.snappy.parquet,913,1750684858000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-185898f4-b079-47b1-87c3-9e017f7e1425-c000.snappy.parquet,part-00000-185898f4-b079-47b1-87c3-9e017f7e1425-c000.snappy.parquet,1198,1750675366000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-31ba8584-22ed-44bc-8a75-1d90d464081e-c000.snappy.parquet,part-00000-31ba8584-22ed-44bc-8a75-1d90d464081e-c000.snappy.parquet,922,1750668399000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-3f559176-910a-47c3-81c1-ddd3b426d5c1-c000.snappy.parquet,part-00000-3f559176-910a-47c3-81c1-ddd3b426d5c1-c000.snappy.parquet,1198,1750685597000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-9ada0545-eeb4-4d76-b7ef-f2d91216335f-c000.snappy.parquet,part-00000-9ada0545-eeb4-4d76-b7ef-f2d91216335f-c000.snappy.parquet,1443,1750844239000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-9edfac4f-10f5-44fd-803a-f4737a735aef-c000.snappy.parquet,part-00000-9edfac4f-10f5-44fd-803a-f4737a735aef-c000.snappy.parquet,913,1750684419000
dbfs:/mnt/gold/customer_vehicle_count/part-00000-a3418094-9d67-4f5b-a9c1-20a924abe678-c000.snappy.parquet,part-00000-a3418094-9d67-4f5b-a9c1-20a924abe678-c000.snappy.parquet,1198,1750675418000


In [0]:
# Show the customer_360 rows whose customer_id is not null.
customer_360_df.filter("customer_id is not null").show()


+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|customer_id|       name|           email|     phone|  city|vehicle_id|  make|model|year|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|         3|  Ford|Focus|2019|
|        101|   John Doe|john@example.com|1234567890|Mumbai|         1|Toyota|Camry|2020|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|         2| Honda|Civic|2021|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+



### **Goal:** Get only the latest vehicle (by year) for each customer from `customer_360_df`.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col


In [0]:
# One window per customer, newest vehicle first.
# vehicle_id is a secondary sort key so that row_number() picks the same row on
# every run when a customer has several vehicles from the same year. Without a
# tie-breaker the row chosen among equal years is arbitrary.
window_spec = (
    Window
    .partitionBy("customer_id")
    .orderBy(col("year").desc(), col("vehicle_id").desc())
)

In [0]:
# Number the rows inside each window defined by window_spec; row_num 1 is the
# first row of each partition in that window's ordering.
latest_vehicle = customer_360_df.withColumn(
    "row_num",
    row_number().over(window_spec),
)

# Note: .over() expects a WindowSpec object, not a string, so don't wrap window_spec in single or double quotes.

In [0]:
# Display the type of window_spec.
type(window_spec)

Out[33]: pyspark.sql.window.WindowSpec

In [0]:
# Display the frame with the added row_num column.
latest_vehicle.show()

+-----------+-----------+----------------+----------+------+----------+------+-----+----+-------+
|customer_id|       name|           email|     phone|  city|vehicle_id|  make|model|year|row_num|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+-------+
|        101|   John Doe|john@example.com|1234567890|Mumbai|         1|Toyota|Camry|2020|      1|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|         2| Honda|Civic|2021|      1|
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|         3|  Ford|Focus|2019|      1|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+-------+



In [0]:
# Keep only the first-ranked row of each customer, then remove the helper column.
customer_latest_vehicle_df = (
    latest_vehicle
    .filter("row_num==1")
    .drop("row_num")
)

Why do we use `.drop("row_num")`?
After we keep only the latest row for each customer (i.e., `row_num == 1`),
the `row_num` column is no longer needed.

So we clean it up like this:

```python
customer_latest_vehicle_df = latest_vehicle.filter("row_num == 1").drop("row_num")
```

- It's good practice to drop temporary/technical columns.
- It keeps your output clean for downstream use.
- It avoids confusion if you write to a Gold Delta table.

 Step 5: Show the final DataFrame

In [0]:
# Display the final frame: the rows with row_num 1, without the row_num column.
customer_latest_vehicle_df.show()


+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|customer_id|       name|           email|     phone|  city|vehicle_id|  make|model|year|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|        101|   John Doe|john@example.com|1234567890|Mumbai|         1|Toyota|Camry|2020|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|         2| Honda|Civic|2021|
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|         3|  Ford|Focus|2019|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+



Write to the gold layer as a Delta table

In [0]:
# Persist the latest vehicle per customer as a gold Delta table, replacing any existing data.
(
    customer_latest_vehicle_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/gold/latest_vehicle_per_customer")
)


Verify the Delta table


In [0]:
# Read the gold latest_vehicle_per_customer Delta table back and display its rows.
(
    spark.read
    .format('delta')
    .load("/mnt/gold/latest_vehicle_per_customer")
    .show()
)

+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|customer_id|       name|           email|     phone|  city|vehicle_id|  make|model|year|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+
|        101|   John Doe|john@example.com|1234567890|Mumbai|         1|Toyota|Camry|2020|
|        102| Jane Smith|jane@example.com|2345678901|  Pune|         2| Honda|Civic|2021|
|        103|Bob Johnson| bob@example.com|3456789012|Nagpur|         3|  Ford|Focus|2019|
+-----------+-----------+----------------+----------+------+----------+------+-----+----+



Gold Table 4: Vehicle Count Per Brand
Objective:
Count how many vehicles each brand (make) has across all customers.

In [0]:
# Count the non-null vehicle_id values per make, largest count first.
vehicle_per_brand = (
    customer_360_df
    .groupBy('make')
    .agg(count("vehicle_id").alias('vehicle_count'))
    .orderBy('vehicle_count', ascending=False)
)
vehicle_per_brand.show()

+------+-------------+
|  make|vehicle_count|
+------+-------------+
|  Ford|            1|
|Toyota|            1|
| Honda|            1|
+------+-------------+



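Why is the `orderBy` required? It only controls the order of the rows displayed above. Row order is not guaranteed when a Delta table is read back, so the sort is optional for the write itself.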

 Write to Gold Delta Table

In [0]:
# Persist the per-make vehicle counts as a gold Delta table, replacing any existing data.
(
    vehicle_per_brand.write
    .format('delta')
    .mode('overwrite')
    .save('/mnt/gold/vehicle_count_per_make')
)

In [0]:
# Read the gold vehicle_count_per_make Delta table back and display its rows.
(
    spark.read
    .format('delta')
    .load('/mnt/gold/vehicle_count_per_make')
    .show()
)

+------+-------------+
|  make|vehicle_count|
+------+-------------+
|  Ford|            1|
|Toyota|            1|
| Honda|            1|
+------+-------------+



GOLD TABLE 6: CREATION OF CLEAN CONTACT_LIST

In [0]:
# Project the cleaned customer frame onto its contact columns.
clean_contact_list = customer_df_clean.select("customer_id", "name", "email", "phone", "city")
