# Preparation/ Set Up

- **Import necessary libraries**

- **Create Spark Session**

- **Read CSVs into dataframes**
    - Modify headers for easier use

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("EQ Test") \
    .getOrCreate()

In [3]:
df = spark.read\
.options(header='True', inferSchema='True', delimiter=',')\
.csv('../ws-data-spark/data/DataSample.csv')

df.printSchema()

root
 |-- _ID: integer (nullable = true)
 |--  TimeSt: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [4]:
poi = spark.read\
.options(header='True', inferSchema='True', delimiter=',')\
.csv('../ws-data-spark/data/POIList.csv')

poi.printSchema()

root
 |-- POIID: string (nullable = true)
 |--  Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [5]:
df = df.withColumnRenamed(' TimeSt', 'TimeSt')

poi = poi.withColumnRenamed(' Latitude','PLat').withColumnRenamed('Longitude','PLong')

**Create haversine function to calculate distance between points**
- Use for when determining minimum distance and radius

In [22]:
def dist(lat1, long1, lat2, long2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lat1, long1, lat2, long2 = map(f.toRadians, [lat1, long1, lat2, long2])
    # haversine formula 
    dlon = long2 - long1 
    dlat = lat2 - lat1 
    a = f.sin(dlat/2)**2 + f.cos(lat1) * f.cos(lat2) * f.sin(dlon/2)**2
    c = 2 * f.asin(f.sqrt(a)) 
    # Radius of earth in kilometers is 6371
    km = 6371* c
    return f.round(km, 3)


# A. Clean-up

**Remove duplicates from DataSample**

4052 rows removed

In [23]:
df = df.dropDuplicates(['TimeSt','Latitude','Longitude'])

# Also removed duplicate POI (Kept POI 1, dropped POI 2 )
poi = poi.dropDuplicates(['PLat','PLong'])

# B. Label
**Combined both dataframes to identify POI with minimum distance, utilizes haversine function**

In [24]:
combine = df.crossJoin(poi)

In [25]:
combine = combine.withColumn("distance", dist('Latitude','Longitude','PLat','PLong'))

In [26]:
combine.show()

+-------+--------------------+-------+--------+---------+--------+---------+-----+---------+-----------+--------+
|    _ID|              TimeSt|Country|Province|     City|Latitude|Longitude|POIID|     PLat|      PLong|distance|
+-------+--------------------+-------+--------+---------+--------+---------+-----+---------+-----------+--------+
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787| POI1|53.546167|-113.485734|2693.422|
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787| POI4| 45.22483| -63.232729|1307.551|
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787| POI3|45.521629| -73.566024| 520.056|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toronto| 43.6606| -79.4635| POI1|53.546167|-113.485734| 2699.44|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toronto| 43.6606| -79.4635| POI4| 45.22483| -63.232729|1297.955|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toronto| 43.6606| -79.4635| POI3|45.521

In [12]:
minlabel = comp.groupBy('_ID').min('distance')\
.withColumnRenamed('min(distance)','distance')\
.join(comp,['_ID','distance'])

In [13]:
minlabel = minlabel.dropDuplicates(['_ID','distance'])

**Each request is assigned to the closes POI**

In [14]:
minlabel.show(5)

+-------+--------+--------------------+-------+--------+---------+--------+----------+-----+---------+-----------+
|    _ID|distance|              TimeSt|Country|Province|     City|Latitude| Longitude|POIID|     PLat|      PLong|
+-------+--------+--------------------+-------+--------+---------+--------+----------+-----+---------+-----------+
|4548784| 573.303|2017-06-21 17:01:...|     CA|      ON|   Guelph| 43.5412|  -80.2469| POI3|45.521629| -73.566024|
|4553085| 597.379|2017-06-21 04:01:...|     CA|      ON|Kitchener| 43.4381|  -80.5099| POI3|45.521629| -73.566024|
|4576386| 279.972|2017-06-21 09:02:...|     CA|      AB|  Calgary|51.06561|-114.19218| POI1|53.546167|-113.485734|
|4598353|  143.94|2017-06-21 12:03:...|     CA|      AB| Red Deer| 52.2651|  -113.794| POI1|53.546167|-113.485734|
|4598655|  15.162|2017-06-21 11:03:...|     CA|      AB| Edmonton| 53.4113|  -113.452| POI1|53.546167|-113.485734|
+-------+--------+--------------------+-------+--------+---------+--------+-----

# C. Analysis P1

**Calculate average and std. deviation of distance for each POI**

In [15]:
analysis1 = minlabel.groupBy('POIID')\
.agg({'distance':'mean'})\
.join(minlabel.groupBy('POIID')\
      .agg({'distance':'stddev'}),'POIID')

analysis1.show()

+-----+------------------+------------------+
|POIID|     avg(distance)|  stddev(distance)|
+-----+------------------+------------------+
| POI4| 497.2787777777776|1472.9378169838208|
| POI1| 301.9068391076385|412.43003372076623|
| POI3|451.52758356304207| 223.3505570503738|
+-----+------------------+------------------+



# P2

**Create circle with POI as central point, inclusive to all of its requests**
**Calculate each POI circle's area and density**

- The radius is determined as being the distance to the farthest request included for the POI

In [18]:
analysis2 = minlabel.groupBy('POIID')\
.agg({'distance':'max'})\
.join(minlabel.groupBy('POIID')\
      .count(), 'POIID')\
.withColumnRenamed('max(distance)','radius km')

analysis2 = analysis2.withColumn('Density req/km2', f.col('count') / (3.1412 * f.col('radius km')* f.col('radius km')))

analysis2.show()

+-----+---------+-----+--------------------+
|POIID|radius km|count|     Density req/km2|
+-----+---------+-----+--------------------+
| POI4| 9349.573|  477|1.736940543790461...|
| POI1|11531.821| 9727|2.328269548875398...|
| POI3| 1474.581| 9795|0.001433894834424...|
+-----+---------+-----+--------------------+

