# Etude de cas : analyse des fichiers de logs des cyclistes

Objectif: A partir des fichiers contenu dans le dossier ./data/Cyclistes, calculer la durée de chacun des trajets effectués par chaque cycliste.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.window import Window

## 1)  Charger la donnée
Créez une seesion Spark et chargez les données Cyclistes.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
path = "./data/Cyclistes"
df = spark.read.load(path, header=True,format="csv")
df.dtypes

[('id', 'string'),
 ('timestamp', 'string'),
 ('sur_velo', 'string'),
 ('velo', 'string'),
 ('vitesse', 'string'),
 ('position', 'string'),
 ('destination_finale', 'string')]

## 2) Vérifier le nombre de cyclistes

Comptez le nombre d'id uniques.

In [4]:
nombre_ids_uniques = df.select("id").distinct().count()
print(f"Nombre d'ID uniques : {nombre_ids_uniques}")

Nombre d'ID uniques : 50


In [5]:
df.head(3)

[Row(id='12', timestamp='2018-01-01 00:01:00', sur_velo='False', velo='False', vitesse='0.030000000000000006', position='(lon:2.07 lat:1.24)', destination_finale='False'),
 Row(id='12', timestamp='2018-01-01 00:02:00', sur_velo='False', velo='False', vitesse='0.030000000000000006', position='(lon:2.07 lat:1.24)', destination_finale='False'),
 Row(id='12', timestamp='2018-01-01 00:03:00', sur_velo='False', velo='False', vitesse='0.030000000000000006', position='(lon:2.07 lat:1.24)', destination_finale='False')]

## 3) Transformer la colonne timestamp

Lorsqu'on vérifie le type de donnée de la colonne timestamp, on voit qu'on a une chaîne de caractères. Pour calculer une durée on voudrait transformer en date exploitable en tant que telle.
A l'aide d'une fonction udf, créez une nouvelle colonne date qui contiendra le résultat de la transformation des chaînes de caractères de la colonne timestamp en véritables timestamps.

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
import datetime

@udf(returnType=TimestampType())
def convertir_en_timestamp(valeur):
    if valeur:
        return datetime.datetime.strptime(valeur, "%Y-%m-%d %H:%M:%S") 
    return None

In [7]:
df = df.withColumn("date", convertir_en_timestamp(df["timestamp"]))
df.select("timestamp", "date").show()

+-------------------+-------------------+
|          timestamp|               date|
+-------------------+-------------------+
|2018-01-01 00:01:00|2018-01-01 00:01:00|
|2018-01-01 00:02:00|2018-01-01 00:02:00|
|2018-01-01 00:03:00|2018-01-01 00:03:00|
|2018-01-01 00:04:00|2018-01-01 00:04:00|
|2018-01-01 00:05:00|2018-01-01 00:05:00|
|2018-01-01 00:06:00|2018-01-01 00:06:00|
|2018-01-01 00:07:00|2018-01-01 00:07:00|
|2018-01-01 00:08:00|2018-01-01 00:08:00|
|2018-01-01 00:09:00|2018-01-01 00:09:00|
|2018-01-01 00:10:00|2018-01-01 00:10:00|
|2018-01-01 00:11:00|2018-01-01 00:11:00|
|2018-01-01 00:12:00|2018-01-01 00:12:00|
|2018-01-01 00:13:00|2018-01-01 00:13:00|
|2018-01-01 00:14:00|2018-01-01 00:14:00|
|2018-01-01 00:15:00|2018-01-01 00:15:00|
|2018-01-01 00:16:00|2018-01-01 00:16:00|
|2018-01-01 00:17:00|2018-01-01 00:17:00|
|2018-01-01 00:18:00|2018-01-01 00:18:00|
|2018-01-01 00:19:00|2018-01-01 00:19:00|
|2018-01-01 00:20:00|2018-01-01 00:20:00|
+-------------------+-------------

## 4) Durée des trajets par id.

1) Trouvez les dates min/max par état de sur_velo, puis par id ET par état de sur_velo.

In [8]:
dates_par_sur_velo = df.groupBy("sur_velo").agg(
    {'date' : 'min'}
)
dates_par_sur_velo.show()

dates_par_sur_velo = df.groupBy("sur_velo").agg(
    {'date' : 'max'}
)
dates_par_sur_velo.show()

+--------+-------------------+
|sur_velo|          min(date)|
+--------+-------------------+
|   False|2018-01-01 00:01:00|
|    True|2018-01-01 01:47:00|
+--------+-------------------+

+--------+-------------------+
|sur_velo|          max(date)|
+--------+-------------------+
|   False|2018-02-01 00:00:00|
|    True|2018-01-31 21:32:00|
+--------+-------------------+



In [9]:
dates_par_sur_velo = df.groupBy("id","sur_velo").agg(
    {'date' : 'min'}
)
dates_par_sur_velo.show()

