# EQ Challenge Notebook

## Content
0. Data Collection
1. Clean-Up
2. Label
3. Analysis
4. Data Science/Engineering Tracks
    a. Model
        1- Visualize
        2- Bonus
    b. Pipeline Dependency
  

In [None]:
#https://gist.github.com/woozyking/f1d50e1fe1b3bf52e3748bc280cf941f

## 0. Data Collection 

In [1]:
#!pip install haversine

In [2]:
#!pip install folium

In [3]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import numpy as np
import haversine as hs
import folium
from itertools import chain
import math

In [4]:
# Start (or reuse) the Spark session used throughout the notebook.
# NOTE(review): master "local" runs Spark with a single worker thread, and
# spark.cores.max only applies to standalone/Mesos cluster managers, so it has
# no effect here. If four local cores were intended, use master("local[4]")
# (be aware that changing partitioning can change which row dropDuplicates keeps).
spark = (SparkSession.builder
         .master("local")
         .appName("EQ Challenge")
         .config("spark.executor.memory", "3g")
         .config("spark.driver.memory", "3g")
         .config("spark.sql.broadcastTimeout", "36000")
         .config("spark.cores.max", "4")
         .getOrCreate())

sc = spark.sparkContext
# Legacy entry point kept for compatibility; SQLContext is deprecated since
# Spark 3.0 in favour of the SparkSession above.
sqlContext = SQLContext(sc)
# Show the effective configuration through the public API rather than the
# private SparkContext._conf attribute.
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.executor.memory', '3g'),
 ('spark.sql.broadcastTimeout', '36000'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '7830'),
 ('spark.cores.max', '4'),
 ('spark.driver.host', '192.168.0.17'),
 ('spark.driver.memory', '3g'),
 ('spark.app.id', 'local-1621444225324'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'EQ Challenge'),
 ('spark.ui.showConsoleProgress', 'true')]

In [5]:
# Import the request log and the POI list from the ./data directory.
# os.path.join builds the path portably; the previous '\\data\\' literal only
# worked on Windows, and joining a pre-concatenated string was a no-op.
inputPath = os.path.join(os.getcwd(), 'data')
csv_options = {'header': 'True', 'inferSchema': 'True', 'delimiter': ','}

df = spark.read.options(**csv_options).csv(os.path.join(inputPath, 'DataSample.csv'))
poi = spark.read.options(**csv_options).csv(os.path.join(inputPath, 'POIList.csv'))

In [6]:
# Inspect the inferred schemas of both inputs; some raw column names carry a
# leading space, which the next cell strips.
for frame in (df, poi):
    frame.printSchema()

root
 |-- _ID: integer (nullable = true)
 |--  TimeSt: timestamp (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)

root
 |-- POIID: string (nullable = true)
 |--  Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [7]:
# Normalise column names: remove every space (' TimeSt' -> 'TimeSt',
# ' Latitude' -> 'Latitude'); request columns also lose underscores ('_ID' -> 'ID').
df = df.toDF(*[name.replace(' ', '').replace('_', '') for name in df.columns])
poi = poi.toDF(*[name.replace(' ', '') for name in poi.columns])

In [8]:
# Remember the raw request count (the clean-up step reports how many rows it
# drops relative to it), then preview a few rows.
original_df_count = df.count()
df.show(5)
print('Number of Records:', original_df_count)

+-------+--------------------+-------+--------+---------+--------+---------+
|     ID|              TimeSt|Country|Province|     City|Latitude|Longitude|
+-------+--------------------+-------+--------+---------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON| Waterloo|43.49347|-80.49123|
|4516547|2017-06-21 18:00:...|     CA|      ON|   London| 42.9399| -81.2709|
|4516550|2017-06-21 15:00:...|     CA|      ON|   Guelph| 43.5776| -80.2201|
|4516600|2017-06-21 15:00:...|     CA|      ON|Stratford| 43.3716| -80.9773|
|4516613|2017-06-21 15:00:...|     CA|      ON|Stratford| 43.3716| -80.9773|
+-------+--------------------+-------+--------+---------+--------+---------+
only showing top 5 rows

Number of Records: 22025


In [9]:
# Same for the POI table: keep its raw size, show it, then report the count.
original_poi_count = poi.count()
poi.show()
print('Number of POI\'s:', original_poi_count)

