# Extraction of Interest Points

### Setup Spark and GeoSpark

In [39]:
%%configure -f
{
    "conf": {
        "spark.jars": "s3a://ecb-hackathon-202006-dbn-shared/jars/geospark-1.3.1_2.11.jar,s3a://ecb-hackathon-202006-dbn-shared/jars/geo_wrapper_2.11-0.3.0.jar",
        "spark.executor.memory": "2g",
        "spark.driver.memory": "3g",
        "spark.kryoserializer.buffer.max":"1g",
        "geospark.global.index":"false",
        "geospark.global.indextype": "rtree",
        "geospark.join.gridtype": "kdbtree",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.registrator":"org.datasyslab.geospark.serde.GeoSparkKryoRegistrator"
    }
}


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
25,application_1592922764313_0026,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
19,application_1592922764313_0020,pyspark,idle,Link,Link,
22,application_1592922764313_0023,pyspark,idle,Link,Link,
25,application_1592922764313_0026,pyspark,idle,Link,Link,✔


In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

from geospark.register import GeoSparkRegistrator
from geospark.utils.adapter import Adapter
from geospark.core.enums import GridType
from geospark.core.formatMapper import GeoJsonReader
from geospark.core.spatialOperator import JoinQuery, KNNQuery

from shapely.geometry import Point



In [41]:
GeoSparkRegistrator.registerAll(spark)



True

### Read input files

In [42]:
osmParquetDF = spark.read.parquet("s3a://ecb-hackathon-202006-dbn-shared/data/openstreetmap/")



The OSM dataset is huge, and most of it is not relevant to our challenge. <br>
We explode the tags into one row per key/value pair so that we can quickly filter on useful keys and values and reduce the size of our operations. <br>
At the same time, we convert the geometry so that we can later use a spatial join to aggregate our data at NUTS level.
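To make the explode step concrete, here is a minimal plain-Python sketch of what exploding a map column does. It does not use Spark, and the sample rows, coordinates and the helper name `explode_tags` are made up for illustration; only the column names and the four useful keys come from this notebook.

```python
# Hypothetical sample rows mimicking the (id, map_tags, geometry) columns.
rows = [
    {"id": 1, "map_tags": {"amenity": "cafe", "name": "Example Cafe"},
     "geometry": "POINT (4.35 50.85)"},
    {"id": 2, "map_tags": {}, "geometry": "POINT (2.35 48.85)"},
    {"id": 3, "map_tags": {"shop": "bakery"}, "geometry": "POINT (13.40 52.52)"},
]


def explode_tags(rows):
    """Yield one (id, key, value, geometry) tuple per tag.

    Rows with an empty tag map produce nothing, which mirrors the
    `size(map_tags) > 0` condition in the SQL query.
    """
    for row in rows:
        for key, value in row["map_tags"].items():
            yield (row["id"], key, value, row["geometry"])


exploded = list(explode_tags(rows))

# Keep only the keys considered useful in this notebook.
useful_keys = {"amenity", "leisure", "shop", "tourism"}
filtered = [r for r in exploded if r[1] in useful_keys]
```

After exploding, the filter is a simple comparison on the `key` column instead of a lookup inside a map, which is what makes the later filtering cheap.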

In [43]:
osmParquetDF.createOrReplaceTempView("osmParquetDF")


In [44]:
# DataFrame API variant of the query below (kept for reference, not run):
# explode_osm_tags_df = osmParquetDF.select("id", explode("map_tags"), "geometry", "geometry_3857").where("size(map_tags)>0")

explode_osm_tags_df = spark.sql(
    """
        SELECT id, explode(map_tags),geometry,geometry_3857 
        FROM osmParquetDF
        WHERE size(map_tags)>0
    """
)
explode_osm_tags_df.createOrReplaceTempView("explodeOSMTagsDF")


In [45]:
# DataFrame API variant of the query below (kept for reference, not run):
# explode_osm_tags_df.select("id", "key", "value", ST_GeomFromWKT("geometry").alias("geom"),"geometry", "geometry_3857").where("KEY in ('amenity', 'leisure', 'shop', 'tourism')")

tags = spark.sql(
    """
        SELECT id, key, value,  ST_GeomFromWKT(geometry) as geom, geometry, geometry_3857
        FROM explodeOSMTagsDF
        WHERE KEY in ('amenity', 'leisure', 'shop', 'tourism')
    """
)
tags.createOrReplaceTempView("tags_df")


The next cell is admittedly redundant: `ip` simply selects everything from `tags_df`. It is too late to remove it, because the later cells refer to `ip`.

In [46]:
ip = spark.sql("SELECT * FROM tags_df")
ip.createOrReplaceTempView("ipDF")


A small sanity check that I ran from time to time to make sure that I was using the correct data and to get an idea of the size of the whole thing.

In [11]:
ip.distinct().count()


10096310

### Define Interest Points
Based on the OSM wiki, we can see which are the most common groups (keys) and which values (interest points) are tied to them. <br>
Out of all of them, we keep those that we feel are directly related to cash spending. <br>
This includes the tags <b>shop</b>, <b>amenity</b>, <b>leisure</b> and <b>tourism</b>.<br>
However, we can still filter down to the most important values. This removes a lot of rows (for example, amenities such as parking lots or fountains) that we deem would not directly generate cash demand. In particular, we keep only interest points that generate cash demand because they involve payments (restaurants, bars, hotels, etc.) and leave out points that might generate cash demand only because they gather people (social centers, churches, etc.). <br>
Finally, ATMs and banks are kept separate, as they might be used to define cash accessibility instead.
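The selection rule described above can be sketched in plain Python as follows. This is only an illustration of the logic, not the Spark code used in this notebook: the allow-lists are shortened, and the names `ALLOWED`, `FINANCE` and `classify` are hypothetical.

```python
# Shortened, illustrative allow-lists; the full lists are defined in the next cell.
ALLOWED = {
    "amenity": {"bar", "cafe", "restaurant"},
    "leisure": {"bowling_alley", "water_park"},
    "tourism": {"hotel", "museum"},
    "shop": None,  # None means: keep every value of this key
}
FINANCE = {"atm", "bank"}


def classify(key, value):
    """Return 'finance', 'demand', or None for a single (key, value) tag."""
    # ATMs and banks are set aside as cash-accessibility points.
    if key == "amenity" and value in FINANCE:
        return "finance"
    # Everything else is kept only if its key/value pair is on the allow-list.
    if key in ALLOWED:
        allowed_values = ALLOWED[key]
        if allowed_values is None or value in allowed_values:
            return "demand"
    return None


sample_tags = [("amenity", "cafe"), ("amenity", "atm"),
               ("amenity", "parking"), ("shop", "bakery")]
labels = [classify(k, v) for k, v in sample_tags]
```

Keeping the finance values in their own set means an ATM is never counted as a cash-demand point, even though it shares the `amenity` key with bars and restaurants.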

In [65]:
amenities_tags = ['bar', 'bbq', 'biergarten', 'cafe', 'fast_food', 'food_court', 'pub', 'restaurant', 'brothel', 'casino', 'cinema', 'gambling', 'nightclub', 'stripclub', 'theatre', 'swingerclub', 'planetarium']
leisure_tags = ['adult_gaming_centre', 'amusement_arcade', 'bowling_alley', 'dance', 'miniature_golf', 'stadium', 'swimming_pool', 'water_park']

tourism_tags = ['aquarium', 'camp_site', 'gallery', 'hostel', 'hotel', 'motel', 'museum', 'theme_park', 'zoo']

finance = ['atm', 'bank']


In [66]:
amenities = ip.filter((ip['key']=='amenity')&(ip['value'].isin(amenities_tags)))
leisures = ip.filter((ip['key']=='leisure')&(ip['value'].isin(leisure_tags)))
tourism = ip.filter((ip['key']=='tourism')&(ip['value'].isin(tourism_tags)))
shops = ip.filter((ip['key']=='shop'))
financedf = ip.filter((ip['key']=='amenity')&(ip['value'].isin(finance)))


In [67]:
amenities.createOrReplaceTempView("amenitiesDF")
leisures.createOrReplaceTempView("leisuresDF")
tourism.createOrReplaceTempView("tourismDF")
shops.createOrReplaceTempView("shopsDF")
financedf.createOrReplaceTempView("financeDF")


In [68]:
total = leisures.union(tourism).union(shops).union(amenities)
total.createOrReplaceTempView("totalDF")


### Prepare NUTS data 
We use NUTS2, as it is the level we are going to use in the modeling: NUTS3 has a lot of missing data in SUCH. <br>
The 20M granularity (1:20 million scale) leaves some points misclassified, but the difference is not major.
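One plausible way a coarse boundary can misclassify a point is sketched below in plain Python: a simplified polygon cuts off a small peninsula, so a point on it falls outside the region. The polygons and coordinates are invented for illustration, and `point_in_polygon` is a hypothetical helper using the standard ray-casting test, not the GeoSpark implementation.

```python
def point_in_polygon(x, y, polygon):
    """Ray-casting test: count how many edges a ray going right from (x, y) crosses."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Only edges that straddle the horizontal line through y can be crossed.
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


# Made-up region: the detailed outline has a small peninsula on its east side,
# the simplified outline (coarser resolution) drops it.
detailed = [(0, 0), (4, 0), (4, 2), (5, 2), (5, 3), (4, 3), (4, 4), (0, 4)]
simplified = [(0, 0), (4, 0), (4, 4), (0, 4)]

harbour = (4.5, 2.5)  # a point on the peninsula
inland = (2.0, 2.0)   # a point well inside the region

harbour_detailed = point_in_polygon(*harbour, detailed)
harbour_simplified = point_in_polygon(*harbour, simplified)
inland_simplified = point_in_polygon(*inland, simplified)
```

In this toy case only the point near the boundary changes classification, which is consistent with the small impact observed here.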

In [70]:
projection="4326"
level="_LEVL_2"
eurostatfile="NUTS2016-20M/NUTS_RG_20M_2016_"+projection+level
nutssourcefile="s3a://ecb-hackathon-202006-dbn-shared/data/eurostat/nuts/"+eurostatfile+".geojsonlines"


In [71]:
allowTopologyInvalidGeometries = True
skipSyntaxInvalidGeometries = True 


In [72]:
nutsRegionsSpatialRDD = GeoJsonReader.readToGeometryRDD(
    spark.sparkContext,
    nutssourcefile,
    allowTopologyInvalidGeometries,
    skipSyntaxInvalidGeometries,
)
nutsRegionsSpatialRDD.analyze()
nutsoriginal_df = Adapter.toDf(nutsRegionsSpatialRDD, spark)


In [33]:
nutsoriginal_df.printSchema()


root
 |-- geometry: string (nullable = true)
 |-- id: string (nullable = true)
 |-- COAST_TYPE: string (nullable = true)
 |-- MOUNT_TYPE: string (nullable = true)
 |-- CNTR_CODE: string (nullable = true)
 |-- FID: string (nullable = true)
 |-- NUTS_ID: string (nullable = true)
 |-- NUTS_NAME: string (nullable = true)
 |-- LEVL_CODE: string (nullable = true)
 |-- URBN_TYPE: string (nullable = true)

In [73]:
nutsoriginal_df.createOrReplaceTempView("nutsOriginalDF")



In [74]:
# convert the geometry
nuts_df = spark.sql(
    """
        SELECT ST_GeomFromWKT(geometry) as geom,NUTS_ID,LEVL_CODE,NUTS_NAME 
        FROM nutsOriginalDF
    """
)        
nuts_df.createOrReplaceTempView("nutsDF")


### Join Interest Points and Regions

In [75]:
spatialjoin_df = spark.sql(
    """
        SELECT totalDF.id,totalDF.key,totalDF.value,nutsDF.NUTS_ID,nutsDF.LEVL_CODE,nutsDF.NUTS_NAME,totalDF.geometry
        FROM nutsDF,totalDF
        WHERE ST_Intersects(totalDF.geom,nutsDF.geom)
    """
)
spatialjoin_df.createOrReplaceTempView("spatialJoinDF")

spatialjoin_finance = spark.sql(
    """
        SELECT financeDF.id,financeDF.key,financeDF.value,nutsDF.NUTS_ID,nutsDF.LEVL_CODE,nutsDF.NUTS_NAME,financeDF.geometry
        FROM nutsDF,financeDF
        WHERE ST_Intersects(financeDF.geom,nutsDF.geom)
    """
)
# Register the finance join itself (not spatialjoin_df) under the finance view name.
spatialjoin_finance.createOrReplaceTempView("spatialJoinFinanceDF")



In [76]:
final = spark.sql(
    """SELECT COUNT(DISTINCT ID) AS COUNT, KEY, VALUE, NUTS_ID, NUTS_NAME
        FROM spatialJoinDF
        GROUP BY NUTS_ID, NUTS_NAME, KEY, VALUE
    """
)
final.createOrReplaceTempView("finalDF")

final_finance = spark.sql(
    """SELECT COUNT(DISTINCT ID) AS COUNT, KEY, VALUE, NUTS_ID, NUTS_NAME
        FROM spatialJoinFinanceDF
        GROUP BY NUTS_ID, NUTS_NAME, KEY, VALUE
    """
)
final_finance.createOrReplaceTempView("finalFinanceDF")
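
The aggregation above counts distinct ids per region and tag. A minimal plain-Python sketch of the same `COUNT(DISTINCT id) ... GROUP BY` logic is shown below; the joined rows are invented sample data, and the variable names are hypothetical.

```python
from collections import defaultdict

# Invented sample of joined rows: (id, key, value, NUTS_ID).
joined = [
    (101, "amenity", "cafe", "BE10"),
    (101, "amenity", "cafe", "BE10"),  # same point appearing twice
    (102, "amenity", "cafe", "BE10"),
    (103, "shop", "bakery", "BE10"),
    (104, "amenity", "cafe", "FR10"),
]

# Collect ids in a set per group so that repeated ids are counted once.
ids_per_group = defaultdict(set)
for poi_id, key, value, nuts_id in joined:
    ids_per_group[(nuts_id, key, value)].add(poi_id)

counts = {group: len(ids) for group, ids in ids_per_group.items()}
```

Counting distinct ids rather than rows keeps the totals stable if the same point shows up more than once in a group after the join.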


In [77]:
#PD_FINAL_Finance = final_finance.toPandas()


In [80]:
#PD_FINAL = final.toPandas()


array(['atm', 'bank'], dtype=object)

### Execute computations and save

In [83]:
final.write.format("parquet").save("s3a://ecb-hackathon-202006-dbn-group1/data/OSM/InterestPointsFinal.parquet")


In [None]:
final_finance.write.format("parquet").save("s3a://ecb-hackathon-202006-dbn-group1/data/OSM/InterestPointsFinalFinance.parquet")