In [0]:
# Load the resale flat prices CSV: the first row is the header and column types are inferred.
salesDf = (
    spark.read
    .option('header', 'true')
    .csv('/FileStore/tables/ResaleflatpricesSg.csv', inferSchema=True)
)
salesDf.show()

+----------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+
|     month|      town|flat_type|block|      street_name|storey_range|floor_area_sqm|    flat_model|lease_commence_date|   remaining_lease|resale_price|
+----------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+
|2017-01-01|ANG MO KIO|   2 ROOM|  406|ANG MO KIO AVE 10|    10 TO 12|          44.0|      Improved|               1979|61 years 04 months|    232000.0|
|2017-01-01|ANG MO KIO|   3 ROOM|  108| ANG MO KIO AVE 4|    01 TO 03|          67.0|New Generation|               1978|60 years 07 months|    250000.0|
|2017-01-01|ANG MO KIO|   3 ROOM|  602| ANG MO KIO AVE 5|    01 TO 03|          67.0|New Generation|               1980|62 years 05 months|    262000.0|
|2017-01-01|ANG MO KIO|   3 ROOM|  465|ANG MO KIO AVE 10|    04 TO 06|          68

In [0]:
# Print the column names and the types Spark inferred from the CSV.
salesDf.printSchema()

root
 |-- month: date (nullable = true)
 |-- town: string (nullable = true)
 |-- flat_type: string (nullable = true)
 |-- block: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- storey_range: string (nullable = true)
 |-- floor_area_sqm: double (nullable = true)
 |-- flat_model: string (nullable = true)
 |-- lease_commence_date: integer (nullable = true)
 |-- remaining_lease: string (nullable = true)
 |-- resale_price: double (nullable = true)



In [0]:
# Count the total number of records in the dataset.
salesDf.count()

Out[50]: 177458

In [0]:
# Alternative way of displaying data: convert to a pandas DataFrame.
# limit(5) is applied first, so only five rows are converted.
salesDf.limit(5).toPandas()

Unnamed: 0,month,town,flat_type,block,street_name,storey_range,floor_area_sqm,flat_model,lease_commence_date,remaining_lease,resale_price
0,2017-01-01,ANG MO KIO,2 ROOM,406,ANG MO KIO AVE 10,10 TO 12,44.0,Improved,1979,61 years 04 months,232000.0
1,2017-01-01,ANG MO KIO,3 ROOM,108,ANG MO KIO AVE 4,01 TO 03,67.0,New Generation,1978,60 years 07 months,250000.0
2,2017-01-01,ANG MO KIO,3 ROOM,602,ANG MO KIO AVE 5,01 TO 03,67.0,New Generation,1980,62 years 05 months,262000.0
3,2017-01-01,ANG MO KIO,3 ROOM,465,ANG MO KIO AVE 10,04 TO 06,68.0,New Generation,1980,62 years 01 month,265000.0
4,2017-01-01,ANG MO KIO,3 ROOM,601,ANG MO KIO AVE 5,01 TO 03,67.0,New Generation,1980,62 years 05 months,265000.0


In [0]:
# Count the NULL entries in every column (one output column per input column).
from pyspark.sql.functions import isnan, when, count, col
dfNull = salesDf.select(
    [
        count(when(col(name).isNull(), name)).alias(name)
        for name in salesDf.columns
    ]
)
dfNull.show()

+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+
|month|town|flat_type|block|street_name|storey_range|floor_area_sqm|flat_model|lease_commence_date|remaining_lease|resale_price|
+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+
|    0|   0|        0|    0|          0|           0|             0|         0|                  0|              0|           0|
+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+



In [0]:
# Count NaN entries in the numeric columns.
# NaN is a floating-point value distinct from NULL, so it has to be tested with
# isnan(); isNull() would only repeat the NULL check already done on all columns.
colLis = ['floor_area_sqm','lease_commence_date','resale_price']

dfNan = salesDf.select([count( when (isnan(col(x)),x ) ).alias(x) for x in colLis])
dfNan.show()

+--------------+-------------------+------------+
|floor_area_sqm|lease_commence_date|resale_price|
+--------------+-------------------+------------+
|             0|                  0|           0|
+--------------+-------------------+------------+



In [0]:
# Preview the raw remaining_lease strings to see which text formats occur.
salesDf.select('remaining_lease').display()

