##### Access Azure Data Lake Storage (ADLS) and read the data using:
1. Databricks secret scope
2. Azure Key Vault

In [0]:
# Print the usage help for the dbutils.secrets utilities.
dbutils.secrets.help()

In [0]:
# List the available secret scopes.
dbutils.secrets.listScopes()

Out[2]: [SecretScope(name='kijiji-data-scope')]

In [0]:
# List the secret keys (metadata only) held in the 'kijiji-data-scope' scope.
dbutils.secrets.list(scope='kijiji-data-scope')

Out[3]: [SecretMetadata(key='account-key')]

In [0]:
# Read the storage-account key from the secret scope rather than hardcoding it.
account_key = dbutils.secrets.get(
    scope='kijiji-data-scope',
    key='account-key',
)

In [0]:
# Register the account key in the Spark session config for the `kijijidata`
# storage account, so the abfs:// paths used below can be accessed.
spark.conf.set(
    "fs.azure.account.key.kijijidata.dfs.core.windows.net",
    account_key
)

##### check the access to ADLS

In [0]:
# List the root of the `raw` container to confirm the configured key grants access.
display(dbutils.fs.ls("abfs://raw@kijijidata.dfs.core.windows.net"))

path,name,size,modificationTime
abfs://raw@kijijidata.dfs.core.windows.net/house/,house/,0,1680861678000


In [0]:
# List the dated sub-folders under house/.
display(dbutils.fs.ls("abfs://raw@kijijidata.dfs.core.windows.net/house/"))

path,name,size,modificationTime
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-01/,2023-04-01/,0,1681308161000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-02/,2023-04-02/,0,1681308177000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-03/,2023-04-03/,0,1681308181000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-04/,2023-04-04/,0,1681308195000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-05/,2023-04-05/,0,1681308220000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/,2023-04-06/,0,1681308235000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-07/,2023-04-07/,0,1681308250000


In [0]:
# List the files inside a single day's folder (2023-04-06).
display(dbutils.fs.ls("abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/"))

path,name,size,modificationTime
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/kijiji_gta_1_2023-04-06.csv,kijiji_gta_1_2023-04-06.csv,189767,1681309550000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/kijiji_gta_21_2023-04-06.csv,kijiji_gta_21_2023-04-06.csv,79542,1681309550000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/kijiji_gta_41_2023-04-06.csv,kijiji_gta_41_2023-04-06.csv,60,1681309550000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/kijiji_gta_61_2023-04-06.csv,kijiji_gta_61_2023-04-06.csv,60,1681309550000
abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/kijiji_gta_81_2023-04-06.csv,kijiji_gta_81_2023-04-06.csv,60,1681309550000


In [0]:
# Preview one raw CSV file. No `header` option is set here, so the header row is
# read as data and the columns get the default names _c0 .. _c7.
display(spark.read.csv("abfs://raw@kijijidata.dfs.core.windows.net/house/2023-04-06/kijiji_gta_21_2023-04-06.csv").limit(5))

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7
id,title,location,rent,bed_rooms,date_posted,url,scraped_on
1655524549,"Spacious 4 bed, 4 bath, 3300sf detached house in Oshawa for rent",Oshawa,Please Contact,4 + Den,Yesterday,https://www.kijiji.ca/v-apartments-condos/oshawa-durham-region/spacious-4-bed-4-bath-3300sf-detached-house-in-oshawa-for-rent/1655524549,2023-04-06
1655493604,****BUY HOUSE WITH 0% DOWN****,Brampton,"$2,000.00",3,04/04/2023,https://www.kijiji.ca/v-apartments-condos/mississauga-peel-region/buy-house-with-0-down/1655493604,2023-04-06
1654499999,Gorgeous House 4 Beds + 4 Baths in Mississauga Churchill Meadows,Mississauga,"$3,700.00",4,< 17 hours ago,https://www.kijiji.ca/v-apartments-condos/mississauga-peel-region/gorgeous-house-4-beds-4-baths-in-mississauga-churchill-meadows/1654499999,2023-04-06
1655040573,Bright and clean 3+1 Br semi in Bronte Oakville & steps to lake,Oakville,"$3,700.00",3,31/03/2023,https://www.kijiji.ca/v-apartments-condos/oakville-halton-region/bright-and-clean-3-1-br-semi-in-bronte-oakville-steps-to-lake/1655040573,2023-04-06


