In [1]:
from IPython.display import display, HTML

display(HTML(data="""
<style>
    div#notebook-container    { width: 95%; }
    div#menubar-container     { width: 95%; }
    div#maintoolbar-container { width: 99%; }
</style>
"""))

# LOPRO2TS : LOst PROperties Tracking Service in Train Stations

## Problem specification

A new startup wants to build an app that helps users track lost properties in trains and stations across France, providing AI-powered, step-by-step guidance based on the property's characteristics, the train station's characteristics, etc. Their first objective is therefore to conduct market research.<br>
To gain a deep understanding of this mysterious, unexplored phenomenon 👽 of lost properties in trains and/or stations, they decided to recruit the famous Data Engineer who helped the **Sparkify** startup become N°1 in the world : Me 🧐!!! <br>
My mission this time is to build an ETL, _uh uhm_, **ELT** pipeline that produces a more analytics-friendly version of the datasets they intend to use.

They decided to use open datasets provided by France's national railway company, [SNCF Group](https://www.sncf.com/en), as well as temperature data provided by www.weathernews.fr.


## Datasets description

### List of French train stations  
This dataset is a JSON file with around 2.9K records (2,867 in the snapshot used in this notebook). It lists French train stations and some of their characteristics. The dataset is updated yearly and can be downloaded [here](https://ressources.data.sncf.com/explore/dataset/referentiel-gares-voyageurs/download/?format=json&timezone=Europe/Berlin&lang=fr).<br>
Example of a record in the JSON file, as printed by `pprint` (hence the Python-style quotes):
```python
{'datasetid': 'referentiel-gares-voyageurs',
 'fields': {'adresse_cp': '93140',
            'alias_libelle_noncontraint': 'Remise à Jorelle',
            'code': '00002-1',
            'commune_code': '010',
            'commune_libellemin': 'Bondy',
            'departement_libellemin': 'Seine-Saint-Denis',
            'departement_numero': '93',
            'gare': '{"DRG_ON": true, "Etrangere_ON": false, "NbPltf": 1, '
                    '"Alias_Libelle_NonContraint": "Remise à Jorelle", '
                    '"Alias_Libelle_Fronton": "Remise à Jorelle", '
                    '"AgenceGC_Libelle": "Direction Générale des Gares '
                    'Île-de-France", "RegionSNCF_Libelle": "REGION DE '
                    'PARIS-EST", "UG_Libelle": null, "UT_Libelle": "BONDY GARE '
                    'REMISE A JORELLE TRAM TRAIN"}',
            'gare_agencegc_libelle': 'Direction Générale des Gares '
                                     'Île-de-France',
            'gare_alias_libelle_fronton': 'Remise à Jorelle',
            'gare_alias_libelle_noncontraint': 'Remise à Jorelle',
            'gare_drg_on': 'True',
            'gare_etrangere_on': 'False',
            'gare_nbpltf': 1,
            'gare_regionsncf_libelle': 'REGION DE PARIS-EST',
            'gare_ut_libelle': 'BONDY GARE REMISE A JORELLE TRAM TRAIN',
            'latitude_entreeprincipale_wgs84': '48.893170',
            'longitude_entreeprincipale_wgs84': '2.487751',
            'rg_libelle': 'Gare La Remise à Jorelle',
            'segmentdrg_libelle': 'b',
            'tvs': 'RJL',
            'tvss': '[{"TVS_Code": "RJL"}]',
            'uic_code': '0087988709',
            'wgs_84': [48.89317, 2.487751]},
 'geometry': {'coordinates': [2.487751, 48.89317], 'type': 'Point'},
 'record_timestamp': '2020-12-29T00:00:51.658+01:00',
 'recordid': 'fbaead07f41e47e2c3cc424e43e92972f898b740'}
```
The most interesting field for us is `fields` with the following sub-fields :
* **uic_code** : train station identification code
* **alias_libelle_noncontraint** : train station name
* **latitude_entreeprincipale_wgs84** and **longitude_entreeprincipale_wgs84** : latitude and longitude, respectively, of the station's main entrance (World Geodetic System 1984)
* **commune_code** and **commune_libellemin** : code and name of the city where the station is located
* **departement_numero** and **departement_libellemin** : code and name of the county (département) where the station is located
* **adresse_cp** : postal code
* **gare_nbpltf** : number of platforms in the station
* **segmentdrg_libelle** : station category, encoded as `a`, `b` or `c` depending on traffic coverage (national/international, regional, local) and yearly number of passengers
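For readers who want to see the extraction outside pandas, here is a minimal standard-library sketch that keeps only the sub-fields listed above from each record. It assumes the file is a JSON array of records shaped like the example (the notebook indexes `item["fields"]` the same way); `KEPT_FIELDS`, `extract_station` and `load_stations` are hypothetical names. Codes are left as strings so that leading zeros such as `0087988709` or `010` survive, and the coordinate strings are converted to floats.

```python
import json

# Sub-fields of `fields` kept for the stations table (list from the description above)
KEPT_FIELDS = [
    "uic_code", "alias_libelle_noncontraint",
    "latitude_entreeprincipale_wgs84", "longitude_entreeprincipale_wgs84",
    "commune_code", "commune_libellemin",
    "departement_numero", "departement_libellemin",
    "adresse_cp", "gare_nbpltf", "segmentdrg_libelle",
]
COORDINATES = ("latitude_entreeprincipale_wgs84", "longitude_entreeprincipale_wgs84")


def extract_station(record: dict) -> dict:
    """Return the kept sub-fields of one record; absent sub-fields become None."""
    fields = record["fields"]
    station = {name: fields.get(name) for name in KEPT_FIELDS}
    for name in COORDINATES:
        # Coordinates are stored as strings; a few stations have none
        if station[name] is not None:
            station[name] = float(station[name])
    return station


def load_stations(path: str) -> list:
    """Apply extract_station to every record of the JSON array stored at `path`."""
    with open(path, encoding="utf-8") as f:
        return [extract_station(record) for record in json.load(f)]
```

For instance, a record whose `fields` only holds `uic_code` and the latitude string `"48.893170"` comes back with `"0087988709"` unchanged, a latitude of `48.89317`, and `None` for every other sub-field.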


### Lost properties declaration dataset 

This dataset is a CSV file with around 1.14M records. It contains loss declarations made by customers through various channels (website, app, in person to SNCF staff, ...). The dataset is updated daily and can be found [here](https://ressources.data.sncf.com/explore/dataset/objets-trouves-gares/download/?format=csv&timezone=Europe/Berlin&lang=fr&use_labels_for_header=true&csv_separator=%3B).<br>
The dataset has the following columns :
* **Date** : date of loss declaration by customer
* **Gare** : Name of the train station where the loss was declared
* **Code UIC** : train station identification code
* **Type d'objets** : category of the property (e.g. the category can be `electronics`, `clothes`, ...)
* **Nature d'objets** : a more fine-grained category of the property (e.g. `mobile phone`, `scarf`, ...)
* **Type d'enregistrement** : recording type. It currently has a single value : `Déclaration de Perte` (declaration of loss)
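The export is semicolon-separated (`csv_separator=%3B` in the download URL). Here is a minimal standard-library sketch of reading it, assuming the header row matches the column names above. Every value stays a string, so `Code UIC` keeps its leading zeros (the pandas cell later enforces this with `dtype={"Code UIC": str}`), and empty cells become `None`, which is how declarations without a station show up. `read_declarations` and the two sample rows are hypothetical; the date format in the sample is an assumption.

```python
import csv
import io


def read_declarations(lines) -> list:
    """Parse a semicolon-separated export (any iterable of text lines) into dicts.

    Values are left as strings, so codes such as "Code UIC" keep their leading
    zeros; empty cells are mapped to None.
    """
    reader = csv.DictReader(lines, delimiter=";")
    return [{key: (value if value != "" else None) for key, value in row.items()}
            for row in reader]


# Hypothetical two-row extract (values borrowed from the tables in this notebook)
SAMPLE = (
    "Date;Gare;Code UIC;Nature d'objets;Type d'objets;Type d'enregistrement\n"
    "2019-02-13T23:08:31+01:00;;;Autres divers;Divers;Déclaration de Perte\n"
    "2019-02-14T07:07:30+01:00;Paris Est;0087113001;Sac à dos;"
    "Bagagerie: sacs, valises, cartables;Déclaration de Perte\n"
)
rows = read_declarations(io.StringIO(SAMPLE))
```

For the real file, assuming UTF-8 encoding: `with open("input_data/objets-trouves-gares.csv", encoding="utf-8", newline="") as f: rows = read_declarations(f)`.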


### Found properties dataset 

This dataset is a CSV file with around 700K records. It contains information on lost properties that have been declared by customers and found in trains/stations by staff, and indicates whether each property was returned to its owner. The dataset is updated daily and can be found [here](https://ressources.data.sncf.com/explore/dataset/objets-trouves-restitution/download/?format=csv&timezone=Europe/Berlin&lang=fr&use_labels_for_header=true&csv_separator=%3B).<br>
The dataset has the following columns :
* **Date** : it is unclear whether this is the loss declaration date or the date when staff found the property; we'll therefore treat it as the loss declaration date
* **Date et heure de restitution** : if the property was returned to its owner, the date and time of recovery
* **Gare** : name of the train station where the property was found
* **Code UIC** : train station identification code
* **Type d'objets** : category of the property (e.g. the category can be `electronics`, `clothes`, ...)
* **Nature d'objets** : a more fine-grained category of the property (e.g. `mobile phone`, `scarf`, ...)
* **Type d'enregistrement** : recording type. It currently has a single value : `Objet trouvé` (found object)


### Daily temperatures dataset 

About 70% of economic activities are reportedly sensitive to temperature. Cognitive abilities have also been shown to be affected by temperature ([source](https://www.cambridgebrainsciences.com/more/articles/as-temperature-goes-up-cognitive-performance-goes-down#:~:text=A%20hot%20environment%20has%20been,impaired%20more%20than%20simple%20tasks.)): for example, uncomfortable heat can impair cognitive performance. It therefore seems worthwhile to include temperature data in our pipeline to help refine future analyses.

This dataset is a CSV file with around 100K records. It contains daily temperatures for each county in France since 2018. The dataset is updated monthly and can be found [here](https://opendata.reseaux-energies.fr/explore/dataset/temperature-quotidienne-departementale/download/?format=csv&timezone=Europe/Berlin&lang=fr&use_labels_for_header=true&csv_separator=%3B).<br>
The dataset has the following columns :
* **Date** : temperature measurement date
* **Code INSEE département** : county code
* **Département** : county name
* **TMin (°C)** : daily minimum temperature in degrees Celsius
* **TMax (°C)** : daily maximum temperature in degrees Celsius
* **TMoy (°C)** : daily average temperature in degrees Celsius


## Exploratory Data Analysis

In this section we'll run a preliminary exploration to get a sense of each dataset's content.<br>
Let's first import some packages :

In [2]:
import pandas as pd
import numpy as np
import configparser
from datetime import datetime
import time
from dateutil.tz import tzoffset
import os
import sys
import json
import pprint

pd.set_option("display.max_columns", None)

# from pyspark.sql import SparkSession
# from pyspark.sql import functions as F
# from pyspark.sql.types import IntegerType, FloatType, TimestampType, DateType
# spark = SparkSession \
#         .builder \
#         .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.8.2") \
#         .appName("sparkify") \
#         .getOrCreate()

### 1. List of train stations

In [26]:
%%time
# Load data
with open("input_data/referentiel-gares-voyageurs.json", encoding="utf-8") as f:
    train_stations = json.load(f)
# keep only the sub-fields of `fields` described above
list_columns = ["uic_code", "alias_libelle_noncontraint", "latitude_entreeprincipale_wgs84", "longitude_entreeprincipale_wgs84",
                "commune_code", "commune_libellemin", "departement_numero", "departement_libellemin", "adresse_cp", "gare_nbpltf", "segmentdrg_libelle"]
df_stations = pd.DataFrame([item["fields"] for item in train_stations])[list_columns]

CPU times: user 60.6 ms, sys: 6.95 ms, total: 67.6 ms
Wall time: 66.6 ms


In [27]:
df_stations.head()

| | uic_code | alias_libelle_noncontraint | latitude_entreeprincipale_wgs84 | longitude_entreeprincipale_wgs84 | commune_code | commune_libellemin | departement_numero | departement_libellemin | adresse_cp | gare_nbpltf | segmentdrg_libelle |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 87988709 | Remise à Jorelle | 48.89317 | 2.487751 | 10 | Bondy | 93 | Seine-Saint-Denis | 93140 | 1 | b |
| 1 | 87784892 | Bena Fanes | 42.4580782 | 1.9167264 | 66 | Enveitg | 66 | Pyrénées-Orientales | 66760 | 1 | c |
| 2 | 87784769 | Fontpédrouse | 42.5138062 | 2.1886585 | 80 | Fontpédrouse | 66 | Pyrénées-Orientales | 66360 | 1 | c |
| 3 | 87784736 | Nyer | 42.5416979 | 2.2630177 | 123 | Nyer | 66 | Pyrénées-Orientales | 66360 | 1 | c |
| 4 | 87784686 | Villefranche - Vernet-les-Bains | 42.591998 | 2.370396 | 223 | Villefranche-de-Conflent | 66 | Pyrénées-Orientales | 66820 | 1 | b |


Number of train stations

In [5]:
print(f"We have : {len(df_stations)} train stations in total")

We have : 2867 train stations in total


Check for missing values

In [6]:
df_stations.isnull().sum()

uic_code                            0
alias_libelle_noncontraint          0
latitude_entreeprincipale_wgs84     4
longitude_entreeprincipale_wgs84    4
commune_code                        0
commune_libellemin                  0
departement_numero                  0
departement_libellemin              0
adresse_cp                          0
gare_nbpltf                         0
segmentdrg_libelle                  0
dtype: int64

Perfect duplicates

In [7]:
df_stations.duplicated().sum()

0

Duplicates based on station code

In [8]:
df_stations.duplicated(["uic_code"]).sum()

0

### 2. Lost properties declaration

In [81]:
%%time
# load data
df_lost = pd.read_csv("input_data/objets-trouves-gares.csv", sep=";", dtype={"Code UIC": str})
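# convert date strings to datetime: each distinct string is parsed once, keeping only its first
# 19 characters ("YYYY-MM-DDTHH:MM:SS"), so anything after the seconds (e.g. a UTC offset) is dropped
_map_date = {dt: datetime.strptime(dt[:19], "%Y-%m-%dT%H:%M:%S") for dt in df_lost.Date.unique() if pd.notnull(dt)}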
df_lost["Date"] = df_lost.Date.map(_map_date)

CPU times: user 13.8 s, sys: 112 ms, total: 13.9 s
Wall time: 14.8 s
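The cell above maps every distinct date string to its parsed value, so `strptime` runs once per distinct string instead of once per row. The same memoization can be expressed with `functools.lru_cache`, as in the sketch below, which also shows `datetime.fromisoformat` (Python 3.7+) as an alternative that keeps the UTC offset discarded by `dt[:19]`. The input format `2019-02-13T23:08:31+01:00` is an assumption inferred from the `[:19]` slice and the `%Y-%m-%dT%H:%M:%S` pattern; `parse_local` and `parse_aware` are hypothetical names.

```python
from datetime import datetime
from functools import lru_cache


@lru_cache(maxsize=None)
def parse_local(ts: str) -> datetime:
    # Same rule as the notebook: keep "YYYY-MM-DDTHH:MM:SS", drop anything after it
    return datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S")


def parse_aware(ts: str) -> datetime:
    # Alternative: keep the "+01:00" offset as tzinfo (Python 3.7+)
    return datetime.fromisoformat(ts)


# Hypothetical raw values; the repeated string is parsed only once
raw = ["2019-02-13T23:08:31+01:00", "2019-02-13T23:08:31+01:00", "2019-02-14T07:07:30+01:00"]
parsed = [parse_local(ts) for ts in raw]
```

If the strings do carry the local offset, keeping it only matters when winter (+01:00) and summer (+02:00) records must be compared on a single UTC timeline.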


In [82]:
df_lost.head()

| | Date | Gare | Code UIC | Nature d'objets | Type d'objets | Type d'enregistrement |
|---|---|---|---|---|---|---|
| 0 | 2019-02-13 23:08:31 | | | Autres divers | Divers | Déclaration de Perte |
| 1 | 2019-02-13 23:22:13 | | | Livre scolaire | Livres, articles de papéterie | Déclaration de Perte |
| 2 | 2019-02-13 23:24:16 | | | Livre de poche | Livres, articles de papéterie | Déclaration de Perte |
| 3 | 2019-02-14 07:07:30 | | | Sac à dos | Bagagerie: sacs, valises, cartables | Déclaration de Perte |
| 4 | 2019-02-14 07:54:09 | | | Ordinateur, ordinateur portable, notebook | Appareils électroniques, informatiques, appare... | Déclaration de Perte |


In [83]:
print(f"We have : {len(df_lost):,} loss declarations in total")

We have : 1,148,442 loss declarations in total


In [84]:
print(f"The earliest record in the dataset is on {df_lost.Date.min().date()}, and the latest record on {df_lost.Date.max().date()}")

The earliest record in the dataset is on 2013-05-24, and the latest record on 2020-12-23


Perfect duplicates

In [85]:
df_lost.duplicated().sum()

0

Let's see whether a loss declaration can be uniquely identified by `Date`, `station code`, `object nature` and `object type` :

In [86]:
df_lost.duplicated(["Date", "Code UIC", "Nature d'objets", "Type d'objets"]).sum()

0

> Even though these columns yield no duplicates in the current dataset, nothing guarantees that two different declarations won't be made in the future at the same time, for the same object, in the same station --> we need another identifier, such as an auto-incremented integer, when building the data model
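A surrogate key simply numbers the rows as they are loaded, so two otherwise identical declarations still get different ids. Here is a minimal plain-Python sketch with a hypothetical helper `add_surrogate_ids` and made-up rows:

```python
from itertools import count


def add_surrogate_ids(rows, id_name="declaration_id", start=1):
    """Return copies of `rows` with an auto-incremented integer id as first key."""
    ids = count(start)
    return [{id_name: next(ids), **row} for row in rows]


# Two identical (hypothetical) declarations: same time, station and object
declarations = [
    {"date_and_time": "2019-02-13 23:08:31", "station_id": None, "property_nature": "Autres divers"},
    {"date_and_time": "2019-02-13 23:08:31", "station_id": None, "property_nature": "Autres divers"},
]
with_ids = add_surrogate_ids(declarations)
```

In Spark, `pyspark.sql.functions.monotonically_increasing_id()` produces unique but not consecutive ids, while `RDD.zipWithIndex()` produces consecutive ones at the cost of an extra job; a database auto-increment column is another option if the tables end up in a warehouse.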

Check for missing values

In [87]:
df_lost.isnull().sum() 

Date                          0
Gare                     786193
Code UIC                 786193
Nature d'objets               0
Type d'objets                 0
Type d'enregistrement         0
dtype: int64

> Around 786K of the 1.15M declarations (≈68%) have no train station. A potential explanation is that a property lost in a train during a trip is not linked to any station.

### 3. Found properties

In [88]:
%%time
# load data
df_found = pd.read_csv("input_data/objets-trouves-restitution.csv", sep=";", dtype={"Code UIC": str})
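# convert date strings to datetime: one mapping over the distinct strings of both date columns,
# keeping only "YYYY-MM-DDTHH:MM:SS" (anything after the seconds, e.g. a UTC offset, is dropped)
_map_date = {dt: datetime.strptime(dt[:19], "%Y-%m-%dT%H:%M:%S") for dt in set(df_found.Date).union(set(df_found["Date et heure de restitution"])) if pd.notnull(dt)}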
df_found["Date"] = df_found.Date.map(_map_date)
df_found["Date et heure de restitution"] = df_found["Date et heure de restitution"].map(_map_date)

CPU times: user 12.1 s, sys: 84 ms, total: 12.2 s
Wall time: 12.2 s


In [89]:
df_found.head()

| | Date | Date et heure de restitution | Gare | Code UIC | Nature d'objets | Type d'objets | Type d'enregistrement |
|---|---|---|---|---|---|---|---|
| 0 | 2018-01-23 14:33:43 | NaT | Bourges | 87576207 | Autres divers (préciser) | Divers | Objet trouvé |
| 1 | 2014-03-10 09:07:15 | NaT | Toulouse Matabiau | 87611004 | Sac de voyage, sac de sport, sac à bandoulière | Bagagerie: sacs, valises, cartables | Objet trouvé |
| 2 | 2014-02-06 08:45:32 | NaT | Toulouse Matabiau | 87611004 | Sac à dos | Bagagerie: sacs, valises, cartables | Objet trouvé |
| 3 | 2014-03-10 09:14:38 | NaT | Paris Est | 87113001 | Manteau, veste, blazer, parka, blouson, cape | Vêtements, chaussures | Objet trouvé |
| 4 | 2014-02-06 09:01:24 | 2014-02-07 20:55:39 | Paris Est | 87113001 | Valise, sac sur roulettes | Bagagerie: sacs, valises, cartables | Objet trouvé |


In [90]:
print(f"We have : {len(df_found):,} declared and found properties in total")

We have : 705,747 declared and found properties in total


In [91]:
print(f"The earliest record in the dataset is on {df_found.Date.min().date()}, and the latest record on {df_found.Date.max().date()}")

The earliest record in the dataset is on 2013-05-24, and the latest record on 2020-12-23


In [92]:
print(f"The earliest lost property recovered in the dataset is on {df_found['Date et heure de restitution'].min().date()}, and the latest on {df_found['Date et heure de restitution'].max().date()}")

The earliest lost property recovered in the dataset is on 2013-05-24, and the latest on 2020-12-23


Perfect duplicates

In [93]:
df_found.duplicated().sum()

0

Let's see whether a found-property record can be uniquely identified by `Date`, `station code`, `object nature` and `object type` :

In [94]:
df_found.duplicated(["Date", "Code UIC", "Nature d'objets", "Type d'objets"]).sum()

58253

> These columns don't uniquely identify a record in the dataset (58,253 duplicates) --> we'll need to create our own id, such as an auto-incremented integer
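To make the 58,253 figure concrete: `DataFrame.duplicated(subset)` with its default `keep="first"` flags every occurrence of a key after the first one, so the sum equals the number of rows minus the number of distinct keys. Here is a minimal plain-Python equivalent; the helper name and the sample rows are hypothetical.

```python
from collections import Counter

KEY = ("Date", "Code UIC", "Nature d'objets", "Type d'objets")


def count_key_duplicates(rows, key_columns=KEY):
    """Count rows whose key already appeared earlier (pandas keep="first" semantics)."""
    counts = Counter(tuple(row[column] for column in key_columns) for row in rows)
    # Each key seen n times contributes n - 1 duplicates
    return sum(n - 1 for n in counts.values())


# Hypothetical found-property rows: the first three share the same key
bag = {"Date": "2014-02-06 09:01:24", "Code UIC": "0087113001",
       "Nature d'objets": "Valise, sac sur roulettes",
       "Type d'objets": "Bagagerie: sacs, valises, cartables"}
coat = {**bag, "Nature d'objets": "Manteau, veste, blazer, parka, blouson, cape",
        "Type d'objets": "Vêtements, chaussures"}
found = [bag, dict(bag), dict(bag), coat]
```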

Check for missing values

In [95]:
df_found.isnull().sum() 

Date                                 0
Date et heure de restitution    493471
Gare                               299
Code UIC                           299
Nature d'objets                      0
Type d'objets                        0
Type d'enregistrement                0
dtype: int64

> Only 299 records have no train station. As for loss declarations, a potential explanation is that a property lost in a train during a trip is not linked to any station.

> About 493K of the 706K records (≈70%) have no recovery date; these most likely correspond to properties that haven't been returned to their owners (yet)
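A property counts as recovered when `Date et heure de restitution` is filled. With the counts above, the recovery rate is (705,747 - 493,471) / 705,747 = 212,276 / 705,747 ≈ 0.30. The same computation on parsed rows, as a minimal sketch with a hypothetical helper `recovery_stats`:

```python
RECOVERY_COLUMN = "Date et heure de restitution"


def recovery_stats(rows, recovery_column=RECOVERY_COLUMN):
    """Count found properties returned to their owner (non-empty recovery date)."""
    total = len(rows)
    recovered = sum(1 for row in rows if row.get(recovery_column) is not None)
    rate = recovered / total if total else 0.0
    return {"total": total, "recovered": recovered, "recovery_rate": rate}


# Recovery dates of the five found properties shown in the table above (None = NaT)
sample = [{RECOVERY_COLUMN: value} for value in
          [None, None, None, None, "2014-02-07 20:55:39"]]
stats = recovery_stats(sample)
# Same ratio computed from the notebook's totals
overall_rate = (705_747 - 493_471) / 705_747
```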

### 4. Daily temperatures

In [59]:
%%time
# load data
df_temperature = pd.read_csv("input_data/temperature-quotidienne-departementale.csv", sep=";")
# convert date string to datetime
_map_date = {dt: datetime.strptime(dt, "%Y-%m-%d") for dt in df_temperature.Date.unique()}
df_temperature["Date"] = df_temperature.Date.map(_map_date)

CPU times: user 84.9 ms, sys: 3.87 ms, total: 88.7 ms
Wall time: 208 ms


In [60]:
df_temperature.head()

| | Date | Code INSEE département | Département | TMin (°C) | TMax (°C) | TMoy (°C) |
|---|---|---|---|---|---|---|
| 0 | 2019-12-31 | 87 | Haute-Vienne | 2.3 | 10.0 | 6.15 |
| 1 | 2019-12-31 | 19 | Corrèze | -2.7 | 10.9 | 4.1 |
| 2 | 2019-12-31 | 83 | Var | 4.77 | 14.08 | 9.43 |
| 3 | 2019-12-31 | 27 | Eure | -3.1 | 5.5 | 1.2 |
| 4 | 2019-12-31 | 23 | Creuse | -4.5 | 13.2 | 4.35 |


In [62]:
df_temperature.Date[0]

Timestamp('2019-12-31 00:00:00')

In [61]:
print(f"We have : {len(df_temperature):,} temperature records in total")

We have : 102,240 temperature records in total


In [64]:
print(f"The earliest record in the dataset is on {df_temperature.Date.min().date()}, and the latest record on {df_temperature.Date.max().date()}")

The earliest record in the dataset is on 2018-01-01, and the latest record on 2020-11-30


Perfect duplicates

In [65]:
df_temperature.duplicated().sum()

0

Let's see whether a temperature record can be uniquely identified by `Date` and `county code` :

In [66]:
df_temperature.duplicated(["Date", "Code INSEE département"]).sum()

0

> Using these columns allows us to uniquely identify a record in the temperature dataset
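Because `(Date, county code)` is unique, the temperature table can serve as a lookup keyed by that pair: a declaration gets its county through its station, then that county's temperatures for the day. This is the enrichment behind the `min_temperature`, `max_temperature` and `avg_temperature` columns of the fact tables described in the data model. A minimal plain-Python sketch; the helper names, the station id `HYPOTHETICAL_STATION` and the declaration rows are illustrative, and only the Alpes-Maritimes temperatures come from this notebook's output.

```python
def build_temperature_index(temperature_rows):
    """Index daily temperatures by (date, county_code); fail loudly on a duplicate key."""
    index = {}
    for row in temperature_rows:
        key = (row["date"], row["county_code"])
        if key in index:
            raise ValueError(f"duplicate temperature key: {key}")
        index[key] = row
    return index


def add_temperatures(declaration, station_to_county, temperature_index):
    """Copy `declaration` and attach the temperatures of its station's county on that day."""
    county = station_to_county.get(declaration["station_id"])
    temperatures = temperature_index.get((declaration["date"], county), {})
    enriched = dict(declaration)
    for column in ("min_temperature", "max_temperature", "avg_temperature"):
        enriched[column] = temperatures.get(column)  # None when there is no match
    return enriched


temperature_index = build_temperature_index([
    {"date": "2019-12-31", "county_code": "06", "min_temperature": 5.39,
     "max_temperature": 13.33, "avg_temperature": 9.36},
])
station_to_county = {"HYPOTHETICAL_STATION": "06"}  # station_id -> station_county_code
in_station = add_temperatures({"date": "2019-12-31", "station_id": "HYPOTHETICAL_STATION"},
                              station_to_county, temperature_index)
no_station = add_temperatures({"date": "2019-12-31", "station_id": None},
                              station_to_county, temperature_index)
```

County codes must be compared as strings on both sides (`"06"`, not `6`); otherwise the lookup silently misses and the temperatures stay empty.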

Check for missing values

In [67]:
df_temperature.isnull().sum() 

Date                      0
Code INSEE département    0
Département               0
TMin (°C)                 0
TMax (°C)                 0
TMoy (°C)                 0
dtype: int64

> No missing values in the temperatures dataset

## Data model

Ideally, we would join the two datasets, `Lost properties declaration dataset` and `Found properties dataset`, to see which object was lost when and where, whether it was found, and if/when it was returned to its owner; this joined table would then be our fact table.<br>Unfortunately, there's currently no id that uniquely identifies a record in either dataset. We could have used a combination of columns (`Date`, `station code`, `object nature` and `object type`), but these fields yield duplicates in the `Found properties dataset`.

We therefore decided to create two fact tables and two dimension tables in our data lake.

The two dimension tables are :
* time : contains date and time related information. The column names of this table are self-explanatory (see the star schema below)
* stations : contains information related to train stations. The column names of this table are self-explanatory (see the star schema below). We'll add a default station to account for declarations that are not linked to any station. This default station will have the value `NOT_A_STATION` in all non-numeric columns and `-999` in numeric columns. The `station_category` column encodes the type of station using three levels :
    * "a" : more than 250,000 yearly passengers
    * "b" : more than 100,000 yearly passengers
    * "c" : other stations
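Here is a minimal plain-Python sketch of row builders for the two dimension tables described above. The default station row follows the stated rule (`NOT_A_STATION` for non-numeric columns, `-999` for numeric ones), with column names taken from the Spark stations DataFrame built later in the notebook. The time-dimension columns (`hour`, `day`, `week`, `month`, `year`, `weekday`) are hypothetical, since the actual ones only appear in the star-schema image.

```python
from datetime import datetime

# station column -> True if numeric (names from the Spark stations DataFrame)
STATION_COLUMNS = {
    "station_id": False, "station_name": False,
    "station_latitude": True, "station_longitude": True,
    "station_city_code": False, "station_city_name": False,
    "station_county_code": False, "station_county_name": False,
    "station_postal_code": False, "station_number_of_platforms": True,
    "station_category": False,
}
DEFAULT_STATION_ID = "NOT_A_STATION"


def default_station() -> dict:
    """Placeholder station for declarations that are not linked to any station."""
    return {column: (-999 if numeric else DEFAULT_STATION_ID)
            for column, numeric in STATION_COLUMNS.items()}


def station_key(station_id):
    """Foreign key used by the fact tables: missing ids point to the default station."""
    return DEFAULT_STATION_ID if station_id is None else station_id


def time_row(ts: datetime) -> dict:
    """Hypothetical time-dimension row derived from one timestamp."""
    return {
        "date_and_time": ts,
        "hour": ts.hour,
        "day": ts.day,
        "week": ts.isocalendar()[1],  # ISO week number
        "month": ts.month,
        "year": ts.year,
        "weekday": ts.isoweekday(),  # Monday = 1 ... Sunday = 7
    }
```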

The two fact tables are :
* declared_loss : contains information on all loss declarations made by customers through the website, the app, or in person. The columns are :
    * declaration_id : auto-incremented integer identifying each customer declaration
    * date_and_time : date and time of the declaration
    * station_id : station identification code
    * property_type : a coarse categorisation of lost properties, as explained in the introduction, e.g. `Vêtements, chaussures` (clothes, shoes), `Appareils électroniques, informatiques, appareils photo` (electronic devices), ...
    * property_nature : a more fine-grained categorisation of lost properties, as explained in the introduction, e.g. `Manteau, veste, blazer, parka, blouson, cape` (coat, jacket), `Téléphone portable` (mobile phone), ...
    * min_temperature : the minimum temperature recorded on that day in the county where the station is located
    * max_temperature : the maximum temperature recorded on that day in the county where the station is located
    * avg_temperature : the average temperature recorded on that day in the county where the station is located
* declared_and_found_properties : contains information on all lost-and-found properties declared by customers through the app. The columns are :
    * found_id : auto-incremented integer identifying each found property
    * date_and_time : date and time of the declaration
    * station_id : station identification code
    * property_type : same as in the previous table
    * property_nature : same as in the previous table
    * recovery_date : for properties returned to their owners, the recovery date and time
    * delay_before_recovery : for properties returned to their owners, the delay in hours between the loss declaration and the recovery date
    * min_temperature : the minimum temperature recorded on that day in the county where the station is located
    * max_temperature : the maximum temperature recorded on that day in the county where the station is located
    * avg_temperature : the average temperature recorded on that day in the county where the station is located
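`delay_before_recovery` is the recovery timestamp minus the declaration timestamp, expressed in hours. Here is a minimal sketch; the function name mirrors the column, and the example uses the one recovered property shown in the found-properties table.

```python
from datetime import datetime
from typing import Optional


def delay_before_recovery(declared: datetime, recovered: Optional[datetime]) -> Optional[float]:
    """Hours between declaration and recovery; None if the property was not recovered."""
    if recovered is None:
        return None
    return (recovered - declared).total_seconds() / 3600


# Row 4 of the found-properties table: declared 2014-02-06 09:01:24, recovered 2014-02-07 20:55:39
delay = delay_before_recovery(datetime(2014, 2, 6, 9, 1, 24), datetime(2014, 2, 7, 20, 55, 39))
# 1 day, 11 h 54 min 15 s, i.e. about 35.9 hours
```

In the Spark pipeline, one possible expression for the same column is `(F.unix_timestamp("recovery_date") - F.unix_timestamp("date_and_time")) / 3600`, which stays null when `recovery_date` is null.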
    
![model](assests/data_model.png)

### Explain data model objective !!!!!!!!!

In [53]:
import pandas as pd
import numpy as np
import configparser
from datetime import datetime
import time
from dateutil.tz import tzoffset
import os
import sys
import json
import pprint
from functools import reduce

pd.set_option("display.max_columns", None)

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, FloatType, TimestampType, DateType, StructType


In [2]:
# optional, currently unused: .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.6.5")
spark = SparkSession \
        .builder \
        .appName("sparkify") \
        .getOrCreate()

In [9]:
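# Note: Spark's file readers mark every column as nullable regardless of the schema passed in,
# which is why printSchema() below shows "nullable = true" even for fields declared non-null here
schema_lost = StructType()\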
    .add("Date", TimestampType(), False)\
    .add("Gare", StringType(), True)\
    .add("Code UIC", StringType(), True)\
    .add("Nature d'objets", StringType(), False)\
    .add("Type d'objets", StringType(), False)\
    .add("Type d'enregistrement", StringType(), False)
schema_found = StructType()\
    .add("Date", TimestampType(), False)\
    .add("Date et heure de restitution", TimestampType(), True)\
    .add("Gare", StringType(), True)\
    .add("Code UIC", StringType(), True)\
    .add("Nature d'objets", StringType(), False)\
    .add("Type d'objets", StringType(), False)\
    .add("Type d'enregistrement", StringType(), False)

columns_mapping = dict(zip(
    ["Date", "Date et heure de restitution", "Gare", "Code UIC", "Nature d'objets", "Type d'objets", "Type d'enregistrement"],
    ["date_and_time", "recovery_date", "station", "station_id", "property_nature", "property_type", "recording_type"]
))

In [10]:
df_lost = spark.read.csv("input_data/objets-trouves-gares.csv", sep=";", header=True, schema=schema_lost)
df_lost = df_lost\
    .select([F.col(c).alias(columns_mapping.get(c, c)) for c in df_lost.columns])

df_found = spark.read.csv("input_data/objets-trouves-restitution.csv", sep=";", header=True, schema=schema_found)
df_found = df_found\
    .select([F.col(c).alias(columns_mapping.get(c, c)) for c in df_found.columns])

In [11]:
list_expr = ["fields.uic_code as station_id", "fields.alias_libelle_noncontraint as station_name", "cast(fields.latitude_entreeprincipale_wgs84 as float) as station_latitude",
             "cast(fields.longitude_entreeprincipale_wgs84 as float) as station_longitude", "fields.commune_code as station_city_code", "fields.commune_libellemin as station_city_name",
            "fields.departement_numero as station_county_code", "fields.departement_libellemin as station_county_name", 
             "fields.adresse_cp as station_postal_code", "cast(fields.gare_nbpltf as int) as station_number_of_platforms", "fields.segmentdrg_libelle as station_category"]
df_stations = spark.read.json("input_data/referentiel-gares-voyageurs.json").selectExpr(*list_expr)

In [12]:
schema_temp = StructType()\
    .add("Date", DateType(), True)\
    .add("Code INSEE département", StringType(), True)\
    .add("Département", StringType(), True)\
    .add("TMin (°C)", FloatType(), True)\
    .add("TMax (°C)", FloatType(), True)\
    .add("TMoy (°C)", FloatType(), True)

columns_mapping = dict(zip(
    ["Date", "Code INSEE département", "Département", "TMin (°C)", "TMax (°C)", "TMoy (°C)"],
    ["date", "county_code", "county_name", "min_temperature", "max_temperature", "avg_temperature"]
))
df_temperature = spark.read.csv("input_data/temperature-quotidienne-departementale.csv", sep=";", header=True, schema=schema_temp)
df_temperature = df_temperature.select([F.col(c).alias(columns_mapping.get(c, c)) for c in df_temperature.columns])

In [13]:
df_lost.count(), df_found.count(), df_stations.count(), df_temperature.count()

(1148442, 705747, 2867, 102240)

In [14]:
df_lost.printSchema()

root
 |-- date_and_time: timestamp (nullable = true)
 |-- station: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- property_nature: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- recording_type: string (nullable = true)



In [15]:
df_found.printSchema()

root
 |-- date_and_time: timestamp (nullable = true)
 |-- recovery_date: timestamp (nullable = true)
 |-- station: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- property_nature: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- recording_type: string (nullable = true)



In [16]:
df_stations.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- station_latitude: float (nullable = true)
 |-- station_longitude: float (nullable = true)
 |-- station_city_code: string (nullable = true)
 |-- station_city_name: string (nullable = true)
 |-- station_county_code: string (nullable = true)
 |-- station_county_name: string (nullable = true)
 |-- station_postal_code: string (nullable = true)
 |-- station_number_of_platforms: integer (nullable = true)
 |-- station_category: string (nullable = true)



In [17]:
df_temperature.printSchema()

root
 |-- date: date (nullable = true)
 |-- county_code: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- min_temperature: float (nullable = true)
 |-- max_temperature: float (nullable = true)
 |-- avg_temperature: float (nullable = true)



In [18]:
df_lost.show()

+-------------------+-------------------+----------+--------------------+--------------------+--------------------+
|      date_and_time|            station|station_id|     property_nature|       property_type|      recording_type|
+-------------------+-------------------+----------+--------------------+--------------------+--------------------+
|2019-02-13 23:08:31|               null|      null|       Autres divers|              Divers|Déclaration de Perte|
|2019-02-13 23:22:13|               null|      null|      Livre scolaire|Livres, articles ...|Déclaration de Perte|
|2019-02-13 23:24:16|               null|      null|      Livre de poche|Livres, articles ...|Déclaration de Perte|
|2019-02-14 07:07:30|               null|      null|           Sac à dos|Bagagerie: sacs, ...|Déclaration de Perte|
|2019-02-14 07:54:09|               null|      null|Ordinateur, ordin...|Appareils électro...|Déclaration de Perte|
|2019-02-14 08:10:34|               null|      null|  Outils, Accesoires

In [19]:
df_found.show()

+-------------------+-------------------+--------------------+----------+--------------------+--------------------+--------------+
|      date_and_time|      recovery_date|             station|station_id|     property_nature|       property_type|recording_type|
+-------------------+-------------------+--------------------+----------+--------------------+--------------------+--------------+
|2018-01-23 14:33:43|               null|             Bourges|0087576207|Autres divers (pr...|              Divers|  Objet trouvé|
|2014-03-10 09:07:15|               null|   Toulouse Matabiau|0087611004|Sac de voyage, sa...|Bagagerie: sacs, ...|  Objet trouvé|
|2014-02-06 08:45:32|               null|   Toulouse Matabiau|0087611004|           Sac à dos|Bagagerie: sacs, ...|  Objet trouvé|
|2014-03-10 09:14:38|               null|           Paris Est|0087113001|Manteau, veste, b...|Vêtements, chauss...|  Objet trouvé|
|2014-02-06 09:01:24|2014-02-07 20:55:39|           Paris Est|0087113001|Valise, sa

In [22]:
df_stations.show()

+----------+--------------------+----------------+-----------------+-----------------+--------------------+-------------------+-------------------+-------------------+---------------------------+----------------+
|station_id|        station_name|station_latitude|station_longitude|station_city_code|   station_city_name|station_county_code|station_county_name|station_postal_code|station_number_of_platforms|station_category|
+----------+--------------------+----------------+-----------------+-----------------+--------------------+-------------------+-------------------+-------------------+---------------------------+----------------+
|0087988709|    Remise à Jorelle|        48.89317|         2.487751|              010|               Bondy|                 93|  Seine-Saint-Denis|              93140|                          1|               b|
|0087784892|          Bena Fanes|       42.458076|        1.9167264|              066|             Enveitg|                 66|Pyrénées-Orientales| 

In [23]:
df_temperature.show()

+----------+-----------+----------------+---------------+---------------+---------------+
|      date|county_code|     county_name|min_temperature|max_temperature|avg_temperature|
+----------+-----------+----------------+---------------+---------------+---------------+
|2019-12-31|         87|    Haute-Vienne|            2.3|           10.0|           6.15|
|2019-12-31|         19|         Corrèze|           -2.7|           10.9|            4.1|
|2019-12-31|         83|             Var|           4.77|          14.08|           9.43|
|2019-12-31|         27|            Eure|           -3.1|            5.5|            1.2|
|2019-12-31|         23|          Creuse|           -4.5|           13.2|           4.35|
|2019-12-31|         06| Alpes-Maritimes|           5.39|          13.33|           9.36|
|2019-12-31|         15|          Cantal|           -2.4|           10.0|            3.8|
|2019-12-31|         43|     Haute-Loire|           -3.4|           10.3|           3.45|
|2019-12-3

In [26]:
df_lost = df_lost.withColumn("date", F.to_date(F.col("date_and_time")))

In [27]:
df_lost.show()

+-------------------+-------------------+----------+--------------------+--------------------+--------------------+----------+
|      date_and_time|            station|station_id|     property_nature|       property_type|      recording_type|      date|
+-------------------+-------------------+----------+--------------------+--------------------+--------------------+----------+
|2019-02-13 23:08:31|               null|      null|       Autres divers|              Divers|Déclaration de Perte|2019-02-13|
|2019-02-13 23:22:13|               null|      null|      Livre scolaire|Livres, articles ...|Déclaration de Perte|2019-02-13|
|2019-02-13 23:24:16|               null|      null|      Livre de poche|Livres, articles ...|Déclaration de Perte|2019-02-13|
|2019-02-14 07:07:30|               null|      null|           Sac à dos|Bagagerie: sacs, ...|Déclaration de Perte|2019-02-14|
|2019-02-14 07:54:09|               null|      null|Ordinateur, ordin...|Appareils électro...|Déclaration de Pe

In [28]:
df_lost.printSchema()

root
 |-- date_and_time: timestamp (nullable = true)
 |-- station: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- property_nature: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- recording_type: string (nullable = true)
 |-- date: date (nullable = true)



In [37]:
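# Distinct, non-null station IDs referenced by lost-property declarations (collected to the driver)
tmp1 = [item.station_id for item in df_lost.select("station_id").distinct().collect() if item.station_id is not None]
len(tmp1), tmp1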

(180,
 ['0087313510',
  '0087713040',
  '0087343004',
  '0087671008',
  '0087543017',
  '0087592006',
  '0087491001',
  '0087444711',
  '0087144014',
  '0087575001',
  '0087745000',
  '0087113001',
  '0087319012',
  '0087611004',
  '0087172007',
  '0087582668',
  '0087741728',
  '0087734053',
  '0087444265',
  '0087725689',
  '0087781104',
  '0087415018',
  '0087286542',
  '0087673202',
  '0087345520',
  '0087741793',
  '0087688887',
  '0087381897',
  '0087411173',
  '0087276691',
  '0087481002',
  '0087696005',
  '0087345025',
  '0087474007',
  '0087391003',
  '0087174003',
  '0087413385',
  '0087761007',
  '0087295600',
  '0087382655',
  '0087781005',
  '0087313874',
  '0087615286',
  '0087192039',
  '0087484006',
  '0087318964',
  '0087118000',
  '0087317362',
  '0087672006',
  '0087171009',
  '0087485300',
  '0087141150',
  '0087686006',
  '0087726000',
  '0087486019',
  '0087271494',
  '0087481705',
  '0087547000',
  '0087212027',
  '0087471003',
  '0087382861',
  '0087447003',
  

In [39]:
# Distinct, non-null station IDs referenced by found-property records (collected to the driver)
tmp2 = [item.station_id for item in df_found.select("station_id").distinct().collect() if item.station_id is not None]
len(tmp2), tmp2

(181,
 ['0087313510',
  '0087713040',
  '0087343004',
  '0087592006',
  '0087543017',
  '0087671008',
  '0087491001',
  '0087444711',
  '0087144014',
  '0087575001',
  '0087745000',
  '0087113001',
  '0087319012',
  '0087611004',
  '0087172007',
  '0087741728',
  '0087582668',
  '0087734053',
  '0087444265',
  '0087725689',
  '0087781104',
  '0087415018',
  '0087673202',
  '0087286542',
  '0087345520',
  '0087741793',
  '0087688887',
  '0087381897',
  '0087411173',
  '0087276691',
  '0087345025',
  '0087481002',
  '0087696005',
  '0087474007',
  '0087391003',
  '0087174003',
  '0087413385',
  '0087761007',
  '0087382655',
  '0087295600',
  '0087313874',
  '0087781005',
  '0087615286',
  '0087192039',
  '0087484006',
  '0087318964',
  '0087118000',
  '0087317362',
  '0087672006',
  '0087171009',
  '0087485300',
  '0087141150',
  '0087686006',
  '0087726000',
  '0087486019',
  '0087271494',
  '0087481705',
  '0087547000',
  '0087212027',
  '0087471003',
  '0087382861',
  '0087447003',
  

In [40]:
# Distinct station IDs in the station reference table
tmp3 = [item.station_id for item in df_stations.select("station_id").distinct().collect()]
len(tmp3), tmp3

(2867,
 ['0087615476',
  '0087342246',
  '0087723700',
  '0087722561',
  '0087491282',
  '0087214213',
  '0087773465',
  '0087313510',
  '0087182725',
  '0087444182',
  '0087141051',
  '0087342071',
  '0087271528',
  '0087582700',
  '0087484360',
  '0087394221',
  '0087645481',
  '0087393470',
  '0087142240',
  '0087718296',
  '0087592121',
  '0087696492',
  '0087723494',
  '0087545269',
  '0087345314',
  '0087343004',
  '0087185595',
  '0087142109',
  '0087751776',
  '0087713040',
  '0087545277',
  '0087576348',
  '0087491357',
  '0087313809',
  '0087296244',
  '0087214643',
  '0087116772',
  '0087784702',
  '0087753582',
  '0087734343',
  '0087718338',
  '0087543090',
  '0087286427',
  '0087115873',
  '0087775841',
  '0087761247',
  '0087671008',
  '0087491001',
  '0087297119',
  '0087672139',
  '0087592006',
  '0087595728',
  '0087313726',
  '0087726406',
  '0087594119',
  '0087444711',
  '0087313882',
  '0087191700',
  '0087543017',
  '0087393165',
  '0087212100',
  '0087734426',
 

In [42]:
# Every station ID referenced in df_lost exists in df_stations
set(tmp1).issubset(tmp3)

True

In [43]:
# Every station ID referenced in df_found exists in df_stations
set(tmp2).issubset(tmp3)

True
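The two checks above confirm that every station ID referenced by `df_lost` and `df_found` exists in `df_stations`. A set difference gives the same answer and also names the missing IDs if the check ever fails. The sketch below uses a few sample IDs: the first three come from the outputs above, and `"0099999999"` is hypothetical. On the real data, `tmp1`, `tmp2` and `tmp3` would replace these lists. In Spark, a `left_anti` join such as `df_lost.join(df_stations, "station_id", "left_anti")` runs the same check without collecting IDs to the driver. Null `station_id` values would have to be filtered out first, because nulls never match in a join.

```python
def missing_ids(referencing, reference):
    """Return the IDs in `referencing` that do not appear in `reference`, sorted."""
    return sorted(set(referencing) - set(reference))

# Sample IDs; "0099999999" is hypothetical and deliberately absent from station_ids
station_ids = ["0087313510", "0087713040", "0087343004"]
lost_ids = ["0087313510", "0087343004"]
found_ids = ["0087313510", "0099999999"]

print(missing_ids(lost_ids, station_ids))   # []
print(missing_ids(found_ids, station_ids))  # ['0099999999']
```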

In [45]:
print(df_lost.columns)

['date_and_time', 'station', 'station_id', 'property_nature', 'property_type', 'recording_type', 'date']


In [47]:
print(df_temperature.columns)

['date', 'county_code', 'county_name', 'min_temperature', 'max_temperature', 'avg_temperature']
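`df_lost` and `df_temperature` share only the `date` column, and the lost-property side has no county. One way to attach a temperature to each declaration uses a two-step lookup: first from `station_id` to county code, then from (date, county code) to temperature. This assumes `df_stations` carries each station's department number (the `departement_numero` field of the source JSON). In Spark, the two steps would be two joins. Because many `df_lost` rows have a null `station_id`, left joins keep those declarations with a null temperature, where inner joins would drop them. The plain-Python sketch below shows the lookup logic. Its lookup tables are hypothetical, except that the two stations and their departments come from the `df_stations` rows shown earlier.

```python
from datetime import date

# Hypothetical lookup tables, for illustration only
station_to_county = {"0087988709": "93", "0087784892": "66"}
temperature_by_day = {
    (date(2019, 12, 31), "93"): 4.1,
    (date(2019, 12, 31), "66"): 9.4,
}

lost = [
    {"station_id": "0087988709", "date": date(2019, 12, 31)},
    {"station_id": None, "date": date(2019, 12, 31)},  # no station recorded
]

def attach_temperature(record):
    """Left-join semantics: an unknown station or day yields avg_temperature=None."""
    county = station_to_county.get(record["station_id"])
    avg = temperature_by_day.get((record["date"], county)) if county else None
    return {**record, "county_code": county, "avg_temperature": avg}

enriched = [attach_temperature(r) for r in lost]
for row in enriched:
    print(row)
```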


In [50]:
# Delay between declaration and recovery, in hours
diff_secs_col = F.col("recovery_date").cast("long") - F.col("date_and_time").cast("long")
df_found.withColumn("diff_date", diff_secs_col / 3600.0).show()

+-------------------+-------------------+--------------------+----------+--------------------+--------------------+--------------+------------------+
|      date_and_time|      recovery_date|             station|station_id|     property_nature|       property_type|recording_type|         diff_date|
+-------------------+-------------------+--------------------+----------+--------------------+--------------------+--------------+------------------+
|2018-01-23 14:33:43|               null|             Bourges|0087576207|Autres divers (pr...|              Divers|  Objet trouvé|              null|
|2014-03-10 09:07:15|               null|   Toulouse Matabiau|0087611004|Sac de voyage, sa...|Bagagerie: sacs, ...|  Objet trouvé|              null|
|2014-02-06 08:45:32|               null|   Toulouse Matabiau|0087611004|           Sac à dos|Bagagerie: sacs, ...|  Objet trouvé|              null|
|2014-03-10 09:14:38|               null|           Paris Est|0087113001|Manteau, veste, b...|Vêteme
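The `diff_date` column above is computed in three steps: each timestamp is cast to seconds since the epoch, the two are subtracted, and the result is divided by 3600. A null `recovery_date` therefore gives a null `diff_date`, as in the rows shown. The plain-Python sketch below does the same arithmetic with the null propagation made explicit. Its recovery timestamp is hypothetical.

```python
from datetime import datetime

def delay_hours(found_at, recovered_at):
    """Hours between two timestamps; None if either is missing (mirrors Spark nulls)."""
    if found_at is None or recovered_at is None:
        return None
    return (recovered_at - found_at).total_seconds() / 3600.0

found_at = datetime(2018, 1, 23, 14, 33, 43)  # first row of the output above
print(delay_hours(found_at, datetime(2018, 1, 24, 2, 33, 43)))  # 12.0
print(delay_hours(found_at, None))                               # None
```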

In [51]:
# Number of hours in two weeks (14 days)
14 * 24

336
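The 336-hour figure may be meant as a cut-off for the recovery delays computed above; that reading is an assumption. Used that way, it turns the hourly delay into a category. The sketch below is hypothetical: the bucket labels and the inclusive `<=` comparison are illustrative choices, not taken from the datasets.

```python
TWO_WEEKS_HOURS = 14 * 24  # 336

def delay_bucket(hours):
    """Hypothetical labels for a recovery delay expressed in hours."""
    if hours is None:
        return "not recovered"
    return "within 2 weeks" if hours <= TWO_WEEKS_HOURS else "after 2 weeks"

print([delay_bucket(h) for h in (12.0, 336.0, 500.5, None)])
```

In Spark, the same bucketing could be expressed with chained `F.when(...).when(...).otherwise(...)` calls on the `diff_date` column.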

In [62]:
# Stack the three timestamp columns into one and deduplicate
df_time = reduce(pyspark.sql.DataFrame.union, [df_lost.select("date_and_time"), df_found.select("date_and_time"), df_found.select("recovery_date")]).distinct()
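The union in the cell above (`unionAll`, equivalently `union`) matches columns by position rather than by name. The `recovery_date` values therefore end up under the `date_and_time` header of the first DataFrame. `distinct()` then collapses duplicate timestamps. Because `recovery_date` contains nulls (see the `df_found` output above), one null row survives. A time-dimension key normally should not be null, so a `.dropna()` on `df_time` would remove it. The plain-Python sketch below mimics the fold with `functools.reduce` on hypothetical lists, with `None` standing in for a Spark null.

```python
from functools import reduce

# Hypothetical single-column "DataFrames": lists of timestamps as strings
lost_times = ["2019-02-15 14:32:49", "2019-02-18 15:29:55"]
found_times = ["2019-02-18 15:29:55", "2018-03-07 09:43:13"]
recovery_times = [None, "2019-02-19 16:53:07", None]

# Fold the three columns into one (like union), then deduplicate (like distinct)
all_times = reduce(lambda acc, col: acc + col, [lost_times, found_times, recovery_times])
distinct_times = set(all_times)  # a single None survives, like Spark's one null row

# Drop the null key before building a time dimension (like .dropna())
time_keys = sorted(t for t in distinct_times if t is not None)
print(len(all_times), len(distinct_times), len(time_keys))  # 7 5 4
```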

In [63]:
df_time.show()

+-------------------+
|      date_and_time|
+-------------------+
|2019-02-15 14:32:49|
|2019-02-18 15:29:55|
|2019-02-19 16:53:07|
|2019-02-25 12:40:32|
|2019-02-25 13:48:30|
|2019-02-26 10:06:24|
|2019-02-26 14:45:29|
|2019-02-27 18:46:09|
|2019-02-28 08:42:35|
|2019-03-01 14:35:49|
|2019-03-04 12:44:20|
|2019-03-27 17:22:13|
|2019-03-28 10:24:14|
|2019-03-28 13:38:11|
|2019-03-29 07:01:21|
|2019-03-30 21:14:55|
|2019-03-31 10:41:33|
|2019-08-11 12:02:24|
|2018-03-07 09:43:13|
|2019-08-12 13:19:55|
+-------------------+
only showing top 20 rows



In [64]:
df_time.printSchema()

root
 |-- date_and_time: timestamp (nullable = true)



In [65]:
df_time.count()

1954980
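A star-schema time table usually expands each of these distinct timestamps into calendar attributes. The column set below (hour, day, week, month, year, weekday) is an assumption modelled on a classic time dimension; the datasets do not prescribe it. In Spark, the same values come from `F.hour`, `F.dayofmonth`, `F.weekofyear`, `F.month`, `F.year` and `F.dayofweek`. Note that `F.dayofweek` numbers days from 1 = Sunday, whereas Python's ISO weekday starts at 1 = Monday. The plain-Python sketch below uses the first timestamp from the `df_time` output above.

```python
from datetime import datetime

def time_attributes(ts):
    """Split a timestamp into calendar attributes for a time-dimension row."""
    iso_year, iso_week, iso_weekday = ts.isocalendar()
    return {
        "date_and_time": ts,
        "hour": ts.hour,
        "day": ts.day,
        "week": iso_week,        # ISO 8601 week number, like Spark's weekofyear
        "month": ts.month,
        "year": ts.year,
        "weekday": iso_weekday,  # 1 = Monday ... 7 = Sunday (Spark's dayofweek uses 1 = Sunday)
    }

row = time_attributes(datetime(2019, 2, 15, 14, 32, 49))
print(row)
```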