remaining_lease
61 years 04 months
60 years 07 months
62 years 05 months
62 years 01 month
62 years 05 months
63 years
61 years 06 months
58 years 04 months
61 years 06 months
61 years 04 months


In [0]:
from pyspark.sql.functions import udf

def getTotalMonths(s1):
    """Convert a remaining-lease string into a total number of months.

    Handles both the singular and plural month spellings, e.g.
    '61 years 04 months', '62 years 01 month' and '63 years'.
    A months-only value such as '11 months' is read as 0 years.

    Args:
        s1: lease text, or None (a NULL cell reaches a Python UDF as None).

    Returns:
        years * 12 + months as an int, or None when s1 is None or blank.

    Raises:
        ValueError: if a quantity in the text is not an integer.
    """
    # NULL or blank cells would otherwise raise inside the UDF and fail the job.
    if s1 is None:
        return None
    lis = s1.split()
    if not lis:
        return None

    # The months figure, when present, is the number just before the trailing
    # 'month'/'months' token.
    hasMonths = len(lis) >= 2 and lis[-1].lower().startswith('month')
    mth = int(lis[-2]) if hasMonths else 0

    # With only '<n> month(s)' there is no years part; do not count the
    # months figure a second time as years.
    monthsOnly = hasMonths and len(lis) == 2
    yrs = 0 if monthsOnly else int(lis[0])

    return yrs * 12 + mth





In [0]:
# Wrap the Python parser as a Spark UDF that yields an integer column.
from pyspark.sql.types import IntegerType

getTotalMonths_udf = udf(getTotalMonths, returnType=IntegerType())

In [0]:
# Sanity-check the UDF against the raw remaining_lease strings.

leaseDf = salesDf.select('remaining_lease',getTotalMonths_udf('remaining_lease').alias('remaining_Lease_months'))

# A small sample is enough to eyeball the conversion; printing 1000 rows floods the notebook output.
leaseDf.show(20)


+------------------+----------------------+
|   remaining_lease|remaining_Lease_months|
+------------------+----------------------+
|61 years 04 months|                   736|
|60 years 07 months|                   727|
|62 years 05 months|                   749|
| 62 years 01 month|                   745|
|62 years 05 months|                   749|
|          63 years|                   756|
|61 years 06 months|                   738|
|58 years 04 months|                   700|
|61 years 06 months|                   738|
|61 years 04 months|                   736|
| 62 years 01 month|                   745|
|59 years 08 months|                   716|
|59 years 08 months|                   716|
|59 years 06 months|                   714|
| 62 years 01 month|                   745|
|          60 years|                   720|
|61 years 04 months|                   736|
|62 years 08 months|                   752|
|          61 years|                   732|
|62 years 08 months|            

In [0]:
# Derive the calendar year from the 'month' date column.

from pyspark.sql.functions import year
dateDf = salesDf.select('month').withColumn('year', year('month'))
dateDf.display()

month,year
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017
2017-01-01,2017


In [0]:
# Build the analysis DataFrame: the original columns plus lease-in-months and sale year.
df = (
    salesDf
    .withColumn('remaining_Lease_months', getTotalMonths_udf('remaining_lease'))
    .withColumn('year', year('month'))
)

df.show(10)



+----------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+----------------------+----+
|     month|      town|flat_type|block|      street_name|storey_range|floor_area_sqm|    flat_model|lease_commence_date|   remaining_lease|resale_price|remaining_Lease_months|year|
+----------+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+------------------+------------+----------------------+----+
|2017-01-01|ANG MO KIO|   2 ROOM|  406|ANG MO KIO AVE 10|    10 TO 12|          44.0|      Improved|               1979|61 years 04 months|    232000.0|                   736|2017|
|2017-01-01|ANG MO KIO|   3 ROOM|  108| ANG MO KIO AVE 4|    01 TO 03|          67.0|New Generation|               1978|60 years 07 months|    250000.0|                   727|2017|
|2017-01-01|ANG MO KIO|   3 ROOM|  602| ANG MO KIO AVE 5|    01 TO 03|          67.0|New Genera

In [0]:
# Fetch the last row of df; the result is a list of Row objects.
df.tail(1)