###### Perform necessary transformations and save the result as Parquet to ADLS
1. Read apartment and house rental listings from ADLS
2. Specify the schema
3. Drop the null column
4. Convert the title column to lowercase
5. Clean location - group equivalent names into one, e.g. City of Toronto, Toronto --> toronto
6. Convert the rent column to integer, replacing the text "Please Contact" with 0
7. Clean the bed_rooms column and cast it to int
8. Recalculate the date_posted column, e.g. "21 hours ago" should be converted to a proper date
9. Create two additional columns: year and month

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DateType
from pyspark.sql.functions import lower, udf, col, when, regexp_replace, trim, to_date, date_add
from dateutil.relativedelta import relativedelta

In [0]:
# Set the timeParserPolicy to LEGACY for this Spark session.
# NOTE(review): presumably required so the CSV reader can parse the date columns
# in the scraped files — confirm whether this is still needed.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [0]:
# Explicit schema for the scraped rental CSVs; every column is nullable.
# rent, bed_rooms and date_posted are kept as strings because the raw files hold
# free text in them (e.g. "Please Contact", "4 + Den", "Yesterday").
_rental_columns = [
    ("id", IntegerType()),
    ("title", StringType()),
    ("location", StringType()),
    ("rent", StringType()),
    ("bed_rooms", StringType()),
    ("date_posted", StringType()),
    ("url", StringType()),
    ("scraped_on", DateType()),
]
rental_schema = StructType(fields=[
    StructField(column_name, column_type, True)
    for column_name, column_type in _rental_columns
])

In [0]:
# Load every CSV found under the house/ sub-folders with the explicit schema,
# treating the first line of each file as a header.
house_df = (
    spark.read
    .schema(rental_schema)
    .option("header", True)
    .csv("abfs://raw@kijijidata.dfs.core.windows.net/house/**/*.csv")
    # NOTE(review): rental_schema has no column named "null", so this drop looks
    # like a no-op — confirm whether it can be removed.
    .drop("null")
)

In [0]:
# Row count right after loading, before any rows are dropped.
house_df.count()

Out[45]: 8322

In [0]:
# Preview the first rows as read with the explicit schema.
display(house_df.limit(5))

id,title,location,rent,bed_rooms,date_posted,url,scraped_on
1649068951,Jr. 1 Bedroom Apartment – Spacious & Newly Renovated – Call now!,City of Toronto,"$2,319.00",1,2023-04-01 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/city-of-toronto/jr-1-bedroom-apartment-spacious-newly-renovated-call-now/1649068951,2023-04-03
1655269031,"5 BDRM HOUSE, availaible immediately, Christie and St. Clair",Toronto,"$4,500.00",5+,2023-04-02 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/city-of-toronto/5-bdrm-house-availaible-immediately-christie-and-st-clair/1655269031,2023-04-03
1651533067,1 Bedroom Large Renovated Apartment For Rent in Toronto - 90 Eas,Toronto,"$2,195.00",1,2023-03-26 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/city-of-toronto/1-bedroom-large-renovated-apartment-for-rent-in-toronto-90-eas/1651533067,2023-04-03
1652242175,1 Bedroom - 212 Davis Drive - Steps to Rapidway!,Markham / York Region,"$2,550.00",1,2023-04-03 21:03:02.776,https://www.kijiji.ca/v-apartments-condos/markham-york-region/1-bedroom-212-davis-drive-steps-to-rapidway/1652242175,2023-04-03
1652711864,Gorgeous 4 Bdrm 4 Baths Detached House for Lease in Bradford,Bradford,"$3,500.00",4,2023-03-31 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/markham-york-region/gorgeous-4-bdrm-4-baths-detached-house-for-lease-in-bradford/1652711864,2023-04-03


In [0]:
# Discard rows that have no usable listing id.
house_df = house_df.na.drop(subset=["id"])

In [0]:
# Row count after removing rows with a null id.
house_df.count()

Out[48]: 8312

