In [1]:
import pandas as pd
from datetime import datetime as dt
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.functions import  col, upper,lit ,date_format, input_file_name,current_date
import calendar
import datetime

In [2]:
# Start (or reuse) the Spark session that drives the landing -> bronze load.
spark = (
    SparkSession.builder
    .appName('LANDING TO BRONZE')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/06 15:22:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/06 15:22:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/06 15:22:50 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Locations inside the data lake, relative to the notebook's working directory:
# the raw CSV dropped in the landing zone and the Bronze output directory.
_datalake_root = "../DataLake"
landing_path = f"{_datalake_root}/Landing/craigslist_vehicles.csv"
bronze_path = f"{_datalake_root}/Bronze/"

### Load data from landing into Spark dataframe

In [4]:
# Read the landing CSV into Spark:
#   - the header row supplies column names and column types are inferred;
#   - FILE_NAME records the source file of every row for lineage;
#   - `_c0` (presumably the CSV's unnamed leading index column) is discarded.
# Rows containing nulls are intentionally kept: dropping them with `na.drop()`
# significantly reduces the number of rows.
# NOTE(review): despite inferSchema, PRICE, ODOMETER, LAT and LONG come out as
# strings. This is possibly caused by multi-line DESCRIPTION values splitting
# records; consider the `multiLine` / `escape` CSV options. Verify against the data.
raw_data_df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema="true", sep=",")
    .load(landing_path)
    .withColumn("FILE_NAME", input_file_name())
    .drop("_c0")
)

                                                                                

### Standardize column names and date formats

In [5]:
# Normalize column names to UPPER_SNAKE_CASE (spaces -> underscores).
renamed_columns = [
    col(source_name).alias(source_name.replace(" ", "_").upper())
    for source_name in raw_data_df.columns
]
stdzd_data_df = raw_data_df.select(renamed_columns)

# Re-render both date columns as compact yyyyMMdd strings.
for date_column in ("POSTING_DATE", "REMOVAL_DATE"):
    stdzd_data_df = stdzd_data_df.withColumn(date_column, date_format(date_column, "yyyyMMdd"))

### Cast YEAR to integer and add the RUN_DATE_ID watermark column

In [6]:
# Cast YEAR to an integer, then stamp every row with the load date as an int
# watermark (RUN_DATE_ID, e.g. 20231006).
_run_date_id_col = date_format(current_date(), "yyyyMMdd").cast("int")
stdzd_data_df = stdzd_data_df.withColumn("YEAR", stdzd_data_df.YEAR.cast("int"))
stdzd_data_df = stdzd_data_df.withColumn("RUN_DATE_ID", _run_date_id_col)

### Preview the first 5 records as a pandas DataFrame

In [7]:
# Bring only the 5 rows that are displayed to the driver (previously 10 were
# fetched and 5 shown). pandas still truncates the middle columns of this wide frame.
stdzd_data_df.limit(5).toPandas()

23/10/06 15:22:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,ID,URL,REGION,REGION_URL,PRICE,YEAR,MANUFACTURER,MODEL,CONDITION,CYLINDERS,...,IMAGE_URL,DESCRIPTION,COUNTY,STATE,LAT,LONG,POSTING_DATE,REMOVAL_DATE,FILE_NAME,RUN_DATE_ID
0,7307679724,https://abilene.craigslist.org/ctd/d/abilene-2...,abilene,https://abilene.craigslist.org,4500,2002,bmw,x5,,,...,https://images.craigslist.org/00m0m_iba78h8ty9...,"$4,500 Cash 2002 BMW X5 8 cylinder 4.4L moto...",,tx,32.401556,-99.884713,20210416,20210502,file:///Users/gaylord/Documents/Athena/DataLak...,20231006
1,7311833696,https://abilene.craigslist.org/ctd/d/abilene-2...,abilene,https://abilene.craigslist.org,4500,2002,bmw,x5,,,...,https://images.craigslist.org/00m0m_iba78h8ty9...,"$4,500 Cash 2002 BMW X5 8 cylinder 4.4L moto...",,tx,32.401556,-99.884713,20210424,20210428,file:///Users/gaylord/Documents/Athena/DataLak...,20231006
2,7311441996,https://abilene.craigslist.org/ctd/d/abilene-2...,abilene,https://abilene.craigslist.org,4900,2006,toyota,camry,excellent,4 cylinders,...,https://images.craigslist.org/00808_5FkOw2aGjA...,2006 TOYOTA CAMRY LE Sedan Ready To Upgrade ...,,tx,32.453848,-99.7879,20210423,20210525,file:///Users/gaylord/Documents/Athena/DataLak...,20231006
3,7307680715,https://abilene.craigslist.org/ctd/d/abilene-2...,abilene,https://abilene.craigslist.org,6500,2008,ford,expedition,,,...,https://images.craigslist.org/00M0M_i9CoFvVq8o...,$6500.00 2008 Ford Expedition 8 cylinder 5.4L...,,tx,32.401556,-99.884713,20210416,20210426,file:///Users/gaylord/Documents/Athena/DataLak...,20231006
4,7311834578,https://abilene.craigslist.org/ctd/d/abilene-2...,abilene,https://abilene.craigslist.org,6500,2008,ford,expedition,,,...,https://images.craigslist.org/00M0M_i9CoFvVq8o...,$6500.00 2008 Ford Expedition 8 cylinder 5.4L...,,tx,32.401556,-99.884713,20210424,20210512,file:///Users/gaylord/Documents/Athena/DataLak...,20231006