Out[60]: [Row(month=datetime.date(2024, 4, 1), town='YISHUN', flat_type='EXECUTIVE', block='826', street_name='YISHUN ST 81', storey_range='04 TO 06', floor_area_sqm=146.0, flat_model='Maisonette', lease_commence_date=1988, remaining_lease='62 years 10 months', resale_price=900000.0, remaining_Lease_months=754, year=2024)]

In [0]:
# Check that the derived remaining_Lease_months and year columns are present with the expected types.
df.printSchema()

root
 |-- month: date (nullable = true)
 |-- town: string (nullable = true)
 |-- flat_type: string (nullable = true)
 |-- block: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- storey_range: string (nullable = true)
 |-- floor_area_sqm: double (nullable = true)
 |-- flat_model: string (nullable = true)
 |-- lease_commence_date: integer (nullable = true)
 |-- remaining_lease: string (nullable = true)
 |-- resale_price: double (nullable = true)
 |-- remaining_Lease_months: integer (nullable = true)
 |-- year: integer (nullable = true)



In [0]:
# Ten most expensive transactions, highest resale price first.
df.orderBy(df['resale_price'].desc()).limit(10).display()

month,town,flat_type,block,street_name,storey_range,floor_area_sqm,flat_model,lease_commence_date,remaining_lease,resale_price,remaining_Lease_months,year
2024-01-01,TOA PAYOH,5 ROOM,139A,LOR 1A TOA PAYOH,40 TO 42,117.0,DBSS,2012,87 years 04 months,1568888.0,1048,2024
2024-01-01,TOA PAYOH,5 ROOM,138C,LOR 1A TOA PAYOH,31 TO 33,117.0,DBSS,2012,87 years 03 months,1540000.0,1047,2024
2023-06-01,BUKIT MERAH,4 ROOM,50,MOH GUAN TER,04 TO 06,176.0,Adjoined flat,1973,48 years 08 months,1500000.0,584,2023
2023-08-01,CENTRAL AREA,5 ROOM,1D,CANTONMENT RD,28 TO 30,107.0,Type S2,2011,86 years 05 months,1480000.0,1037,2023
2024-02-01,BISHAN,EXECUTIVE,286,BISHAN ST 24,22 TO 24,172.0,Maisonette,1992,67 years 06 months,1480000.0,810,2024
2023-08-01,TOA PAYOH,5 ROOM,139B,LOR 1A TOA PAYOH,40 TO 42,114.0,DBSS,2012,87 years 09 months,1460000.0,1053,2023
2023-11-01,BUKIT MERAH,5 ROOM,95C,HENDERSON RD,31 TO 33,113.0,Improved,2019,94 years 06 months,1460000.0,1134,2023
2024-01-01,BISHAN,5 ROOM,273B,BISHAN ST 24,40 TO 42,120.0,DBSS,2011,86 years 08 months,1458000.0,1040,2024
2024-03-01,BUKIT MERAH,5 ROOM,9A,BOON TIONG RD,19 TO 21,112.0,Improved,2016,90 years 11 months,1450000.0,1091,2024
2024-01-01,CENTRAL AREA,5 ROOM,1A,CANTONMENT RD,46 TO 48,105.0,Type S2,2011,86 years 01 month,1450000.0,1033,2024


In [0]:
# The single most expensive transaction.
df.orderBy(df['resale_price'].desc()).limit(1).display()

month,town,flat_type,block,street_name,storey_range,floor_area_sqm,flat_model,lease_commence_date,remaining_lease,resale_price,remaining_Lease_months,year
2024-01-01,TOA PAYOH,5 ROOM,139A,LOR 1A TOA PAYOH,40 TO 42,117.0,DBSS,2012,87 years 04 months,1568888.0,1048,2024


Databricks visualization. Run in Databricks to view.

In [0]:
# Repeat the top-10 resale price query using Spark SQL on a temporary view.

df.createOrReplaceTempView("tbl")

res0=spark.sql("select * from tbl order by resale_price desc limit 10")
# The view is left registered (no spark.catalog.dropTempView("tbl")) because later cells query tbl.
res0.show()