+-----+---------+-----------+
|POIID| Latitude|  Longitude|
+-----+---------+-----------+
| POI1|53.546167|-113.485734|
| POI2|53.546167|-113.485734|
| POI3|45.521629| -73.566024|
| POI4| 45.22483| -63.232729|
+-----+---------+-----------+

Number of POI's: 4


## 1. Clean-up Data

Records that share identical geo-information (`Latitude`, `Longitude`) and timestamp (`TimeSt`) are considered suspicious and should be removed.

In [10]:
# Collapse requests that share the same timestamp and coordinates.
# NOTE(review): dropDuplicates keeps one row per (TimeSt, Latitude, Longitude)
# group; if every member of such a group should be discarded, keep only groups
# whose count is 1 instead.
suspicious_keys = ['TimeSt', 'Latitude', 'Longitude']
df = df.dropDuplicates(suspicious_keys)
removed_requests = original_df_count - df.count()
print('Removed', removed_requests, 'suspecious records')

Removed 2026 suspecious records


Remove duplicate POIs that share the same location.

In [11]:
# Two POIs are duplicates when every column except the identifier matches.
poi_location_cols = [name for name in poi.columns if name != 'POIID']
poi = poi.dropDuplicates(poi_location_cols)
print('Removed', original_poi_count - poi.count(), 'redundent POI')

Removed 1 redundent POI


## 2. Label

Assign each request to the closest POI.

In [12]:
# first create a map type column called 'map'
#make_map = udf(lambda x, y: dict(zip(x, y)), MapType(DoubleType(), DoubleType()))
#df_coor=df.select('ID',make_map(col('Latitude'), col('Longitude')).alias('coor'))


In [13]:
def distance_from(lat1, long1, lat2, long2, radius_km=6371.0, scale=4):
    """Haversine great-circle distance between two points, as a Spark Column.

    Built from pyspark.sql.functions (brought in by the notebook's star import),
    so the arguments may be column names or Column expressions.

    Parameters
    ----------
    lat1, long1 : str or Column
        Latitude / longitude of the first point, in decimal degrees.
    lat2, long2 : str or Column
        Latitude / longitude of the second point, in decimal degrees.
        The formula is symmetric, so the order of the two points does not matter.
    radius_km : float, default 6371.0
        Sphere radius; the default (mean Earth radius) yields kilometres.
    scale : int, default 4
        Number of decimal places the result is rounded to.

    Returns
    -------
    Column
        Distance in the units of ``radius_km``; null when any input is null.
    """
    # Decimal degrees -> radians.
    lat1, long1, lat2, long2 = map(radians, [lat1, long1, lat2, long2])
    # Haversine formula https://www.movable-type.co.uk/scripts/latlong.html
    dlon = long2 - long1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point error can push sqrt(a) marginally above 1 for (near-)antipodal
    # points, which would make asin() return NaN; clamp it. (when/otherwise keeps
    # nulls null, unlike least(), which skips them.)
    root_a = sqrt(a)
    c = 2 * asin(when(root_a > 1.0, 1.0).otherwise(root_a))
    # c is non-negative, so no abs() is needed.
    return round(radius_km * c, scale)

In [14]:
# Pair every request with every POI. The POI table has only a handful of rows,
# so broadcasting it keeps the cross join cheap.
req_poi = df.select(['ID', 'Latitude', 'Longitude']).crossJoin(
    broadcast(poi.withColumnRenamed('Latitude', 'POI_LAT')
                 .withColumnRenamed('Longitude', 'POI_LON'))
)
# Distance (km) from each request to each POI using the Haversine formula.
req_poi = req_poi.withColumn(
    'Distance', distance_from('Latitude', 'Longitude', 'POI_LAT', 'POI_LON'))

# Pick exactly one POI per request. min() over a struct orders by Distance
# first and POIID second, so equidistant POIs are tie-broken deterministically
# instead of producing several rows for the same request ID (which a
# "min distance + semi join" does). Null distances are excluded so such
# requests simply receive null POIID/Distance from the left join below.
nearest = (req_poi
           .where(col('Distance').isNotNull())
           .groupBy('ID')
           .agg(min(struct('Distance', 'POIID')).alias('nearest'))
           .select('ID',
                   col('nearest.POIID').alias('POIID'),
                   col('nearest.Distance').alias('Distance')))

# Attach the nearest POI and its distance to every request.
comb_df = df.join(nearest, on=['ID'], how='left')
# Show final table sample
comb_df.show(n=5)