In [0]:
# Normalise the text columns; each step overwrites the column of the same name.
house_df = (
    house_df
    .withColumn("title", lower(col("title")))
    .withColumn("date_posted", lower(col("date_posted")))
    # Missing locations get a placeholder, everything else is lower-cased.
    .withColumn(
        "location",
        when(col("location").isNull(), "Not Available").otherwise(lower(col("location"))),
    )
    # "Please Contact" means no price was given: store it as 0.
    .withColumn(
        "rent",
        when(col("rent") == "Please Contact", 0).otherwise(col("rent")),
    )
    # Strip the currency symbol and thousands separators, then cast to integer.
    .withColumn(
        "rent",
        trim(regexp_replace(col("rent"), "[$,]", "")).cast("int"),
    )
    # Missing bedroom counts become 0, everything else is lower-cased.
    .withColumn(
        "bed_rooms",
        when(col("bed_rooms").isNull(), 0).otherwise(lower(col("bed_rooms"))),
    )
)

In [0]:
# Collect the distinct location values to the driver for inspection.
unique_locations = house_df.select("location").distinct().collect()

In [0]:
# Print every distinct location, one per line.
for location_row in unique_locations:
    print(location_row["location"])

kitchener
town of caledon
caledon east
concord
acton
oshawa / durham region
orangeville
markham / york region
thorold
queensville
roseneath
bradford
ashburn
caledon
toronto
burlington
sutton
schomberg
erin
milton
cobourg
blackstock
nobleton
city of toronto
brampton
innisfil
mississauga
churchville
east york
kettleby
vaughan
stouffville
mineola
orillia
etobicoke
whitchurch-stouffville
eden mills
erin mills
newmarket
oshawa
east credit
georgetown
ajax
guelph
bolton
new tecumseth
markham
hamilton
north york
whitby
mississauga / peel region
woodbridge
tottenham
bowmanville
deerfield
old malton village
mount albert
richmond hill
halton hills
township of melancthon
east gwillimbury
bronte
thornhill
newcastle
oakville / halton region
georgina
meadowvale village
king city
courtice
uxbridge
bramalea
rockwood
valley creek
scarborough
regional municipality of york
york
laurel
old toronto
Not Available
hampton
oakville
campbellville
pickering
aurora
windfields
newtonville
bradford west gwillimbury

In [0]:
# Inspect a few rows whose location is still spelled "city of toronto".
house_df.filter(col("location") == "city of toronto").show(5)

+----------+--------------------+---------------+----+---------+--------------------+--------------------+----------+
|        id|               title|       location|rent|bed_rooms|         date_posted|                 url|scraped_on|
+----------+--------------------+---------------+----+---------+--------------------+--------------------+----------+
|1649068951|jr. 1 bedroom apa...|city of toronto|2319|        1|2023-04-01 00:00:...|https://www.kijij...|2023-04-03|
|1649856271|      1 bdrm and den|city of toronto|2615|  1 + den|2023-04-03 00:00:...|https://www.kijij...|2023-04-03|
|1640353318|1 bdrm and den- k...|city of toronto|2850|  1 + den|2023-04-03 00:00:...|https://www.kijij...|2023-04-03|
|1649220565|1 bdrm + den +din...|city of toronto|3220|        1|2023-04-03 00:00:...|https://www.kijij...|2023-04-03|
|1655388109|beautiful 3 bedro...|city of toronto|3400|        3|2023-04-03 00:00:...|https://www.kijij...|2023-04-03|
+----------+--------------------+---------------+----+--

In [0]:
@udf(returnType=StringType())
def clean_location(location):
    """Reduce a location string to a single canonical place name.

    Keeps only the text before the first "/" (for example
    "mississauga / peel region" -> "mississauga"), trims surrounding
    whitespace, and then maps known aliases to one spelling
    ("city of toronto" -> "toronto").

    Args:
        location: location text, or None for a missing value.

    Returns:
        The cleaned location, or "N/A" when the input is None (or the
        literal string "None").
    """
    location_dict = {"city of toronto": "toronto"}
    if location is None or str(location) == "None":
        return "N/A"
    # Strip BEFORE the alias lookup: splitting "city of toronto / x" leaves a
    # trailing space that would otherwise make the lookup miss.
    location = location.split("/")[0].strip()
    return location_dict.get(location, location)

In [0]:
# Replace the location column with its cleaned form.
house_df = house_df.withColumn(
    "location",
    clean_location(col("location")),
)

In [0]:
# Show the distinct raw bed_rooms values before cleaning.
unique_beds = house_df.select("bed_rooms").distinct().collect()
for bed_row in unique_beds:
    print(bed_row["bed_rooms"])

2 + den
1 + den
3
4 + den
0
3 + den
5+
bachelor/studio
1
4
2