+----------+------------+---------+-----+----------------+------------+--------------+-------------+-------------------+------------------+------------+----------------------+----+
|     month|        town|flat_type|block|     street_name|storey_range|floor_area_sqm|   flat_model|lease_commence_date|   remaining_lease|resale_price|remaining_Lease_months|year|
+----------+------------+---------+-----+----------------+------------+--------------+-------------+-------------------+------------------+------------+----------------------+----+
|2024-01-01|   TOA PAYOH|   5 ROOM| 139A|LOR 1A TOA PAYOH|    40 TO 42|         117.0|         DBSS|               2012|87 years 04 months|   1568888.0|                  1048|2024|
|2024-01-01|   TOA PAYOH|   5 ROOM| 138C|LOR 1A TOA PAYOH|    31 TO 33|         117.0|         DBSS|               2012|87 years 03 months|   1540000.0|                  1047|2024|
|2023-06-01| BUKIT MERAH|   4 ROOM|   50|    MOH GUAN TER|    04 TO 06|         176.0|Adjoined 

In [0]:
# Get the average resale price for each town, highest average first.

res1=spark.sql("select town,avg(resale_price) as avg_resale_price from tbl group by town order by avg_resale_price desc")
res1.display()


town,avg_resale_price
BUKIT TIMAH,743203.5533035714
BISHAN,679638.1756139809
CENTRAL AREA,660803.0296327096
QUEENSTOWN,612929.623015485
BUKIT MERAH,609076.5202581024
KALLANG/WHAMPOA,557140.5632925222
PASIR RIS,553177.4905517763
MARINE PARADE,542026.4446477585
SERANGOON,534352.4571410968
CLEMENTI,529135.32091977


Databricks visualization. Run in Databricks to view.

In [0]:
# Get the average resale price for each flat type (the query applies no ordering).
res1b=spark.sql(" select flat_type, avg(resale_price) as flat_type_avgPrice from tbl group by flat_type")
res1b.display()

flat_type,flat_type_avgPrice
3 ROOM,348748.3643997615
1 ROOM,200017.51515151517
4 ROOM,495397.8870216689
2 ROOM,274679.54196028184
EXECUTIVE,695137.6746673228
5 ROOM,591274.1322836071
MULTI-GENERATION,837319.7435897436


Databricks visualization. Run in Databricks to view.

In [0]:
# Get the highest resale price recorded in each town, highest first.
res2=spark.sql("select town,max(resale_price) as max_resale_price from tbl group by town order by max_resale_price desc")
res2.display()

town,max_resale_price
TOA PAYOH,1568888.0
BUKIT MERAH,1500000.0
BISHAN,1480000.0
CENTRAL AREA,1480000.0
QUEENSTOWN,1418000.0
BUKIT TIMAH,1360000.0
KALLANG/WHAMPOA,1300888.0
CLEMENTI,1280000.0
ANG MO KIO,1268000.0
PASIR RIS,1238000.0


In [0]:
# Find the average resale price for each year.
# NOTE: rows are ordered by average price (descending), not chronologically by year.

res3=spark.sql("select year,avg(resale_price) as avg_resale_price from tbl group by year order by avg_resale_price desc")
res3.display()



year,avg_resale_price
2024,591146.7999710843
2023,571801.2161105503
2022,549714.3307151947
2021,511381.2390012721
2020,452279.38497064245
2017,443888.5205714565
2018,441282.06370344607
2019,432137.9129018299


Databricks visualization. Run in Databricks to view.

In [0]:
# Get the total number of transactions for each town, busiest first.

res4=spark.sql("select town,count(*) as total_transaction from tbl group by town order by total_transaction desc")
res4.display()







town,total_transaction
SENGKANG,14745
PUNGGOL,13196
WOODLANDS,12579
YISHUN,12181
TAMPINES,11850
JURONG WEST,11734
BEDOK,9412
HOUGANG,8931
CHOA CHU KANG,8094
ANG MO KIO,7315


Databricks visualization. Run in Databricks to view.

In [0]:
# Get the total number of transactions for each (town, flat type) pair, busiest first.

res5=spark.sql("select town,flat_type,count(*) as transactionCount from tbl group by town,flat_type order by transactionCount desc")
res5.display()


town,flat_type,transactionCount
SENGKANG,4 ROOM,7374
PUNGGOL,4 ROOM,7078
YISHUN,4 ROOM,5917
WOODLANDS,4 ROOM,5673
SENGKANG,5 ROOM,5070
TAMPINES,4 ROOM,4774
JURONG WEST,4 ROOM,4345
PUNGGOL,5 ROOM,4182
HOUGANG,4 ROOM,4037
CHOA CHU KANG,4 ROOM,3925


Databricks visualization. Run in Databricks to view.