dates_par_sur_velo = df.groupBy("id","sur_velo").agg(
    {'date' : 'max'}
)
dates_par_sur_velo.show()

+---+--------+-------------------+
| id|sur_velo|          min(date)|
+---+--------+-------------------+
| 12|   False|2018-01-01 00:01:00|
| 29|   False|2018-01-01 00:01:00|
| 43|    True|2018-01-01 07:44:00|
| 36|    True|2018-01-01 08:47:00|
| 41|    True|2018-01-01 08:38:00|
| 43|   False|2018-01-01 00:01:00|
| 36|   False|2018-01-01 00:01:00|
| 41|   False|2018-01-01 00:01:00|
| 29|    True|2018-01-01 05:24:00|
| 15|   False|2018-01-01 00:01:00|
| 12|    True|2018-01-01 08:57:00|
| 15|    True|2018-01-01 10:19:00|
| 14|   False|2018-01-01 00:01:00|
| 42|   False|2018-01-01 00:01:00|
| 23|   False|2018-01-01 00:01:00|
| 47|    True|2018-01-01 04:09:00|
| 47|   False|2018-01-01 00:01:00|
| 10|    True|2018-01-01 08:35:00|
| 14|    True|2018-01-01 07:48:00|
| 42|    True|2018-01-01 05:57:00|
+---+--------+-------------------+
only showing top 20 rows

+---+--------+-------------------+
| id|sur_velo|          max(date)|
+---+--------+-------------------+
| 12|   False|2018-02-01 00:0

2) Le résultat n'est pas trés pertinent, il faudrait plutôt le début et la fin de chaque trajet par id. Pour cela, il faudrait détecter les changements d'état "sur_vélo".

Créez une fonction python (voir fonction udf) qui permet de detecter ces changements d'état.
Utilisez la classe Window() et la fonction F.lag() avec votre fonction udf pour créer une nouvelle colonne que vous appellerez changement, contenant un 0 si l'état précedent de sur_velo est le même et un 1 si l'état vient de changer pour chaque id.

In [10]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, when, lit

In [11]:
# window_spec = Window.partitionBy("id").orderBy("timestamp")
# etat_precedent = lag("sur_velo", 1).over(window_spec)
# df = df.withColumn(
#     "changement",
#     when(col("sur_velo") != etat_precedent, 1).otherwise(0)
# )
# df.select("id", "timestamp", "sur_velo", "changement").show(500)

#solution prof
# df.select("id", "timestamp", "sur_velo", "changement").sort("id", "timestamp").filter((df.sur_velo == True)).show(10)

# sol 2
changement_a_true = df.filter((col("changement") == 1) & (col("sur_velo") == True))
changement_a_true.select("id", "timestamp", "sur_velo", "changement").show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `changement` cannot be resolved. Did you mean one of the following? [`date`, `position`, `velo`, `vitesse`, `id`].;
'Filter (('changement = 1) AND (cast(sur_velo#19 as boolean) = true))
+- Project [id#17, timestamp#18, sur_velo#19, velo#20, vitesse#21, position#22, destination_finale#23, convertir_en_timestamp(timestamp#18)#47 AS date#48]
   +- Relation [id#17,timestamp#18,sur_velo#19,velo#20,vitesse#21,position#22,destination_finale#23] csv


3) Grâce à cette nouvelle colonne changement, trouvez un moyen qui permettra de numeroter les trajets pour chaque id et stockez les résulats dans une nouvelle colonne appelée numero_de_trajet.

In [None]:

from pyspark.sql.functions import sum, col
window_spec = Window.partitionBy("id").orderBy("timestamp")
df = df.withColumn(
    "numero_de_trajet",
    sum(when(col("changement") == 1, 1).otherwise(0)).over(window_spec)
)
df.select("id", "timestamp", "sur_velo", "changement", "numero_de_trajet").show(550)


4) Il suffit maintenant de repêter la première étape, c'est a dire récupérer la début et la fin de chaque trajet pour chaque id, puis calculer la durée des trajets. 

(Pensez à récuperer les "vrai trajet au préalable (avec un état sur_vélo = 1).

In [12]:
# Filtrer uniquement les lignes où sur_velo est True
vrais_trajets = df.filter(col("sur_velo") == True)


## 5) Data visualisation

Convertissez votre dataframe pyspark en dataframe pandas.

1) A l'aide des librairies matplotlib et/ou seaborn, réalisez un graphique en barre montrant le temps total passé à vélo par chaque cycliste.

In [None]:
import seaborn as sns
%matplotlib inline

2) Réalisez un graphique en barre qui affiche le temps de chaque trajet d'un cycliste. Faites en sorte qu'on puisse choisir un id et afficher les trajets de cet id.

## 6) Sauvegarde

Sauvegardez votre dataset trajets au format csv dans le dossier data.