## Setting up the environnement

In [20]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, explode, json_tuple, regexp_replace, udf
from pyspark.sql.functions import sum as col_sum
from pyspark.sql.types import BooleanType, StructType, StructField, StringType, IntegerType, FloatType

from math import radians, cos, sin, asin, sqrt
from typing import List

import re
import os

## Input parameters

In [None]:
# Radius to check for (in km)
radius = 3

# Position to check from. The location provided here is the location of the A. Chantrainestraat in Wilrijk.
latlng = [51.16388937021345, 4.392073389160737]

## Read the stop data

In [21]:
# Read in the file (as json) and convert to a single column
# The 'haltes' column gets renamed to 'stops'
stops = spark.read.json("data/stops.txt")
stops = stops.select((explode("haltes").alias("stops")))

# Map each entry in the dataframe to its own stop
stops = stops.select('stops').rdd.map(lambda x: x.stops).toDF()
# Drop unnecessary data
stops = stops.drop('links', 'gemeentenummer', 'entiteitnummer')

# Rename columns of dataframe to better name
stops = stops \
            .withColumnRenamed('haltenummer', 'stop_number') \
            .withColumnRenamed('omschrijving', 'desc') \
            .withColumnRenamed('geoCoordinaat', 'coord') \
            .withColumnRenamed('omschrijvingGemeente', 'village') \

# Show the first 20 entries of the dataframe
stops.show()