+-------+--------------------+-------+--------+-------------+--------+---------+-----+--------+
|     ID|              TimeSt|Country|Province|         City|Latitude|Longitude|POIID|Distance|
+-------+--------------------+-------+--------+-------------+--------+---------+-----+--------+
|4517905|2017-06-21 17:00:...|     CA|      ON|      Windsor| 42.2957| -82.9599| POI3|832.9559|
|4526426|2017-06-21 17:00:...|     CA|      QC|Saint-Nicolas|46.72072|-71.30409| POI3|219.4615|
|4535091|2017-06-21 11:00:...|     CA|      AB|      Calgary|  51.049|-113.9622| POI1|279.5545|
|4545807|2017-06-21 04:01:...|     CA|      ON|      Markham|43.85883|-79.29809| POI3|489.2499|
|4559622|2017-06-21 18:01:...|     CA|      ON|     Kingston| 44.2647| -76.5504| POI3|273.4697|
+-------+--------------------+-------+--------+-------------+--------+---------+-----+--------+
only showing top 5 rows



## 3. Analysis

In [25]:
# Per-POI statistics over the requests assigned to each POI.
group_df = comb_df.groupBy(['POIID'])

# Explicit aliases instead of renaming Spark's auto-generated aggregate names
# (e.g. 'count(POIID)'), which are not a stable API. The previous version
# referenced col('count') although no such column existed.
analys_poi = group_df.agg(
    mean('Distance').alias('Average Distance'),
    stddev('Distance').alias('SD Distance'),
    max('Distance').alias('Radius'),        # farthest assigned request (km)
    count('POIID').alias('count'),
)

# Density = assigned requests per km^2 of the circle whose radius is the
# farthest assigned request. The helper 'count' column is dropped afterwards.
analys_poi = (analys_poi
              .withColumn('Density', col('count') / (math.pi * col('Radius') ** 2))
              .drop('count'))
analys_poi.show()

+-----+------------------+------------------+----------+--------------------+
|POIID|  Average Distance|       SD Distance|    Radius|             Density|
+-----+------------------+------------------+----------+--------------------+
| POI4|497.27872746331195|1472.9378033111789| 9349.5728|1.736940618101474...|
| POI1| 301.9068558856792|   412.43003446935|11531.8208|2.328269629635223...|
| POI3| 451.5275540786115|223.35055601525525|  1474.581|0.001433894834424...|
+-----+------------------+------------------+----------+--------------------+



## 4. Data Science/Engineering Tracks

### 4a. Model

First, remove outlier requests, i.e. those too far away from their assigned POI, using the interquartile range (IQR) of the request distances.
The IQR is a measure of statistical dispersion equal to the difference between the 75th and the 25th percentiles.
It can be used to detect outlier requests in a few straightforward steps:
1.	Calculate the 1st quartile Q1.
2.	Calculate the 3rd quartile Q3.
3.	Calculate IQR=Q3−Q1.
4.	Calculate the Requests Range:
    - Lower bound: Q1−1.5∗IQR
    - Upper bound: Q3+1.5∗IQR
5.	Remove any points outside the Requests Range as suspected outliers.



In [16]:

def calculate_ReqRang(df, rel_error=0, k=1.5):
    """Compute IQR-based request ranges (Tukey fences) for double columns.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Only columns whose dtype string is "double" are processed
        (Distance is expected to be double).
    rel_error : float, default 0
        Relative error passed to ``DataFrame.approxQuantile``; 0 requests exact
        quantiles, which can be expensive on large data.
    k : float, default 1.5
        Fence multiplier: the range is [Q1 - k*IQR, Q3 + k*IQR].

    Returns
    -------
    dict
        ``{column: {'q1': Q1, 'q3': Q3, 'min': lower, 'max': upper}}``.
        Columns for which no quantiles are returned (approxQuantile yields an
        empty list for all-null columns) are omitted instead of raising KeyError.
    """
    requests = {}
    # df.dtypes already yields (name, type) pairs in column order.
    for name, dtype in df.dtypes:
        if dtype != "double":
            continue
        # 1. / 2. First and third quartiles.
        quantiles = df.approxQuantile(name, [0.25, 0.75], rel_error)
        if len(quantiles) != 2:
            continue
        q1, q3 = quantiles
        # 3. IQR = Q3 - Q1
        iqr = q3 - q1
        # 4. Requests range.
        requests[name] = {
            'q1': q1,
            'q3': q3,
            'min': q1 - (iqr * k),
            'max': q3 + (iqr * k),
        }
    return requests

