# Etude de cas : analyse des fichiers de logs des cyclistes

Objectif: A partir des fichiers contenu dans le dossier ./data/Cyclistes, calculer la durée de chacun des trajets effectués par chaque cycliste.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from datetime import datetime

## 1)  Charger la donnée
Créez une seesion Spark et chargez les données Cyclistes.

In [2]:
spark = SparkSession.builder.getOrCreate()

print("Session SPARK créé")

Session SPARK créé


In [3]:
path = "./data/Cyclistes/*.csv" 
cyclistes = spark.read.format("csv").option("header", "true").load(path, inferSchema=True)
cyclistes.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- sur_velo: boolean (nullable = true)
 |-- velo: string (nullable = true)
 |-- vitesse: double (nullable = true)
 |-- position: string (nullable = true)
 |-- destination_finale: string (nullable = true)



## 2) Vérifier le nombre de cyclistes

Comptez le nombre d'id uniques.

In [4]:
nb_id = cyclistes.select("id").count()
nb_id_unique = cyclistes.select("id").distinct().count()

In [5]:
print("Nombre total d'id de cycliste : " + str(nb_id))
print("Nombre total d'id unique : " + str(nb_id_unique))

Nombre total d'id de cycliste : 2232000
Nombre total d'id unique : 50


## 3) Transformer la colonne timestamp

Lorsqu'on vérifie le type de donnée de la colonne timestamp, on voit qu'on a une chaîne de caractères. Pour calculer une durée on voudrait transformer en date exploitable en tant que telle.
A l'aide d'une fonction udf, créez une nouvelle colonne date qui contiendra le résultat de la transformation des chaînes de caractères de la colonne timestamp en véritables timestamps.

In [6]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

@udf(returnType = TimestampType())
def validDate(colonne):
    return datetime.strptime(colonne, '%Y-%m-%d %H:%M:%S')

In [7]:
cyclistes = cyclistes.withColumn("Date", validDate(cyclistes['timestamp']))
cyclistes.show()

+---+-------------------+--------+-----+--------------------+-------------------+------------------+-------------------+
| id|          timestamp|sur_velo| velo|             vitesse|           position|destination_finale|               Date|
+---+-------------------+--------+-----+--------------------+-------------------+------------------+-------------------+
| 12|2018-01-01 00:01:00|   false|False|0.030000000000000006|(lon:2.07 lat:1.24)|             False|2018-01-01 00:01:00|
| 12|2018-01-01 00:02:00|   false|False|0.030000000000000006|(lon:2.07 lat:1.24)|             False|2018-01-01 00:02:00|
| 12|2018-01-01 00:03:00|   false|False|0.030000000000000006|(lon:2.07 lat:1.24)|             False|2018-01-01 00:03:00|
| 12|2018-01-01 00:04:00|   false|False|0.030000000000000006|(lon:2.07 lat:1.24)|             False|2018-01-01 00:04:00|
| 12|2018-01-01 00:05:00|   false|False|0.030000000000000006|(lon:2.07 lat:1.24)|             False|2018-01-01 00:05:00|
| 12|2018-01-01 00:06:00|   fals

In [8]:
cyclistes.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- sur_velo: boolean (nullable = true)
 |-- velo: string (nullable = true)
 |-- vitesse: double (nullable = true)
 |-- position: string (nullable = true)
 |-- destination_finale: string (nullable = true)
 |-- Date: timestamp (nullable = true)



## 4) Durée des trajets par id.

1) Trouvez les dates min/max par état de sur_velo, puis par id ET par état de sur_velo.

In [9]:
cyclistes.groupBy("sur_velo").agg(F.max("Date").alias("Max_Date"), F.min("Date").alias("Min_Date")).show()

+--------+-------------------+-------------------+
|sur_velo|           Max_Date|           Min_Date|
+--------+-------------------+-------------------+
|    true|2018-01-31 21:32:00|2018-01-01 01:47:00|
|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
+--------+-------------------+-------------------+



In [10]:
cyclistes.groupBy("id","sur_velo").agg(F.max("Date").alias("Max_Date"), F.min("Date").alias("Min_Date")).show()

+---+--------+-------------------+-------------------+
| id|sur_velo|           Max_Date|           Min_Date|
+---+--------+-------------------+-------------------+
| 41|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 29|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 36|    true|2018-01-31 21:32:00|2018-01-01 08:47:00|
| 15|    true|2018-01-31 20:31:00|2018-01-01 10:19:00|
| 41|    true|2018-01-31 20:35:00|2018-01-01 08:38:00|
| 12|    true|2018-01-31 16:49:00|2018-01-01 08:57:00|
| 12|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 29|    true|2018-01-30 22:29:00|2018-01-01 05:24:00|
| 43|    true|2018-01-30 18:53:00|2018-01-01 07:44:00|
| 36|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 43|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 15|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 42|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 23|   false|2018-02-01 00:00:00|2018-01-01 00:01:00|
| 24|    true|2018-01-31 19:16:00|2018-01-01 08:57:00|
| 10|    t

2) Le résultat n'est pas trés pertinent, il faudrait plutôt le début et la fin de chaque trajet par id. Pour cela, il faudrait détecter les changements d'état "sur_vélo".

Créez une fonction python (voir fonction udf) qui permet de detecter ces changements d'état.
Utilisez la classe Window() et la fonction F.lag() avec votre fonction udf pour créer une nouvelle colonne que vous appellerez changement, contenant un 0 si l'état précedent de sur_velo est le même et un 1 si l'état vient de changer pour chaque id.