In [0]:
@udf(returnType=IntegerType())
def clean_beds(beds):
    """Convert a bedroom description into an integer bedroom count.

    "bachelor" / "studio" listings count as 0 bedrooms. Otherwise the text
    before the first "+" is parsed as an integer, so "2 + den" -> 2 and
    "5+" -> 5.

    Args:
        beds: bedroom text, or None for a missing value.

    Returns:
        The bedroom count, or None when the input is None.

    Raises:
        ValueError: if the text before "+" is not an integer.
    """
    # A null would make the `in` checks below raise TypeError and fail the
    # whole Spark job; propagate it as a null instead.
    if beds is None:
        return None
    if 'bachelor' in beds or 'studio' in beds:
        return 0

    # int() tolerates surrounding whitespace, so "2 " parses as 2.
    return int(beds.split("+")[0])

In [0]:
# Replace bed_rooms with its integer bedroom count.
house_df = house_df.withColumn(
    "bed_rooms",
    clean_beds(col("bed_rooms")),
)

In [0]:
# Show the distinct bed_rooms values after cleaning.
unique_beds = house_df.select("bed_rooms").distinct().collect()
for bed_row in unique_beds:
    print(bed_row["bed_rooms"])

1
3
5
4
2
0


In [0]:
from pyspark.sql.functions import year, month, to_date, datediff
from dateutil.relativedelta import relativedelta
from datetime import datetime, date, timedelta

In [0]:
# Show the distinct raw date_posted values to see which formats occur.
unique_dates = house_df.select("date_posted").distinct().collect()
for date_row in unique_dates:
    print(date_row["date_posted"])

< 48 minutes ago
2023-03-24 00:00:00.000
2023-03-30 00:00:00.000
08/03/2023
2023-03-28 00:00:00.000
< 9 hours ago
2023-04-01 17:04:56.304
2023-04-01 17:11:56.306
2023-04-01 17:00:56.305
2023-04-01 17:21:56.304
2023-04-01 17:04:56.306
2023-04-03 21:22:02.777
2023-03-04 00:00:00.000
2023-03-02 00:00:00.000
2023-03-20 00:00:00.000
2023-04-01 17:12:56.306
31/03/2023
< 6 minutes ago
2023-02-22 00:00:00.000
< 7 hours ago
29/03/2023
2023-04-01 17:03:01.994
2023-04-01 17:04:56.305
2023-04-03 21:02:02.777
< 41 minutes ago
< 40 minutes ago
< 58 minutes ago
2023-04-01 17:09:56.304
2023-04-01 17:03:56.306
2023-04-03 21:23:02.779
13/03/2023
2023-04-03 21:18:02.778
2023-02-27 00:00:00.000
12/03/2023
2023-04-03 21:21:02.777
< 31 minutes ago
< 35 minutes ago
None
2023-03-17 00:00:00.000
2023-03-31 00:00:00.000
2023-04-03 00:00:00.000
< 52 minutes ago
2023-04-01 17:19:56.305
2023-04-01 17:08:01.994
2023-03-26 00:00:00.000
2023-04-01 17:06:56.305
2023-04-01 17:14:56.306
< 6 hours ago
< 8 minutes ago
< 2

In [0]:
from dateutil.parser import parse
@udf(returnType=DateType())
def parse_date_string(date_string, date_=None):
    """Convert a `date_posted` value into a calendar date.

    Supported inputs:
      * "dd/mm/yyyy"                        e.g. "31/03/2023"
      * "yyyy-mm-dd" or "yyyy-mm-dd HH:MM:SS.fff"
      * "yesterday"
      * relative ages such as "< 48 minutes ago" or "< 9 hours ago"

    Args:
        date_string: posting-date text (lower-cased), or None.
        date_: reference date for relative expressions, as a `datetime.date`
            or a "YYYY-MM-DD" string. When None, the current local time is
            used instead.

    Returns:
        A `datetime.date`. When `date_string` is missing, `date_` is
        returned unchanged.

    Raises:
        ValueError: if `date_string` matches none of the supported formats.
    """
    date_string = str(date_string)
    if date_string == "None":
        return date_

    # Absolute dates need no reference point.
    if "/" in date_string:
        return datetime.strptime(date_string, '%d/%m/%Y').date()
    if "-" in date_string:
        try:
            return datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f').date()
        except ValueError:
            # No time component present: plain calendar date.
            return datetime.strptime(date_string, '%Y-%m-%d').date()

    # Relative expressions are resolved against the reference date, never
    # against the moment the notebook runs, so reprocessing old files gives
    # the same result.
    if date_ is None:
        reference = datetime.now()
    else:
        reference = datetime.strptime(str(date_), '%Y-%m-%d')

    if date_string == 'yesterday':
        return (reference - timedelta(days=1)).date()

    if 'minute' in date_string or date_string.startswith('<'):
        if date_ is not None:
            # Only the reference *date* is known (no time of day), so an age
            # of minutes or hours is attributed to that same day.
            return reference.date()
        # A real clock time is available: subtract the stated age, reading
        # the number in its own unit (hours are not minutes).
        amount = next((int(tok) for tok in date_string.split() if tok.isdecimal()), 0)
        if 'hour' in date_string:
            age = timedelta(hours=amount)
        else:
            age = timedelta(minutes=amount)
        return (reference - age).date()

    raise ValueError(f"Unrecognised date_posted value: {date_string!r}")

