In [252]:
import pyspark.sql as sq
from pyspark.sql.functions import *
import os
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

In [253]:
# Start a Spark session on a local master, or reuse one that already exists.
spark = (
    sq.SparkSession.builder
    .master("local")
    .appName("my app")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Directory that holds the two input CSV files.
path = '/home/jovyan/work/Data'

# Both files are read with a header row and with column types inferred by Spark.
poi = spark.read.csv(
    os.path.join(path, "POIList.csv"),
    header=True,
    inferSchema=True,
)
data = spark.read.csv(
    os.path.join(path, "DataSample.csv"),
    header=True,
    inferSchema=True,
)


In [254]:
# Collect the _ID values of every (timestamp, latitude, longitude) group, keep
# only the groups that hold more than one _ID, and flatten them back to one
# _ID per row. Note the timestamp column name starts with a space.
data_aa = (
    data
    .groupBy([' TimeSt', 'Latitude', 'Longitude'])
    .agg(collect_list("_ID").alias("_ID2"))
    .where(size("_ID2") > 1)
    .select(explode("_ID2").alias("_ID"))
)
# Anti-join: keep only the requests whose _ID is NOT in the list above, i.e.
# every member of a repeated group is removed, not just the extra copies.
data = data.join(data_aa, on=data._ID == data_aa._ID, how="left_anti").drop(data_aa._ID)

In [255]:
# Exclude POI1 from the POI list before any distance is computed.
# NOTE(review): presumably POI1 shares its coordinates with another POI - confirm against POIList.csv.
poi = poi.filter("POIID != 'POI1'")

In [256]:
# Give the POI coordinates their own column names so they stay distinguishable
# from the request coordinates once both sit in the same row. The POI latitude
# header carries a leading space in the source file.
poi = poi.select(
    'POIID',
    col(" Latitude").alias("poiLatitude"),
    col("Longitude").alias("poiLongitude"),
)
# Pair every request with every remaining POI.
data = data.crossJoin(poi)

In [257]:
def hav_dist(lat1, lon1, lat2, lon2, radius_km=6371):
    """Great-circle (haversine) distance between two points given in degrees.

    Args:
        lat1, lon1: latitude / longitude of the first point, in degrees.
            In this notebook they are Spark columns.
        lat2, lon2: latitude / longitude of the second point, in degrees.
        radius_km: radius of the sphere; the default 6371 is the commonly
            used mean Earth radius in kilometres.

    Returns:
        The distance expression, in the same unit as ``radius_km``.
    """
    # `radians` replaces `toRadians`, which has been deprecated since Spark 2.1.
    lon1 = radians(lon1)
    lat1 = radians(lat1)
    lon2 = radians(lon2)
    lat2 = radians(lat2)
    d_lon = lon2 - lon1
    d_lat = lat2 - lat1
    # Haversine term: squared half-chord length between the two points.
    a = sin(d_lat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(d_lon / 2) ** 2
    # Central angle between the two points, in radians.
    c = 2 * asin(sqrt(a))
    return radius_km * c


In [258]:
# Add the haversine distance between each request and the POI it is paired with.
data1 = data.withColumn(
    "Poidistance",
    hav_dist(
        lat1=data['Latitude'],
        lon1=data['Longitude'],
        lat2=data['poiLatitude'],
        lon2=data['poiLongitude'],
    ),
)

In [261]:
# Smallest POI distance for each request; Spark names the result column
# "min(Poidistance)".
data2 = data1.groupBy('_ID').min('Poidistance')

# Keep only the (request, POI) pairs whose distance equals that minimum, which
# assigns each request to its nearest POI.
# NOTE(review): a request that is exactly equidistant from two POIs would keep
# both rows - confirm this cannot happen with the remaining POIs.
data_Final = (
    data1
    .join(
        data2,
        (data1['_ID'] == data2['_ID'])
        & (data1['Poidistance'] == data2['min(Poidistance)']),
    )
    .drop(data2._ID)
)

# To inspect the result as a pandas table:
# df = data_Final.toPandas()
# print(df)

In [264]:
# Mean and sample standard deviation of the nearest-POI distance, per POI.
Avg_Std = (
    data_Final
    .groupBy('POIID')
    .agg(
        avg("min(Poidistance)").alias('Average'),
        stddev("min(Poidistance)").alias('Standard Deviation'),
    )
)
Avg_Std.show()


+-----+------------------+------------------+
|POIID|           Average|Standard Deviation|
+-----+------------------+------------------+
| POI4| 514.9971719812205|1506.8899707703229|
| POI2| 300.7147475686839| 388.2733852635426|
| POI3|451.65114920151376| 223.6317418310286|
+-----+------------------+------------------+



In [265]:
# Per POI: the farthest assigned request (used as the circle radius) and the
# number of assigned requests. `max` and `count` here are the Spark aggregates
# brought in by the star import of pyspark.sql.functions, not the Python builtins.
Radius = (
    data_Final
    .groupBy('POIID')
    .agg(
        max("min(Poidistance)").alias('Radius'),
        count("min(Poidistance)").alias('Count'),
    )
)
# Density = requests per unit area of the circle with that radius.
Density = Radius.withColumn(
    'density',
    Radius['Count'] / (Radius['Radius'] ** 2 * np.pi),
)
Density.show()

+-----+------------------+-----+--------------------+
|POIID|            Radius|Count|             density|
+-----+------------------+-----+--------------------+
| POI4| 9349.572770487368|  422|1.536664455904176...|
| POI2|11531.820831836454| 8749|2.094174038984837...|
| POI3|1474.5809620285695| 8802|0.001288529145748...|
+-----+------------------+-----+--------------------+



In [242]:
# Remove outliers, if any, so that the statistics are more sensitive to values near the average.

In [273]:
# Quantile fences on the per-request nearest-POI distance.
# The quantiles must be read from data_Final: that is the frame holding the
# "min(Poidistance)" column, whereas the cross-joined `data` frame has no such
# column, so the call fails there on a fresh kernel.
# NOTE(review): despite the Quan1/Quan3/IQR names, the cut points are the 30th
# and 70th percentiles (approximate, relative error 0.05), not the quartiles -
# confirm this is intended.
Quan1 = data_Final.approxQuantile("min(Poidistance)", [0.30], 0.05)
Quan3 = data_Final.approxQuantile("min(Poidistance)", [0.70], 0.05)
IQR = Quan3[0] - Quan1[0]
lowerRange = Quan1[0] - 1.5 * IQR
upperRange = Quan3[0] + 1.5 * IQR

# Apply the fence once and reuse the trimmed frame for both summaries.
data_InRange = data_Final[
    (data_Final['min(Poidistance)'] > lowerRange)
    & (data_Final['min(Poidistance)'] < upperRange)
]

# Mean and sample standard deviation of the distance per POI, outliers excluded.
Avg_Std2 = data_InRange.groupBy('POIID').agg(
    avg("min(Poidistance)").alias('AVG'),
    stddev("min(Poidistance)").alias('STD'),
)
Avg_Std2.show()

# Radius (farthest remaining request), request count and density per POI,
# outliers excluded. `max` and `count` are the Spark aggregates, not builtins.
Radius2 = data_InRange.groupBy('POIID').agg(
    max("min(Poidistance)").alias('Radius'),
    count("min(Poidistance)").alias('Count'),
)
Density2 = Radius2.withColumn('density', Radius2['Count'] / (Radius2['Radius'] ** 2 * np.pi))
Density2.show()

+-----+-----------------+------------------+
|POIID|              AVG|               STD|
+-----+-----------------+------------------+
| POI4|242.9136013544121|229.16911490167877|
| POI2|256.2837722293357|223.29841086593186|
| POI3|435.6164567176206|  194.967417289931|
+-----+-----------------+------------------+

+-----+-----------------+-----+--------------------+
|POIID|           Radius|Count|             density|
+-----+-----------------+-----+--------------------+
| POI4|855.2562887218603|  408|1.775487985779238...|
| POI2|856.2474702903709| 8365|0.003631762749097...|
| POI3|845.6750365816498| 8627|0.003839749597973...|
+-----+-----------------+-----+--------------------+



In [276]:
# Bring the small per-POI density table into pandas for the final rescaling.
AD = Density.toPandas()

# Min-max rescale: the lowest density maps to -10 and the highest to +10.
minn = AD['density'].min()
maxx = AD['density'].max()
AD['density_Final'] = 20 * (AD['density'] - minn) / (maxx - minn) - 10
AD

Unnamed: 0,POIID,Radius,Count,density,density_Final
0,POI4,9349.57277,422,2e-06,-10.0
1,POI2,11531.820832,8749,2.1e-05,-9.698443
2,POI3,1474.580962,8802,0.001289,10.0