In [8]:
# Inspect the standardized column names and their Spark types
# (YEAR and RUN_DATE_ID should now be integers).
stdzd_data_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- REGION_URL: string (nullable = true)
 |-- PRICE: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MANUFACTURER: string (nullable = true)
 |-- MODEL: string (nullable = true)
 |-- CONDITION: string (nullable = true)
 |-- CYLINDERS: string (nullable = true)
 |-- FUEL: string (nullable = true)
 |-- ODOMETER: string (nullable = true)
 |-- TITLE_STATUS: string (nullable = true)
 |-- TRANSMISSION: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- DRIVE: string (nullable = true)
 |-- SIZE: string (nullable = true)
 |-- TYPE: string (nullable = true)
 |-- PAINT_COLOR: string (nullable = true)
 |-- IMAGE_URL: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- LAT: string (nullable = true)
 |-- LONG: string (nullable = true)
 |-- POSTING_DATE: string (n

### Collect the distinct posting dates (the Bronze partition keys) present in this load

In [9]:
# Distinct POSTING_DATE values (yyyyMMdd strings) present in this load, i.e. the
# Bronze partitions this run touches. May include None for rows without a date.
unique_dates = [
    row["POSTING_DATE"]
    for row in stdzd_data_df.select("POSTING_DATE").distinct().collect()
]

# Exclude nulls: NULL never matches an IN (...) list, and str(None) would put
# the bare token `None` into the SQL. Sorting keeps the predicate deterministic.
dates_processed = sorted(d for d in unique_dates if d is not None)

# POSTING_DATE is a string column, so emit quoted SQL literals
# (embedded single quotes doubled defensively).
dates_processed_str = ",".join(
    "'{}'".format(str(d).replace("'", "''")) for d in dates_processed
)

# `IN ()` is invalid SQL, so an empty load matches no partition at all.
partition_predicate = (
    f"POSTING_DATE IN ({dates_processed_str}) " if dates_processed else "FALSE"
)

                                                                                

In [10]:
# Cache the standardized frame and materialize it with count(). The Bronze write
# below then reuses the cached rows instead of re-reading and re-parsing the
# landing CSV. The write cell releases the cache with unpersist().
# (count() alone never caches anything; cache() must be requested explicitly.)
stdzd_data_df = stdzd_data_df.cache()
stdzd_data_df.count()

[Stage 6:>                                                        (0 + 12) / 12]

441802


                                                                                

In [11]:
"""WRITTING STANDARDIZED DATA TO BRONZE"""
(stdzd_data_df.write.format("parquet")
 .partitionBy("POSTING_DATE")
 .option("replaceWhere",  f"{partition_predicate}")
 .mode("overwrite").save(bronze_path))

stdzd_data_df.unpersist(blocking=True)

23/10/06 15:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/06 15:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/10/06 15:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/10/06 15:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
23/10/06 15:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
23/10/06 15:23:08 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
23/10/06 15:23:09 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,

DataFrame[ID: string, URL: string, REGION: string, REGION_URL: string, PRICE: string, YEAR: int, MANUFACTURER: string, MODEL: string, CONDITION: string, CYLINDERS: string, FUEL: string, ODOMETER: string, TITLE_STATUS: string, TRANSMISSION: string, VIN: string, DRIVE: string, SIZE: string, TYPE: string, PAINT_COLOR: string, IMAGE_URL: string, DESCRIPTION: string, COUNTY: string, STATE: string, LAT: string, LONG: string, POSTING_DATE: string, REMOVAL_DATE: string, FILE_NAME: string, RUN_DATE_ID: int]