In [0]:
# Derive a proper date from date_posted, passing scraped_on as the second argument.
house_df = house_df.withColumn(
    "date_posted_calculated",
    parse_date_string(col("date_posted"), col("scraped_on")),
)

In [0]:
# Add year and month columns taken from the calculated posting date.
house_df = (
    house_df
    .withColumn("year", year(col("date_posted_calculated")))
    .withColumn("month", month(col("date_posted_calculated")))
)

In [0]:
# Row count after the transformations above.
house_df.count()

Out[66]: 8312

In [0]:
# Final column names for the processed dataset (current name -> new name).
renamed_columns = {
    'id': 'listing_id',
    'bed_rooms': 'bedrooms',
    'date_posted': 'post_date',
    'scraped_on': 'scrape_date',
    "date_posted_calculated": "calculated_date",
}
for current_name, new_name in renamed_columns.items():
    house_df = house_df.withColumnRenamed(current_name, new_name)


In [0]:
# Preview the first rows with the final column names.
display(house_df.limit(5))

listing_id,title,location,rent,bedrooms,post_date,url,scrape_date,calculated_date,year,month
1649068951,jr. 1 bedroom apartment – spacious & newly renovated – call now!,toronto,2319,1,2023-04-01 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/city-of-toronto/jr-1-bedroom-apartment-spacious-newly-renovated-call-now/1649068951,2023-04-03,2023-04-01,2023,4
1655269031,"5 bdrm house, availaible immediately, christie and st. clair",toronto,4500,5,2023-04-02 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/city-of-toronto/5-bdrm-house-availaible-immediately-christie-and-st-clair/1655269031,2023-04-03,2023-04-02,2023,4
1651533067,1 bedroom large renovated apartment for rent in toronto - 90 eas,toronto,2195,1,2023-03-26 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/city-of-toronto/1-bedroom-large-renovated-apartment-for-rent-in-toronto-90-eas/1651533067,2023-04-03,2023-03-26,2023,3
1652242175,1 bedroom - 212 davis drive - steps to rapidway!,markham,2550,1,2023-04-03 21:03:02.776,https://www.kijiji.ca/v-apartments-condos/markham-york-region/1-bedroom-212-davis-drive-steps-to-rapidway/1652242175,2023-04-03,2023-04-03,2023,4
1652711864,gorgeous 4 bdrm 4 baths detached house for lease in bradford,bradford,3500,4,2023-03-31 00:00:00.000,https://www.kijiji.ca/v-apartments-condos/markham-york-region/gorgeous-4-bdrm-4-baths-detached-house-for-lease-in-bradford/1652711864,2023-04-03,2023-03-31,2023,3


###### write the processed data back to ADLS

In [0]:
# Append the processed listings to the `processed` container as Parquet,
# with one partition directory per location value.
(
    house_df.write
    .format('parquet')
    .partitionBy("location")
    .mode('append')
    .save('abfs://processed@kijijidata.dfs.core.windows.net/house/')
)


##### move processed data from raw to raw-processed

In [0]:
# Move the whole house/ folder out of `raw` into `raw-processed`.
dbutils.fs.mv(
    "abfs://raw@kijijidata.dfs.core.windows.net/house/",
    "abfs://raw-processed@kijijidata.dfs.core.windows.net/house/",
    recurse=True,
)

Out[70]: True