# Data Cleaning and Routing Calculation

This notebook takes the employee registry and the offices registry, both already georeferenced by the external proprietary software eGeCo and provided in .csv format, and returns as output a cleaned employee registry enriched with distance (in km) and duration (in minutes) variables, plus a cleaned offices registry.

During the data cleaning process, several transformations are performed to fix issues related to the input data format (decimal separators, column casting and renaming, CASE WHEN transformations, ...) and to adjust office codes so that the employee and offices registries can be linked.

A parallelized job using a __Pandas User Defined Function__ (UDF) is implemented to obtain routing information (distance and duration) between two points on Earth via the OSRM routing API.

In [None]:
# import different packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType, StructType, StringType, ArrayType, StructField

import pandas as pd
import numpy as np
import requests
import json

# Input (raw) and output (curated) locations. Both are used as plain string
# prefixes and concatenated directly with file/table names below
# (e.g. raw_path + 'employee_registry.csv'), so they MUST end with a path
# separator; otherwise the names are glued onto the last directory name.
raw_path = 'path/to/raw_data/'
harbour_path = 'path/to/curated_data/'

# NOTE(review): master "local" runs Spark with a single worker thread, so the
# pandas UDF batches are not processed in parallel. "local[*]" would use all
# cores. Check the OSRM server's usage/rate limits before raising parallelism.
spark = (
    SparkSession.builder
    .master("local")
    .appName("application-name")
    .getOrCreate()
)

In [None]:
# define udf
# calculate distances from osrm api
def get_distances(origin_lat, origin_lon, destination_lat, destination_lon, by='car',
                  timeout=30, base_url='http://router.project-osrm.org'):
    """Return the OSRM road distance and travel time between two points.

    Parameters
    ----------
    origin_lat, origin_lon : float
        Latitude / longitude of the starting point.
    destination_lat, destination_lon : float
        Latitude / longitude of the destination.
    by : str
        OSRM profile name placed in the request URL (default ``'car'``).
    timeout : float
        Seconds to wait for the HTTP response. This prevents a single stalled
        request from blocking the calling (Spark) task indefinitely.
    base_url : str
        Root URL of the OSRM server; defaults to the public demo server.

    Returns
    -------
    list
        ``[distance, duration]`` as floats in meters and seconds, or
        ``[None, None]`` when a coordinate is missing, the request fails, the
        response is not valid JSON, or it contains no route.
    """
    # Missing coordinates (None, or NaN coming from pandas) can never yield a
    # route, so skip the network round-trip entirely.
    if any(pd.isna(c) for c in (origin_lat, origin_lon, destination_lat, destination_lon)):
        return [None, None]

    # OSRM expects "lon,lat" pairs. overview=false omits the route geometry,
    # which is not used here and only inflates the response.
    url = "{}/route/v1/{}/{},{};{},{}?overview=false".format(
        base_url.rstrip('/'), by, origin_lon, origin_lat, destination_lon, destination_lat
    )
    try:
        r = requests.get(url, timeout=timeout)
        response = json.loads(r.content)
        route = response['routes'][0]
        distance = float(route['distance'])  # in meters
        duration = float(route['duration'])  # in seconds
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError):
        # Best effort: network errors, non-JSON bodies and error payloads
        # without 'routes' all map to a missing result instead of failing the job.
        return [None, None]

    return [distance, duration]

# register as pandas udf
# Both struct fields are double: get_distances returns floats (meters /
# seconds), and declaring them long silently truncated the values.
@pandas_udf('distance double, duration double')
def get_distances_udf(origin_lat: pd.Series, origin_lon: pd.Series, destination_lat: pd.Series, destination_lon: pd.Series) -> pd.DataFrame:
    """Vectorized Spark wrapper around ``get_distances``.

    Receives batches of origin/destination coordinates (lat = y, lon = x) as
    pandas Series and calls ``get_distances`` once per row.

    Returns
    -------
    pd.DataFrame
        One row per input row, with float columns ``distance`` (meters) and
        ``duration`` (seconds). Failed lookups are NaN, which Spark stores as null.
    """
    routes = [
        get_distances(o_lat, o_lon, d_lat, d_lon)
        for o_lat, o_lon, d_lat, d_lon in zip(origin_lat, origin_lon, destination_lat, destination_lon)
    ]
    # Explicit column names let Spark match the struct fields by name. They also
    # give an empty batch the correct two-column shape; apply(result_type='expand')
    # would return the four input columns in that case.
    return pd.DataFrame(routes, columns=['distance', 'duration'], dtype='float64')


# Data Import, cleaning and transformation