+--------------------+-----------+--------------------+---------+
|               coord|stop_number|                desc|  village|
+--------------------+-----------+--------------------+---------+
|[51.1638893702134...|     101000| A. Chantrainestraat|  Wilrijk|
|[51.2062496902375...|     101001|           Zurenborg|Antwerpen|
|[51.1660665941742...|     101002|Verenigde Natieslaan|  Hoboken|
|[51.1660216374063...|     101003|Verenigde Natieslaan|  Hoboken|
|[51.1740548394127...|     101004|     D. Baginierlaan|  Hoboken|
|[51.1630084393468...|     101005| A. Chantrainestraat|  Wilrijk|
|[51.1597748887066...|     101006|      Fotografielaan|  Wilrijk|
|[51.1599636330007...|     101007|      Fotografielaan|  Wilrijk|
|[51.1629556669243...|     101008|            Moerelei|  Wilrijk|
|[51.1634592883462...|     101009|            Moerelei|  Wilrijk|
|[51.1887431659368...|     101010|        J. De Voslei|Antwerpen|
|[51.1829725415369...|     101011|   Middelheim Vijver|Antwerpen|
|[51.16220

## Find centers of villages

In [26]:
data = sc.textFile("data/zipcodes.csv")
data = data.map(lambda x: re.sub(r' \(.*\)', '', x))
data = data.map(lambda x: re.sub(r"[^\S\n\t]+", ' ', x))
data = data.map(lambda x: x.split(';'))

schema = StructType([
    StructField('postal code', StringType(), False),
    StructField('village', StringType(), False),
    StructField('lat_center', StringType(), False),
    StructField('lon_center', StringType(), False),
    StructField('url', StringType(), False)
])


data = data.toDF(schema=schema).drop("url")
data = data.withColumn("lat_center", data['lat_center'].cast(FloatType()))
data = data.withColumn("lon_center", data['lon_center'].cast(FloatType()))

centers = data.where(col("lat_center").isNotNull() & col("lon_center").isNotNull())

centers.show()

+-----------+--------------------+----------+----------+
|postal code|             village|lat_center|lon_center|
+-----------+--------------------+----------+----------+
|       1000|             Brussel|  50.84275|   4.35155|
|       1000|           Bruxelles|  50.84275|   4.35155|
|       1005|Brusselse Hoofdst...| 50.844875| 4.3514333|
|       1005|Conseil Region Br...|  50.84786|  4.367408|
|       1008|Chambre des Repr�...| 50.846558|  4.364662|
|       1008|Kamer van Volksve...| 50.846558|  4.364662|
|       1009|   Senat de Belgique|  50.79834|   4.39565|
|       1010|Rijksadministrati...|  50.82433|  4.513954|
|       1012|Parlement de la C...| 50.846638| 4.3619924|
|       1020|             Brussel| 50.884216| 4.3580003|
|       1020|           Bruxelles| 50.884216| 4.3580003|
|       1020|              Laeken| 50.884216| 4.3580003|
|       1020|               Laken| 50.884216| 4.3580003|
|       1030|             Brussel|  50.86744|   4.37727|
|       1030|           Bruxell

## Haversine function
A function to calculate the distance in between two geocoordinates. <br>
Found at https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points

In [23]:
def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles
    return c * r

## Create filter function and parameters

In [24]:
@udf(returnType=BooleanType())
def in_radius(lat1, lon1):    
    lat2 = latlng[0]
    lon2 = latlng[1]
    
    return haversine(lon1, lat1, lon2, lat2) <= radius

## Villages within the radius

In [27]:
cities_in_radius = centers.filter(in_radius('lat_center', 'lon_center')) \
                            .drop("postal code", "lat_center", "lon_center") \
                            .distinct().sort(col("village").asc())

cities_in_radius.show()

+---------+
|  village|
+---------+
|Antwerpen|
|  Wilrijk|
+---------+



## The amount of stops per town

In [28]:
per_town = stops.groupBy("village").count().select(col("village"), col("count").alias("# stops"))
per_town.show()

+-------------------+-------+
|            village|# stops|
+-------------------+-------+
|              Bever|     28|
|           Waanrode|     40|
|              Duras|      2|
|           Ettelgem|      8|
|          Zillebeke|     25|
|Sint-Job-in-'t-Goor|     24|
|               Lint|     24|
|          Merelbeke|     85|
|             Essene|     31|
|             Parike|     10|
|          Huizingen|     21|
| Glabbeek-Zuurbemde|     16|
|             Gorsem|      6|
|          Harelbeke|     81|
|                Mol|    140|
|        Hoogstraten|     38|
|       Oud-Turnhout|     47|
|           Hoevenen|     24|
|         Rupelmonde|      6|
|            Edingen|     12|
+-------------------+-------+
only showing top 20 rows



## The amount of citizens per town

In [29]:
citizens = sc.textFile("data/citizens.txt")
citizens.collect()

# Remove unnecessary headers
citizens = citizens.map(lambda x: re.sub(
    r'^KONINKRIJK.*|^BRUSSELS.*|^ARR.*|^ARRONDISSEMENT.*|^PROVINC.*|^VLAAMS.*|^REGION.*', '', x))

# Replace 'village / village-in-french' with 'village'
citizens = citizens.map(lambda x: re.sub(r'/ .* ', '', x))

# Remove everything in between parentheses
citizens = citizens.map(lambda x: re.sub(r'\(.*\)', '', x))

# Remove excess whitespaces
citizens = citizens.map(lambda x: re.sub(r"[^\S\n\t]+", ' ', x))

# Remove empty lines
citizens = citizens.filter(lambda x: x != '')

# Remove '.' from the numbers
citizens = citizens.map(lambda x: re.sub(r'\.', '', x))

# Split on space: gives a list of lists [[village, amount], [village, amount]...]
citizens = citizens.map(lambda x: x.rsplit(' ', 1))


for test in citizens.collect():
    if len(test) > 2:
        print(test)
        

# Create schema for dataframe
# For some reason, the citizens field needs to be StringType here, else .show() will not work.
s = StructType([
    StructField('vill', StringType(), False),
    StructField('citizens', StringType(), False)
])

# Create schema and cast the citizens column to IntegerType
citizens = citizens.toDF(schema=s)

# Cast to citizens column to integers
citizens = citizens.withColumn("citizens", citizens['citizens'].cast(IntegerType()))

    
citizens.show()
citizens.printSchema()

+--------------------+--------+
|                vill|citizens|
+--------------------+--------+
|          Anderlecht|  117724|
|             Brussel|  177112|
|              Elsene|   86336|
|           Etterbeek|   47410|
|               Evere|   41016|
|           Ganshoren|   24794|
|               Jette|   52144|
|          Koekelberg|   21765|
|            Oudergem|   33725|
|          Schaarbeek|  132097|
| Sint‐Agatha‐Berchem|   24831|
|         Sint‐Gillis|   49361|
| Sint‐Jans‐Molenbeek|   95455|
| Sint‐Joost‐ten‐Node|   26813|
|Sint‐Lambrechts‐W...|   56212|
| Sint‐Pieters‐Woluwe|   41513|
|               Ukkel|   82038|
|               Vorst|   55694|
| Watermaal‐Bosvoorde|   25001|
|          Aartselaar|   14298|
+--------------------+--------+
only showing top 20 rows

root
 |-- vill: string (nullable = false)
 |-- citizens: integer (nullable = true)



## The amount of stops per citizen of each town

In [40]:
data = per_town.join(citizens, citizens.vill == per_town.village)
data = data.drop('vill')

result = data.withColumn("Result", col("# stops")/col("citizens")).sort(col('Result').desc())
result = result.drop('# stops', 'citizens')

result.show()

+-------------+--------------------+
|      village|              Result|
+-------------+--------------------+
|    Herstappe| 0.06818181818181818|
|        Bever|0.012698412698412698|
|     Geetbets|0.010941644562334218|
|Nieuwerkerken|0.010205548368549663|
|        Alken| 0.01020408163265306|
|   Wachtebeke|0.008989056800416884|
|     Overpelt|0.008533747090768037|
|   Diepenbeek|0.008310249307479225|
|     Pepingen| 0.00800182898948331|
|   Zuienkerke|0.007686676427525...|
|       Wellen|0.007296311309282529|
|   Hoegaarden|0.006772334293948127|
|    Koekelare|0.006709882861366996|
|    Ruiselede|0.006685236768802228|
| Begijnendijk|0.006664677210782851|
|        Herne| 0.00662451068955134|
|         Bree|           0.0061875|
|        Balen|0.006109253065774805|
|      Berlaar|0.006083782374413...|
|        Melle|0.006051698798305...|
+-------------+--------------------+
only showing top 20 rows



## The amount of stops per citizen of each town within a certain radius

In [41]:
result = result.join(cities_in_radius, "village", "inner")
result = result.filter(col("Result").isNotNull())

result.show()

+---------+--------------------+
|  village|              Result|
+---------+--------------------+
|Antwerpen|0.001159714767673...|
+---------+--------------------+

