In [1]:
import pandas as pd
from pyspark.sql import functions as F
import numpy as np
from math import cos, asin, sqrt, pi

In [2]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

22/09/16 21:19:35 WARN Utils: Your hostname, DESKTOP-1D7SN6N resolves to a loopback address: 127.0.1.1; using 172.17.150.234 instead (on interface eth0)
22/09/16 21:19:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/16 21:19:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# The goal is to find the number of each type of facilities near each property
# and the distance to the nearest facilities
# Therefore we need an individual index for each property
# Furthermore, we only need properties' coordinate to calculate distances

# THIS NOTEBOOK CALCULATE DISTANCE ACCORDING TO STRAIGHT LINE DISTANCE
# THE RESULT WILL BE FURTHER PROCESSED TO GET A MORE ACCURACTE APPROXIMATION VIA API
properties = pd.read_csv('../data/curated/properties.csv') 
properties = properties.reset_index()
properties = properties[['index', 'prop_lat', 'prop_long']]
sparkProperty=spark.createDataFrame(properties) 

In [4]:
"""
Calculate distance between two points, the calculation is based on haversine formula
Reference of implementation:
https://stackoverflow.com/questions/27928/calculate-distance-between-two-latitude-longitude-points-haversine-formula

param: latitude, longitude of the two positions
return: the distance in km between the two positions
"""
EQUATOR_DIAMETER = 12742
def distance(lat1, lon1, lat2, lon2):
    p = pi/180
    a = 0.5 - cos((lat2-lat1)*p)/2 + cos(lat1*p) * cos(lat2*p) * (1-cos((lon2-lon1)*p))/2
    return EQUATOR_DIAMETER * asin(sqrt(a)) 
distanceUDF = F.udf(lambda a,b,c,d: distance(a,b,c,d))

### Calculate Distance to School

In [5]:
schools = pd.read_csv('../data/curated/schools.csv')
sparkSchool = spark.createDataFrame(schools)

In [6]:
# For each property, calculate its distance to every school
school_distance = sparkProperty \
            .crossJoin(sparkSchool) \
            .withColumn("distance", 
                        distanceUDF(F.col("prop_lat"), F.col("prop_long"), F.col("school_lat"), F.col("school_long"))
                       )

In [7]:
# Count for number of schools within 3km of each property
school_count = school_distance \
            .filter(F.col("distance") <= 3) \
            .groupBy(F.col("index")) \
            .count() \
            .join(sparkProperty, 'index', 'right') \
            .orderBy('index') \
            .toPandas()