In [None]:
# Employee registry: read the semicolon-separated CSV (every column as string),
# drop rows without SESSO, parse the comma-decimal X/Y coordinates, tag the
# entity and project onto the curated column set.
# Lower-case names in select() ('ente', 'lat', 'lon') resolve to the
# upper-case columns created by withColumn thanks to Spark's default
# case-insensitive column resolution.
raw_employee_registry = (
    spark.read.csv(
        raw_path + 'employee_registry.csv',
        sep=';',
        header=True,
        inferSchema=False,
    )
    .filter(F.col('SESSO').isNotNull())
    # X / Y use ',' as the decimal separator: swap it for '.' before casting.
    .withColumn('LON', F.regexp_replace('X', ',', '.').cast('double'))
    .withColumn('LAT', F.regexp_replace('Y', ',', '.').cast('double'))
    .withColumn('ENTE', F.lit('RER'))
    .select(
        # Employee id prefixed with the entity code: "<ente>-<id_dipendente>".
        F.concat_ws('-', 'ente', 'id_dipendente').alias('id_dipendente'),
        'ente',
        F.col('provincia_sigla').alias('provincia'),
        'cap',
        F.col('citta').alias('comune'),
        'indirizzo',
        'id_sede',
        'lat',
        'lon',
        # NOTE(review): a null SMART is not equal to 'N', so it lands in
        # otherwise() and is flagged 1 -- confirm that is intended.
        F.when(F.col('SMART') == 'N', 0).otherwise(1).alias('flg_sw'),
        F.col('SESSO').alias('sesso'),
        F.col('ETA').alias('eta'),
        F.lit('In regione').alias('flg_regione'),
    )
    .distinct()
)

raw_employee_registry.show()

In [None]:
# import offices data already georeferenced
# Province name (upper case, as spelled in the offices CSV) -> two-letter sigla.
_PROVINCE_SIGLA = {
    'BOLOGNA': 'BO',
    'FERRARA': 'FE',
    'RAVENNA': 'RA',
    'FORLI CESENA': 'FC',
    'REGGIO EMILIA': 'RE',
    'RIMINI': 'RN',
    'PARMA': 'PR',
    'PIACENZA': 'PC',
    'MODENA': 'MO',
}


def _province_to_sigla(province_col):
    """Return a Column mapping province names to their sigla (null if unmapped).

    The name is trimmed and upper-cased before matching, so stray whitespace or
    different casing in the CSV does not silently produce a null sigla.
    """
    normalized = F.upper(F.trim(province_col))
    sigla_expr = None
    for name, sigla in _PROVINCE_SIGLA.items():
        matches = normalized == name
        sigla_expr = F.when(matches, sigla) if sigla_expr is None else sigla_expr.when(matches, sigla)
    return sigla_expr


raw_offices_registry = (
    spark.read.csv(
        raw_path + 'offices_registry.csv', sep=';', inferSchema=False, header=True
    )
    # X / Y use ',' as the decimal separator: swap it for '.' before casting.
    .withColumn('LON', F.regexp_replace('X', ',', '.').cast('double'))
    .withColumn('LAT', F.regexp_replace('Y', ',', '.').cast('double'))
    .withColumn('provincia_sigla', _province_to_sigla(F.col('provincia')))
    .select(
        'id_sede',
        F.lit('RER').alias('ente'),
        F.col('provincia_sigla').alias('provincia'),
        'cap',
        F.col('citta').alias('comune'),
        'indirizzo',
        'lat', 'lon',
        F.lit(0).alias('cancellato'),
        F.lit('In regione').alias('flg_regione'),
    )
    .distinct()
)

raw_offices_registry.show()

# Routing (distance, duration)

In [None]:
# Routing: attach each employee's office coordinates (inner join on id_sede),
# call the OSRM pandas UDF once per row, then convert meters -> km and
# seconds -> minutes. Helper columns are dropped at the end.
# NOTE(review): inner join drops employees whose id_sede has no office row, and
# an id_sede appearing on several office rows duplicates employees -- confirm.
raw_employee_registry_geo = (
    raw_employee_registry
    .join(
        raw_offices_registry.selectExpr('id_sede', 'lat AS sede_lat', 'lon AS sede_lon'),
        on='id_sede',
    )
    # Route is a struct carrying the UDF's distance / duration fields.
    .withColumn('Route', get_distances_udf('lat', 'lon', 'sede_lat', 'sede_lon'))
    .withColumn('distanza', F.col('Route').getField('distance') / 1000.0)  # meters -> km
    .withColumn('durata', F.col('Route').getField('duration') / 60.0)      # seconds -> minutes
    .drop('Route', 'sede_lat', 'sede_lon')
)

In [None]:
# Print the schema of the routed employee registry (includes the distanza /
# durata columns computed in the routing cell) as a quick sanity check.
raw_employee_registry_geo.printSchema()

# Persist data to Harbour path

In [None]:
# Persist the routed employee registry as parquet under the curated (harbour)
# location, replacing any previous output.
raw_employee_registry_geo.write.mode('overwrite').parquet(harbour_path + 'anag_dip_geo')

In [None]:
# Persist the cleaned offices registry as parquet under the curated (harbour)
# location, replacing any previous output.
raw_offices_registry.write.mode('overwrite').parquet(harbour_path + 'anag_sedi')