In [11]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

@udf(returnType = IntegerType())
def findChg(last,current):
    if(last != None and last != current):
        return 1
    elif (last == None):
        return 1
    else :
        return 0

In [12]:
cyclistes = cyclistes.withColumn("Changement", findChg(F.lag(cyclistes["sur_velo"]).over(Window.partitionBy("id").orderBy("id","Date")), F.lag(cyclistes["sur_velo"],0).over(Window.partitionBy("id").orderBy("id","Date"))))
cyclistes.show()

+---+-------------------+--------+-----+-----------------+-------------------+------------------+-------------------+----------+
| id|          timestamp|sur_velo| velo|          vitesse|           position|destination_finale|               Date|Changement|
+---+-------------------+--------+-----+-----------------+-------------------+------------------+-------------------+----------+
| 26|2018-01-01 00:01:00|   false|False|0.592331729166367|(lon:3.84 lat:0.82)|             False|2018-01-01 00:01:00|         1|
| 26|2018-01-01 00:02:00|   false|False|0.592331729166367|(lon:3.84 lat:0.82)|             False|2018-01-01 00:02:00|         0|
| 26|2018-01-01 00:03:00|   false|False|0.592331729166367|(lon:3.84 lat:0.82)|             False|2018-01-01 00:03:00|         0|
| 26|2018-01-01 00:04:00|   false|False|0.592331729166367|(lon:3.84 lat:0.82)|             False|2018-01-01 00:04:00|         0|
| 26|2018-01-01 00:05:00|   false|False|0.592331729166367|(lon:3.84 lat:0.82)|             False|

3) Grâce à cette nouvelle colonne changement, trouvez un moyen qui permettra de numeroter les trajets pour chaque id et stockez les résulats dans une nouvelle colonne appelée numero_de_trajet.

In [13]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

cpt = 0

@udf(returnType = IntegerType())
def idTravelChg(last_id, current_id,sur_velo,chg):
    global cpt
    
    if(last_id == None):
        cpt = 0
    elif(int(last_id) != int(current_id)):
        cpt = 0
    
    if(chg == 1 and sur_velo == True):
        cpt = cpt + 1
        
    return cpt

In [14]:
windowSpec = Window.orderBy("id","Date").partitionBy("id")

cyclistes = cyclistes.withColumn("numero_de_trajet", idTravelChg(\
F.lag(col("id")).over(windowSpec),\
F.lag(col("id"),0).over(windowSpec),\
F.lag(cyclistes["sur_velo"],0).over(windowSpec),\
F.lag(cyclistes["Changement"],0).over(windowSpec),\
))

cyclistes.show()

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 56814)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pysp

Py4JError: An error occurred while calling o143.showString

In [None]:
cyclistes.filter((cyclistes.numero_de_trajet >= 1) & (cyclistes.Changement == 1)).orderBy("id","numero_de_trajet").show(200)

4) Il suffit maintenant de repêter la première étape, c'est a dire récupérer la début et la fin de chaque trajet pour chaque id, puis calculer la durée des trajets. 

(Pensez à récuperer les "vrai trajet au préalable (avec un état sur_vélo = 1).

In [16]:
trajet = cyclistes.filter(cyclistes.sur_velo == 1)
#trajet = trajet.withColumn("duree", (F.max(cyclistes.Date) - F.min(cyclistes.Date)) )
duree = trajet.groupBy("id","numero_de_trajet").agg(F.max(cyclistes.Date),F.min(cyclistes.Date))
duree = duree.withColumn("duree", (F.unix_timestamp(duree["max(Date)"]) -  F.unix_timestamp(duree["min(Date)"]))/60).orderBy("id","numero_de_trajet")

ValueError: Unable to parse datatype from schema. [Errno 111] Connection refused

In [None]:
duree.show()

## 5) Data visualisation

Convertissez votre dataframe pyspark en dataframe pandas.

In [15]:
df = duree.toPandas()

NameError: name 'duree' is not defined

1) A l'aide des librairies matplotlib et/ou seaborn, réalisez un graphique en barre montrant le temps total passé à vélo par chaque cycliste.

In [None]:
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
pd.set_option('display.max_rows', 500)

In [None]:
df.head(20)

In [None]:
fig_dims = (30, 10)
fig, ax = plt.subplots(figsize=fig_dims)

sns.barplot(x = 'id', y = 'duree',ax=ax, data = df.groupby(["id"], as_index=False).sum(["duree"]))

ax.set(xlabel='Cycliste', ylabel='Durée (min)')

plt.show()



2) Réalisez un graphique en barre qui affiche le temps de chaque trajet d'un cycliste. Faites en sorte qu'on puisse choisir un id et afficher les trajets de cet id.

In [None]:
def onCylisteDureeTrajet(_id: int):
    fig_dims = (30, 10)
    fig, ax = plt.subplots(figsize=fig_dims)

    sns.barplot(x = 'numero_de_trajet', y = 'duree', ax=ax, data = df.loc[df['id'] == _id])

    ax.set(xlabel='Numéro de trajet', ylabel='Durée (min)')

    plt.show()

In [None]:
onCylisteDureeTrajet(8)

## 6) Sauvegarde

Sauvegardez votre dataset trajets au format csv dans le dossier data.

In [None]:
df.to_csv(r"data/TD_spark_result.csv",index=False)