22/09/16 21:19:40 WARN ExtractPythonUDFFromJoinCondition: The join condition:(cast(<lambda>(prop_lat#1, prop_long#2, school_lat#7, school_long#6)#15 as int) <= 3) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [8]:
# Find the nearest distance to school for each property
nearestSchool = school_distance \
            .groupBy(F.col('index')) \
            .agg({'distance': 'min'})

In [9]:
# Find the nearest school for each property according to distance
nearestSchoolCoord = school_distance \
            .join(nearestSchool, 'index') \
            .filter(F.col("distance") == F.col("min(distance)")) \
            .select('index','school_lat', 'school_long', 'distance') \
            .toPandas()

22/09/16 21:20:03 WARN ExtractPythonUDFFromJoinCondition: The join condition:isnotnull(<lambda>(prop_lat#1, prop_long#2, school_lat#7, school_long#6)#15) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [10]:
# Fill propertys having no schools within 3km as 0
school_count[['count']] = school_count[['count']].fillna(0)

In [11]:
# Get the number of school, and the position of nearest school
distanceSchool = pd.merge(school_count, nearestSchoolCoord, how='inner', on = 'index')

In [12]:
distanceSchool = distanceSchool.rename(columns = {'count':'numSchool_3km', 'distance': 'distance_school'})

In [13]:
distanceSchool = distanceSchool[['index', 'numSchool_3km', 'school_lat', 'school_long', 'distance_school']]

In [14]:
# Combine the distance to each property
properties = pd.merge(properties, distanceSchool, how='inner', on='index')

### Calculate Distance to Hospital

In [15]:
hospitals = pd.read_csv("../data/curated/hospitals_vic.csv")
sparkHospital = spark.createDataFrame(hospitals)

In [16]:
# For each property, calculate its distance to every hospital
hospital_distance = sparkProperty \
            .crossJoin(sparkHospital) \
            .withColumn("distance", 
                        distanceUDF(F.col("prop_lat"), F.col("prop_long"), F.col("Latitude"), F.col("Longitude"))
                       )

In [17]:
# Count for number of hospitals within 1km of each property
hospital_count = hospital_distance \
            .filter(F.col("distance") <= 1) \
            .groupBy(F.col("index")) \
            .count() \
            .join(sparkProperty, 'index', 'right') \
            .orderBy('index') \
            .toPandas()

22/09/16 21:20:58 WARN ExtractPythonUDFFromJoinCondition: The join condition:(cast(<lambda>(prop_lat#1, prop_long#2, Latitude#82, Longitude#83)#99 as int) <= 1) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [18]:
# Find the nearest distance to hospital for each property
nearestHospital = hospital_distance \
            .groupBy(F.col('index')) \
            .agg({'distance': 'min'})

In [19]:
# Find the nearest hospital for each property according to distance
nearestHospitalCoord = hospital_distance \
            .join(nearestHospital, 'index') \
            .filter(F.col("distance") == F.col("min(distance)")) \
            .select('index','Latitude', 'Longitude', 'distance') \
            .toPandas()

22/09/16 21:21:07 WARN ExtractPythonUDFFromJoinCondition: The join condition:isnotnull(<lambda>(prop_lat#1, prop_long#2, Latitude#82, Longitude#83)#99) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [20]:
# Fill propertys having no hospitals within 1km as 0
hospital_count[['count']] = hospital_count[['count']].fillna(0)

In [21]:
# Get the number of Hospital, and the position of nearest hospital
distanceHospital = pd.merge(hospital_count, nearestHospitalCoord, how='inner', on = 'index')

In [22]:
distanceHospital = distanceHospital.rename(columns = {'count':'numHospital_1km', 
                                                      'Latitude': 'hospital_lat', 
                                                      'Longitude':'hospital_long',
                                                      'distance': 'distance_hospital'})

In [23]:
distanceHospital = distanceHospital[['index', 'numHospital_1km', 'hospital_lat', 'hospital_long', 'distance_hospital']]

In [24]:
# Combine the distance to each property
properties = pd.merge(properties, distanceHospital, how='inner', on='index')

### Calculate Distance to Train Stations

In [25]:
stations = pd.read_csv('../data/curated/traffic_dataset.csv')
sparkStation = spark.createDataFrame(stations).select('STOP_ID', 'LATITUDE', 'LONGITUDE', 'Pax_annual')

In [26]:
# For each property, calculate its distance to every train station
station_distance = sparkProperty \
            .crossJoin(sparkStation) \
            .withColumn("distance", 
                        distanceUDF(F.col("prop_lat"), F.col("prop_long"), F.col("LATITUDE"), F.col("LONGITUDE"))
                       )

In [27]:
# Count for number of stations within 1km of each property
station_count = station_distance \
            .filter(F.col("distance") <= 1) \
            .groupBy(F.col("index")) \
            .count() \
            .join(sparkProperty, 'index', 'right') \
            .orderBy('index') \
            .toPandas()

22/09/16 21:21:29 WARN ExtractPythonUDFFromJoinCondition: The join condition:(cast(<lambda>(prop_lat#1, prop_long#2, LATITUDE#185, LONGITUDE#186)#211 as int) <= 1) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [28]:
# Find the nearest distance to station for each property
nearestStation = station_distance \
            .groupBy(F.col('index')) \
            .agg({'distance': 'min'})

In [29]:
# Find the nearest station for each property according to distance
nearestStationCoord = station_distance \
            .join(nearestStation, 'index') \
            .filter(F.col("distance") == F.col("min(distance)")) \
            .select('index','LATITUDE', 'LONGITUDE', 'distance') \
            .toPandas()

22/09/16 21:21:37 WARN ExtractPythonUDFFromJoinCondition: The join condition:isnotnull(<lambda>(prop_lat#1, prop_long#2, LATITUDE#185, LONGITUDE#186)#211) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [30]:
# Fill propertys having no train stations within 1km as 0
station_count[['count']] = station_count[['count']].fillna(0)

In [31]:
# Get the number of stations, and the position of nearest station
distanceStation = pd.merge(station_count, nearestStationCoord, how='inner', on = 'index')


In [32]:
distanceStation = distanceStation.rename(columns = {'count':'numStation_1km', 
                                                      'LATITUDE': 'station_lat', 
                                                      'LONGITUDE':'station_long',
                                                      'distance': 'distance_station'})


In [33]:
distanceStation = distanceStation[['index', 'numStation_1km', 'station_lat', 'station_long', 'distance_station']]

In [34]:
# Combine the distance to each property
properties = pd.merge(properties, distanceStation, how='inner', on='index')

### Calculate Distance to Entertainment Facilities

In [35]:
facility = pd.read_csv('../data/curated/Facilites_list.csv')
facility = facility[['Latitude', 'Longitude']]

# some different facility shares same position, we decide to treat them as one facility
# This is because we need to ensure each property only has one nearest facility
# Otherwise inconsistency will occur in dataframes, resulting in more records
facility = facility.dropna().drop_duplicates()
sparkFacility = spark.createDataFrame(facility)

In [36]:
# For each property, calculate its distance to every entertainment facility
facility_distance = sparkProperty \
            .crossJoin(sparkFacility) \
            .withColumn("distance", 
                        distanceUDF(F.col("prop_lat"), F.col("prop_long"), F.col("Latitude"), F.col("Longitude"))
                       )

In [37]:
# Count for number of facilities within 1km of each property
facility_count = facility_distance \
            .filter(F.col("distance") <= 3) \
            .groupBy(F.col("index")) \
            .count() \
            .join(sparkProperty, 'index', 'right') \
            .orderBy('index') \
            .toPandas()

22/09/16 21:21:58 WARN ExtractPythonUDFFromJoinCondition: The join condition:(cast(<lambda>(prop_lat#1, prop_long#2, Latitude#289, Longitude#290)#298 as int) <= 3) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [38]:
# Find the nearest distance to facility for each property
nearestFacility = facility_distance \
            .groupBy(F.col('index')) \
            .agg({'distance': 'min'})

In [39]:
# Find the nearest facility for each property according to distance
nearestFacilityCoord = facility_distance \
            .join(nearestFacility, 'index', 'inner') \
            .filter(F.col("distance") == F.col("min(distance)")) \
            .select('index','Latitude', 'Longitude', 'distance') \
            .orderBy('index') \
            .toPandas()

22/09/16 21:22:26 WARN ExtractPythonUDFFromJoinCondition: The join condition:isnotnull(<lambda>(prop_lat#1, prop_long#2, Latitude#289, Longitude#290)#298) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

In [40]:
# Fill propertys having no facilities within 3km as 0
facility_count[['count']] = facility_count[['count']].fillna(0)

In [41]:
# Get the number of facilities, and the position of nearest facility
distanceFacility = pd.merge(facility_count, nearestFacilityCoord, how='inner', on = 'index')

In [42]:
distanceFacility = distanceFacility.rename(columns = {'count':'numFacility_3km', 
                                                      'Latitude': 'facility_lat', 
                                                      'Longitude':'facility_long',
                                                      'distance': 'distance_facility'})


In [43]:
distanceFacility = distanceFacility[['index', 'numFacility_3km', 'facility_lat', 'facility_long', 'distance_facility']]

In [44]:
# Combine the distance to each property
properties = pd.merge(properties, distanceFacility, how='inner', on='index')

In [45]:
properties = properties.drop_duplicates(subset = 'index').set_index('index')

In [46]:
properties.to_csv('../data/curated/distance_to_property.csv', index = False)

In [47]:
# Here is the result dataframe, it contains each property's index and its location
# also the number of different buildings near them, as well as the distance to nearest buildings
properties

Unnamed: 0_level_0,prop_lat,prop_long,numSchool_3km,school_lat,school_long,distance_school,numHospital_1km,hospital_lat,hospital_long,distance_hospital,numStation_1km,station_lat,station_long,distance_station,numFacility_3km,facility_lat,facility_long,distance_facility
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
0,-37.780782,144.813020,33.0,-37.78219,144.82020,0.6501404158195666,0.0,-37.693365,144.758677,10.83147814710251,2.0,-37.777653,144.824704,1.0842142151889327,58.0,-37.781765,144.814164,0.14850882293301257
1,-37.869028,144.807250,12.0,-37.86390,144.81167,0.6896526476051238,1.0,-37.869448,144.828494,1.8653500382034183,2.0,-37.865163,144.813494,0.6964726085221332,21.0,-37.873873,144.811386,0.6496779101999998
2,-37.276411,142.920079,6.0,-37.27736,142.92100,0.1333351425115027,1.0,-37.278816,142.932858,1.1618697631779715,0.0,-37.579091,144.727319,163.09079964570816,20.0,-37.279115,142.914920,0.5465960785569935
3,-37.292905,142.921881,6.0,-37.28600,142.92196,0.7678326903981126,1.0,-37.278816,142.932858,1.843199687372491,0.0,-37.579091,144.727319,162.54898938018138,21.0,-37.292104,142.935676,1.2235626246941527
4,-37.782361,144.808833,30.0,-37.78753,144.80752,0.5862197076000492,0.0,-37.693365,144.758677,10.834261807364065,1.0,-37.777653,144.824704,1.489848932777189,55.0,-37.782572,144.805283,0.31284398999338375
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9077,-38.105231,145.134350,15.0,-38.10444,145.13050,0.34816382989589356,0.0,-38.243369,143.985357,101.60528521826753,2.0,-38.104021,145.128230,0.5521356365337967,31.0,-38.101721,145.129820,0.5562710099279955
9078,-38.354365,144.758948,2.0,-37.46289,144.59658,100.14569569048717,0.0,-38.361748,144.883554,10.895777765396797,0.0,-38.337390,145.178027,36.59569383784788,7.0,-38.362027,144.764383,0.9749237018181851
9079,-37.833083,144.966977,50.0,-37.83589,144.97186,0.5303940155951052,2.0,-37.834246,144.972151,0.47241226296551453,1.0,-37.818305,144.966964,1.6431834497373843,70.0,-37.839805,144.967048,0.7475336064227691
9080,-37.840129,144.996587,58.0,-37.83762,144.99768,0.29501884947033125,6.0,-37.847714,144.997225,0.8452948276125342,8.0,-37.838449,144.992342,0.41694301000288997,85.0,-37.836983,144.995195,0.3705376632873842