In [17]:
# The request range is computed on the distance column only.
distance_col_df = comb_df.select('Distance')
IQR = calculate_ReqRang(distance_col_df)
IQR

In [19]:
# 5. Keep only requests whose distance lies inside the inclusive [min, max]
#    range; everything else (including null distances) counts as an outlier.
distance_fences = IQR['Distance']
bounded_df = comb_df.where(
    col('Distance').between(distance_fences['min'], distance_fences['max']))
outlier_count = comb_df.count() - bounded_df.count()
print('Number of removed outlaiers requests:', outlier_count)

19486

# Mapping One Range onto Another

In [None]:
# Mapping one range onto another
#https://stackoverflow.com/questions/5731863/mapping-a-numeric-range-onto-another
#https://stackoverflow.com/questions/36358233/is-it-possible-to-scale-data-by-group-in-spark
#https://codereview.stackexchange.com/questions/185785/scale-numpy-array-to-certain-range
#https://rosettacode.org/wiki/Map_range#Python
#https://github.com/ystcheng/ws-data-application/tree/master/data

In [None]:
# mapping 
#https://en.wikipedia.org/wiki/Multi-armed_bandit

In [None]:
# Hypothesis testing

In [None]:
def create_map(cust, pois, zoom=6, radius_km=500):
    """Build a folium map of customer requests and points of interest.

    Parameters
    ----------
    cust : pandas.DataFrame
        Requests with 'Latitude', 'Longitude' and 'Condition' columns; rows whose
        Condition is 'Out' are drawn blue, all others green.
    pois : pandas.DataFrame
        POIs with 'POIID', 'Latitude' and 'Longitude' columns.
    zoom : int, default 6
        Initial zoom level passed to ``folium.Map``.
    radius_km : float, default 500
        Radius, in kilometres, of the reference circle drawn around every POI.

    Returns
    -------
    folium.Map
    """
    m = folium.Map(location=[cust['Latitude'].mean(), cust['Longitude'].mean()],
                   zoom_start=zoom, tiles='OpenStreetMap')

    for _, row in cust.iterrows():
        cluster_colour = 'blue' if row['Condition'] == 'Out' else 'green'
        folium.CircleMarker(
            location=[row['Latitude'], row['Longitude']],
            radius=5,
            # Pass the scalar: a one-element Series (row[['Condition']]) would be
            # rendered via str() as a multi-line Series repr.
            popup=str(row['Condition']),
            color=cluster_colour,
            fill=True,
            opacity=0.1,
            fill_opacity=1,
            fill_color=cluster_colour,
        ).add_to(m)

    for _, row in pois.iterrows():
        location = [row['Latitude'], row['Longitude']]
        # folium.Marker's colour comes from its Icon; circle-style kwargs such as
        # color/fill_color are not Leaflet marker options and do not colour the pin.
        folium.Marker(
            location=location,
            popup=str(row['POIID']),
            icon=folium.Icon(color='red'),
        ).add_to(m)
        folium.Circle(
            location=location,
            radius=radius_km * 1000,  # folium.Circle radius is in metres
            popup=str(row['POIID']),
            color='black',
            fill=False,
        ).add_to(m)

    return m

In [None]:
# Collect both tables to the driver as pandas DataFrames for folium plotting.
poi_loc = poi.toPandas()
cust_loc = comb_df.toPandas()



In [None]:
cust_loc.head(n=5)  # first rows of the labelled requests

In [None]:

# Flag each request as 'In' when it lies within IN_RADIUS_KM of its assigned POI,
# otherwise 'Out' (missing distances compare False and therefore stay 'Out').
# A single vectorised assignment replaces the previous chained assignment
# cust_loc['Condition'][mask] = ..., which raises SettingWithCopyWarning and is
# silently ignored under pandas copy-on-write.
IN_RADIUS_KM = 500
cust_loc['Condition'] = np.where(cust_loc['Distance'] < IN_RADIUS_KM, 'In', 'Out')
cust_loc.sample(10, random_state=0)  # fixed seed so the preview is reproducible

In [None]:

# Build the map at a wide zoom level (3) and display it as the cell output.
m = create_map(cust=cust_loc, pois=poi_loc, zoom=3)
m

In [None]:
# Shut down the Spark session started at the top of the notebook.
spark.stop()