In [1]:
f = "1903.json"
# use multiline because each json record is split into many lines as can be seen in notepad++
df = spark.read.format("json").option("multiLine", True).option("header" , "true").option("inferSchema", "true").load(f)



In [2]:
df.printSchema()

root
 |-- Result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- developer: string (nullable = true)
 |    |    |-- developerSales: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- highestPrice: long (nullable = true)
 |    |    |    |    |-- launchedInMonth: long (nullable = true)
 |    |    |    |    |-- launchedToDate: long (nullable = true)
 |    |    |    |    |-- lowestPrice: long (nullable = true)
 |    |    |    |    |-- medianPrice: long (nullable = true)
 |    |    |    |    |-- refPeriod: string (nullable = true)
 |    |    |    |    |-- soldInMonth: long (nullable = true)
 |    |    |    |    |-- soldToDate: long (nullable = true)
 |    |    |    |    |-- unitsAvail: long (nullable = true)
 |    |    |-- district: string (nullable = true)
 |    |    |-- marketSegment: string (nullable = true)
 |    |    |-- project: string (nullable = true)
 |    |    |-- propertyType: string (

In [3]:
# Now we want to explode 2 times to enter into developerSales so later we get the dataframe at developerSales level
from pyspark.sql.functions import explode

df2 = df.select(explode(df.Result).alias("R"))
df2.printSchema()



root
 |-- R: struct (nullable = true)
 |    |-- developer: string (nullable = true)
 |    |-- developerSales: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- highestPrice: long (nullable = true)
 |    |    |    |-- launchedInMonth: long (nullable = true)
 |    |    |    |-- launchedToDate: long (nullable = true)
 |    |    |    |-- lowestPrice: long (nullable = true)
 |    |    |    |-- medianPrice: long (nullable = true)
 |    |    |    |-- refPeriod: string (nullable = true)
 |    |    |    |-- soldInMonth: long (nullable = true)
 |    |    |    |-- soldToDate: long (nullable = true)
 |    |    |    |-- unitsAvail: long (nullable = true)
 |    |-- district: string (nullable = true)
 |    |-- marketSegment: string (nullable = true)
 |    |-- project: string (nullable = true)
 |    |-- propertyType: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- x: string (nullable = true)
 |    |-- y: string (nullable = tr

In [4]:
# Now we want to explode 2 times to enter into developerSales so later we get the dataframe at developerSales level
from pyspark.sql.functions import explode

df2 = df.select(explode(df.Result).alias("R"))
df2.printSchema()


root
 |-- R: struct (nullable = true)
 |    |-- developer: string (nullable = true)
 |    |-- developerSales: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- highestPrice: long (nullable = true)
 |    |    |    |-- launchedInMonth: long (nullable = true)
 |    |    |    |-- launchedToDate: long (nullable = true)
 |    |    |    |-- lowestPrice: long (nullable = true)
 |    |    |    |-- medianPrice: long (nullable = true)
 |    |    |    |-- refPeriod: string (nullable = true)
 |    |    |    |-- soldInMonth: long (nullable = true)
 |    |    |    |-- soldToDate: long (nullable = true)
 |    |    |    |-- unitsAvail: long (nullable = true)
 |    |-- district: string (nullable = true)
 |    |-- marketSegment: string (nullable = true)
 |    |-- project: string (nullable = true)
 |    |-- propertyType: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- x: string (nullable = true)
 |    |-- y: string (nullable = tr

In [5]:
# this is end explosion to enter into developer sales.
# cannot just do 1 explosion of df.Result.developerSales, because it will endup as struct is array
df3= df2.select(explode(df2.R.developerSales).alias("developerSales"))
df3.printSchema()
df4 = df3.select(df3.developerSales.launchedToDate.alias("NumUnitsLaunchedToDate"), \
                df3.developerSales.soldToDate.alias("NumUnitsSoldToDate"), \
                df3.developerSales.highestPrice.alias("HighestPricePerSQFT"), \
                df3.developerSales.lowestPrice.alias("LowestPricePerSQFT"), \
                df3.developerSales.medianPrice.alias("MediumPricePerSQFT"), \
                df3.developerSales.soldInMonth.alias("SoldInMonth"))
df4.show()

root
 |-- developerSales: struct (nullable = true)
 |    |-- highestPrice: long (nullable = true)
 |    |-- launchedInMonth: long (nullable = true)
 |    |-- launchedToDate: long (nullable = true)
 |    |-- lowestPrice: long (nullable = true)
 |    |-- medianPrice: long (nullable = true)
 |    |-- refPeriod: string (nullable = true)
 |    |-- soldInMonth: long (nullable = true)
 |    |-- soldToDate: long (nullable = true)
 |    |-- unitsAvail: long (nullable = true)

+----------------------+------------------+-------------------+------------------+------------------+-----------+
|NumUnitsLaunchedToDate|NumUnitsSoldToDate|HighestPricePerSQFT|LowestPricePerSQFT|MediumPricePerSQFT|SoldInMonth|
+----------------------+------------------+-------------------+------------------+------------------+-----------+
|                    19|                 9|                  0|                 0|                 0|          0|
|                   720|               699|                  0|         

In [6]:
# calculate PercentageSold
import pyspark.sql.functions as func
df5 = df4.withColumn("PercentageSold",(df4["NumUnitsSoldToDate"]/df4["NumUnitsLaunchedToDate"])*100)
# round 'PercentageSold' in a new column:
# https://stackoverflow.com/questions/47046827/trouble-with-pyspark-round-function
df5 = df5.withColumn("PercentageSold", func.round(df5["PercentageSold"], 2))

df5.show()

+----------------------+------------------+-------------------+------------------+------------------+-----------+--------------+
|NumUnitsLaunchedToDate|NumUnitsSoldToDate|HighestPricePerSQFT|LowestPricePerSQFT|MediumPricePerSQFT|SoldInMonth|PercentageSold|
+----------------------+------------------+-------------------+------------------+------------------+-----------+--------------+
|                    19|                 9|                  0|                 0|                 0|          0|         47.37|
|                   720|               699|                  0|                 0|                 0|          0|         97.08|
|                   200|                77|               1614|              1281|              1434|         77|          38.5|
|                   505|               505|               1455|              1455|              1455|          1|         100.0|
|                     0|                 0|                  0|                 0|               

In [7]:
# add a key to this table because this table is made from the json exploded 2 levels, but other
# values we draw like Project and District are exploded 1 level in, so there is no connection

from pyspark.sql.functions import monotonically_increasing_id
df6 = df5.select("*").withColumn("id", monotonically_increasing_id())
df6.show()
df6.count()

+----------------------+------------------+-------------------+------------------+------------------+-----------+--------------+---+
|NumUnitsLaunchedToDate|NumUnitsSoldToDate|HighestPricePerSQFT|LowestPricePerSQFT|MediumPricePerSQFT|SoldInMonth|PercentageSold| id|
+----------------------+------------------+-------------------+------------------+------------------+-----------+--------------+---+
|                    19|                 9|                  0|                 0|                 0|          0|         47.37|  0|
|                   720|               699|                  0|                 0|                 0|          0|         97.08|  1|
|                   200|                77|               1614|              1281|              1434|         77|          38.5|  2|
|                   505|               505|               1455|              1455|              1455|          1|         100.0|  3|
|                     0|                 0|                  0|      

131

In [8]:
# now explode only the layer of "Result", which is df2 above
df7 = df2.select(df2.R.project.alias("Project"),df2.R.district.alias("District"), df2.R.street.alias("Street"),\
                 df2.R.marketSegment.alias("MarketSegment"), \
                 df2.R.propertyType.alias("PropertyType"))
df8 = df7.select("*").withColumn("id", monotonically_increasing_id())
df8.show()
df8.count()

+--------------------+--------+--------------------+-------------+-------------+---+
|             Project|District|              Street|MarketSegment| PropertyType| id|
+--------------------+--------+--------------------+-------------+-------------+---+
|   CAYMAN RESIDENCES|      15|   EAST COAST AVENUE|          OCR|       Landed|  0|
|GRANDEUR PARK RES...|      16|BEDOK SOUTH AVENUE 3|          OCR|   Non-Landed|  1|
|THE FLORENCE RESI...|      19|    HOUGANG AVENUE 2|          OCR|   Non-Landed|  2|
|  THE CLEMENT CANOPY|      05|   CLEMENTI AVENUE 1|          OCR|   Non-Landed|  3|
|COASTLINE RESIDENCES|      15|          AMBER ROAD|          RCR|   Non-Landed|  4|
|             REZI 24|      14|   LORONG 24 GEYLANG|          RCR|   Non-Landed|  5|
|THE GREEN COLLECTION|      04|          COVE DRIVE|          CCR|Strata-Landed|  6|
|  SEASIDE RESIDENCES|      15|         SIGLAP LINK|          OCR|   Non-Landed|  7|
|               ARTRA|      03|      ALEXANDRA VIEW|          RCR

131

In [9]:
# joining df6 and df8 by the id key
df_final = df6.join(df8, df8["id"] == df6["id"])
df_final= df_final.select("Project","PercentageSold","NumUnitsLaunchedToDate",
                          "NumUnitsSoldToDate","District","Street", 
                          "HighestPricePerSQFT",
                          "LowestPricePerSQFT",
                          "MediumPricePerSQFT",
                          "MarketSegment",
                          "SoldInMonth")

df_final.printSchema()

root
 |-- Project: string (nullable = true)
 |-- PercentageSold: double (nullable = true)
 |-- NumUnitsLaunchedToDate: long (nullable = true)
 |-- NumUnitsSoldToDate: long (nullable = true)
 |-- District: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- HighestPricePerSQFT: long (nullable = true)
 |-- LowestPricePerSQFT: long (nullable = true)
 |-- MediumPricePerSQFT: long (nullable = true)
 |-- MarketSegment: string (nullable = true)
 |-- SoldInMonth: long (nullable = true)



In [10]:
# cast the datatype according to the specs
df_final = df_final.withColumn("NumUnitsLaunchedToDate", df_final['NumUnitsLaunchedToDate'].cast("int"))
df_final = df_final.withColumn("NumUnitsSoldToDate", df_final['NumUnitsSoldToDate'].cast("int"))
df_final = df_final.withColumn("District", df_final['District'].cast("int"))
df_final = df_final.withColumn("HighestPricePerSQFT", df_final['District'].cast("double"))
df_final = df_final.withColumn("LowestPricePerSQFT", df_final['District'].cast("double"))
df_final = df_final.withColumn("MediumPricePerSQFT", df_final['District'].cast("double"))
df_final = df_final.withColumn("SoldInMonth", df_final['SoldInMonth'].cast("int"))
df_final.printSchema()

root
 |-- Project: string (nullable = true)
 |-- PercentageSold: double (nullable = true)
 |-- NumUnitsLaunchedToDate: integer (nullable = true)
 |-- NumUnitsSoldToDate: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- HighestPricePerSQFT: double (nullable = true)
 |-- LowestPricePerSQFT: double (nullable = true)
 |-- MediumPricePerSQFT: double (nullable = true)
 |-- MarketSegment: string (nullable = true)
 |-- SoldInMonth: integer (nullable = true)



In [11]:
# Run SQL queries on Dataframe: filter top 20
# Register the DataFrame as a SQL temporary view
df_final.createOrReplaceTempView("privateProperties2")

sqlDF = spark.sql("SELECT * FROM privateProperties2 \
WHERE PercentageSold IS NOT NULL")
sqlDF.show()


+--------------------+--------------+----------------------+------------------+--------+--------------------+-------------------+------------------+------------------+-------------+-----------+
|             Project|PercentageSold|NumUnitsLaunchedToDate|NumUnitsSoldToDate|District|              Street|HighestPricePerSQFT|LowestPricePerSQFT|MediumPricePerSQFT|MarketSegment|SoldInMonth|
+--------------------+--------------+----------------------+------------------+--------+--------------------+-------------------+------------------+------------------+-------------+-----------+
|   CAYMAN RESIDENCES|         47.37|                    19|                 9|      15|   EAST COAST AVENUE|               15.0|              15.0|              15.0|          OCR|          0|
|GRANDEUR PARK RES...|         97.08|                   720|               699|      16|BEDOK SOUTH AVENUE 3|               16.0|              16.0|              16.0|          OCR|          0|
|THE FLORENCE RESI...|        

In [12]:
# save the final dataframe
sqlDF.repartition(1).write.format("csv").option("header","true").mode("overwrite").save("All_Unsold_PrivateProperties_bonus.csv")
