# 0) Chargement de spark

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# 1) Lire de la donnée

## 1.1) Lecture brute

Chargez le fichier ville_1.csv dans une variable nommée df.

Vous pouvez afficher votre donnée en utilisant la méthode take() ou la methode collect() de l'objet pyspark DataFrame (attention appeler collect() sur un dataframe est déconseillé si vous avez du vrai big data).

L'objet possède aussi un attribut appelé dtypes, appelez cet attribut pour obtenir la liste des colonnes et leur type.

In [None]:
path = "./data/Villes/ville_1.csv"
# lecture d'un fichier de manière la plus brute
df = spark.read.load(path, format="csv")
df.dtypes

In [None]:
type(df)

In [None]:
df.take(5)

## 1.2) Lecture avec les entêtes

Recharger le même fichier mais cette fois-ci utilisez l'option header pour rajouter les noms de colonnes à votre df.

Appelez l'attribut dtypes et comparez la sortie avec celle de la lecture brute.

In [None]:
# l'option 'header' permet de rajouter les noms des colonnes 
df = spark.read.format('csv').options(header=True).load(path)
df.dtypes

In [None]:
df.take(5)

## 1.3) Lecture avec les types détectés automatiquement

Recharger le fichier avec  l'option inferShema.

L'option 'inferSchema' permet de transformer les colonnes en types plus précis : entier  / booléens / chaines de caractères... bien sûr spark trouve les types uniquement si le fichier d'origine permet de les trouver de manière simple

In [None]:
# l'option 'inferSchema' permet de transformer les colonnes en 
# types plus précis : entier  / booléens / chaines de caractères...
# bien sûr spark trouve les types uniquement si le fichier d'origine
# permet de les trouver de manière simple
df = spark.read.format('csv').options(header=True, inferSchema=True).load(path)
df.dtypes

In [None]:
df.take(5)

## 1.4) L'attribut schema

Il vous permet d'afficher le schéma de votre df, avec pour chaque colonne son nom, son type, et si elle accepte les valeurs nulles ou non. 

In [None]:
df.schema

Vous avez aussi la méthode printSchema() qui permet d'afficher le shéma du df de manière plus lisible.

In [None]:
df.printSchema()

# 2) Ecriture de la dataframe sur le disque

Sauvegardez le df sous différents formats.

## 2.1) choix du format : csv

In [None]:
df.write.format("csv").save("./data/Villes/csv")

## 2.2) choix du format : parquet

In [None]:
df.write.format("parquet").save("./data/Villes/parquet")

## 2.3) choix du format : json

In [None]:
df.write.save("./data/Villes/ville", format="json")

## 2.4) Lecture de différents formats

Vous pouvez choisir de lire le df sous un format ou un autre en utilisant l'argument format dans la fonction spark.read.load

In [None]:
# le ! vous permet d'executer des commandes dans votre terminal depuis le notebook
!ls ./data/Villes/ville/

In [None]:
df_json = spark.read.load("./data/Villes/ville/", format="json")

In [None]:
df_parquet = spark.read.load("./data/Villes/parquet", format="parquet")

# 3) Calculer des résultats : les actions 

## 3.1) Nombre de lignes : count

Chargez les fichiers csv contenus dans le dossiers ./data/Cyclistes/ dans un df nommé cyclistes, puis comptez les lignes du dataframe obtenu.

In [None]:
cyclistes = spark.read.load("./data/Cyclistes/", format="csv", header=True, inferSchema="True")
cyclistes.count()

Afficher le schéma de ce nouveau df

In [None]:
cyclistes.printSchema()

Affichez 10 lignes du df.

In [None]:
cyclistes.take(10)

## 3.2) Moyenne : agg + colonne + mean

A l'aide de la méthode agg(), calculez la moyenne sur la colonne vitesse.

Vous pouvez récuperer le résultat avec la méthode collect().

In [None]:
cyclistes.agg({"vitesse" : "mean"}).show()

## 3.3) Quantile approximatifs pour gagner du temps de calcul

En statistiques et en théorie des probabilités, les quantiles sont les valeurs qui divisent un jeu de données en intervalles contenant le même nombre de données. Il y a donc un quantile de moins que le nombre de groupes créés. Ainsi les quartiles sont les trois quantiles qui divisent un ensemble de données en quatre groupes de taille égale.

La méthode approxQuantile permet de laisser une tolérance a l'erreur ce qui réduit le temps de calul sur d'énormes jeux de données.

In [None]:
import time

In [None]:
def calcul_quantile(df, erreur_acceptee):
    debut            = time.time()
    colonne          = "vitesse"
    quantiles_voulus = [0.25, 0.50, 0.75]
    resultat         =  df.approxQuantile(colonne, quantiles_voulus , erreur_acceptee )
    fin              = time.time()
    delais           = fin -debut
    print ("delais =%.2f sec, quantiles = %s"%(delais, resultat))

In [None]:
calcul_quantile(cyclistes, 0.05)

In [None]:
calcul_quantile(cyclistes, 0.01)

In [None]:
calcul_quantile(cyclistes, 0)

## Reload de la dataframe villes

Chargez le fichier villes dans un df nommé villes.

In [None]:
villes =spark.read.load("./data/Villes/", format="csv", header=True, inferSchema="True")
villes.printSchema()

In [None]:
villes.take(10)

## 3.4) corrélation

En probabilités et en statistique, la corrélation entre plusieurs variables aléatoires ou statistiques est une notion de liaison qui contredit leur indépendance.

Calculez la corrélation entre les colonnes age et vitesse_a_velo.

In [None]:
villes.corr("vitesse_a_velo", "sportivite")

## 3.5) covariance

La covariance entre deux variables aléatoires est un nombre permettant de quantifier leurs écarts conjoints par rapport à leurs espérances respectives. Elle s’utilise également pour deux séries de données numériques (écarts par rapport aux moyennes).
La covariance est une extension de la notion de variance. La corrélation est une forme normalisée de la covariance.

Calculez la covariance entre les colonnes age et vitesse_a_velo.

In [None]:
villes.cov("age", "vitesse_a_velo")

## 3.6) sample

La méthode sample() permet de tirer aléatoirement une fraction du dataframe.

In [None]:
villes.count()

In [None]:
villes_1_pct = villes.sample(False, 0.01)
villes_1_pct.count()

In [None]:
villes_1_pct.collect()

In [None]:
villes.exceptAll(villes_1_pct).count()

## 3.7) filter 

La méthode filter() permet le df selon certaines valeurs dans les colonnes.

Utilisez cette méthode pour récuperer seulement les lignes avec le sexe féminin.

In [None]:
villesF = villes.filter(villes["sexe"]=="F")

In [None]:
villesF.count()

In [None]:
villesF.take(10)

On peux aussi filtrer le df avec la méthode where(). Filtrez le df de la même façon que precedemment en utilisant cette méthode.

In [None]:
villes.where(villes.sexe=="F").count()

# 4) Transformer la données : les transformations!

## Transformations : demandent à être suivi par un collect ou une action (count par exemple)

## 4.1) Obtenir des statistiques sur les colonnes numériques

La méthode describe() permet de calculer les statistiques récapitulatives d'une ou plusieurs colonnes numériques dans un df. Si le nom des colonnes n'est pas spécifié, la méthode calculera des statistiques récapitulatives pour toutes les colonnes numériques présentes dans le df.

Afficher les statistiques de la colonne age.

In [None]:
villes.describe(["age"]).collect()

In [None]:
villes.describe(["age"]).show()

## 4.2) groupby

La méthode groupBy() suivie de la methode agg() permet de grouper le df selon les catgories d'une ou plusieurs colonnes pour faire des calculs sur ces catégories.

Calculez la moyenne de la colonnes sportivité selon le sexe des personnes.

In [None]:
villes.groupBy("sexe").agg({"sportivite" : "mean"}).collect()

Calculez la moyenne de la colonne age et la valeur max de la colonne sportivité par sexe.

In [None]:
villes.groupBy("sexe").agg({"sportivite" : "max", "age":"mean"}).collect()

Calculez la moyenne des colonnes vitesse_a_pied et vitesse_a_velo par sexe.

In [None]:
villes.groupBy("sexe").agg({"vitesse_a_pied" : "mean", "vitesse_a_velo":"mean"}).collect()

## 4.3) summary

La méthode summary() permet des faire des calculs statistiques de base sur toutes les colonnes du df.

Appliquez un count et un max sur toutes les colonnes du df et afficher les résultats.

In [None]:
villes.summary("count", "max").collect()

## 4.4) Union de dataframe

#### Ajouter les colonnes les unes à côté des autres : join

In [None]:
villes.join(villes, on="id").printSchema()

#### Ajouter les lignes les unes sous les autres : union

In [None]:
villes.unionByName(villes).count()

## 4.6) Concaténation de colonne : F.concat

In [None]:
from pyspark.sql.types import *
from pyspark.sql import functions as F

Nous allons ici reprendre le df cyclistes.

Utiliser les méthodes withColumn() et F.concat() pour ajouter une colonne au df qui contiendra la concatenation des valeurs des colonnes id et sur_velo. 

In [None]:
path = "./data/Cyclistes/*.csv" 
cyclistes = spark.read.format("csv").option("header", "true").load(path, inferSchema=True)
cyclistes.count()

In [None]:
cyclistes.take(10)

In [None]:
cyclistes.withColumn("id_sur_velo", F.concat(cyclistes.id, cyclistes.sur_velo)).first()

# 5) Fonctions udf 
Il est possible d'enregistrer des fonctions python que l'on écrit nous même pour les appliquer sur une colonne d'une dataframe, c'est ce qu'on appelle les udf, pour User Defined Functions.

Voici une fonction qui prend en argument une colonne et calcule le carré des valeurs de cette colonne.
Appliquez cette fonction sur la colonne salaire de votre df. Affichez le résultat.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

@udf(returnType = FloatType())
def cube(colonne):
    return colonne*colonne

In [None]:
villes.select(cube("salaire")).collect()

# 6)	Etude de cas : analyse des fichiers de logs des cyclistes

In [None]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

## 6.1)  Charger la donnée

In [None]:
path = "./data/Cyclistes/*.csv" 
cyclistes = spark.read.format("csv").option("header", "true").load(path, inferSchema=True)
cyclistes.count()

In [None]:
cyclistes.printSchema()

In [None]:
cyclistes.take(10)

## 6.2) vérifier le nombre de cycles

Comptez le nombre d'id uniques.

In [None]:
ids = cyclistes.select("id").drop_duplicates()

In [None]:
ids.count()

## 6.3) transformer les timestamp en date

Voici une fonction qui permert de récuperer la date sous forme de chaîne de caractère dans la colonne timestamps pour la transformer en date exploitable en tant que telle.

Créez une nouvelle colonne dans votre df stockant le résultat de cette fonction.

In [None]:
from pyspark.sql.types import TimestampType

@udf(returnType = TimestampType())
def transform_timestamp_in_date(timestamp):
    from datetime import datetime
    return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")

In [None]:
cyclistes = cyclistes.withColumn("date", transform_timestamp_in_date("timestamp"))

In [None]:
cyclistes.take(5)

In [None]:
cyclistes.select("date").show(5)

In [None]:
cyclistes.printSchema()

## 6.4) Durée des trajets par id.

A partir d'ici, il s'agit de traiter votre donnée pour récupérer la durée de chaque trajet effectué par chaque id.

1) trouvez les dates min/max par état de sur_velo, puis par id ET par état de sur_velo.

In [None]:
_ = cyclistes.groupBy(["sur_velo"]).agg(F.min(cyclistes.date), F.max(cyclistes.date))
_.collect()

In [None]:
_ = cyclistes.sort(["id", "date"]).groupBy(["id", "sur_velo"])\
                      .agg(F.min(cyclistes.date), F.max(cyclistes.date)).sort("id")
_.take(5)

2) Le résultat n'est pas trés pertinent, il faudrait plutôt le début et la fin de chaque trajet par id. Pour cela, il faudrait détecter les changements d'états "sur_vélo".
Utilisez la classe Window() et la fonction F.lag() pour créer une nouvelle colonne que vous appellerez changement, contenant un 0 si l'état précedent de sur_velo est le même et un 1 si l'état vient de changer (fonction changement() ci-dessous) pour chaque id.

In [None]:
from pyspark.sql.functions import udf

In [None]:
@udf(returnType = IntegerType())
def changement(etat_actuel, etat_precedent):
    """
    Détecte si les deux états sont différent.
    
    Parametres :
        etat_actuel : valeur sur la ligne courante
                      renvoyée par F.lag (0)
        etat_precedent : valeur sur la ligne précédente
                      renvoyée par F.lag(1)
    Return: 0 s'ils sont égaux, 1 s'il y a une différence
    """
    if etat_precedent == None:
        return 0
    if etat_precedent == etat_actuel:
        return 0
    if etat_actuel != etat_precedent:
        return 1

In [None]:
from pyspark.sql.window import Window
w = Window.orderBy(["id", "date"])

In [None]:
cyclistes = cyclistes.withColumn("changement", changement( F.lag("sur_velo", 0).over(w), F.lag("sur_velo", 1).over(w)))

In [None]:
cyclistes.printSchema()

In [None]:
#cyclistes.select("id", "timestamp", "velo", "changement").sort("id", "timestamp").take(100)

3) Grâce à la fonction window appliquez la fonction somme() sur la colonne changement pour numeroter les trajets pour chaque id et stocker les résulats dans une nouvelle colonne appelée numero_de_trajet.

In [None]:
@udf(returnType = IntegerType())
def somme(indice_actuel, indice_precedent):
    if indice_precedent == None:
        return 0
    return indice_actuel + indice_precedent

In [None]:
windowval = Window.orderBy(["id", "date"])
windowval = windowval.partitionBy("id")
windowval = windowval.rangeBetween(Window.unboundedPreceding, 0)

In [None]:
cyclistes = cyclistes.withColumn("numero_de_trajet", F.sum("changement").over(windowval))

In [None]:
cyclistes.printSchema()

In [None]:
cyclistes.select("id", "timestamp", "changement", "numero_de_trajet").take(5)

4) Il suffit maintenant de repêter la première étape, c'est a dire récupérer la début et la fin de chaque trajet pour chaque id. Puis calculer la durée des trajets.

In [None]:
if False:
    debut_fin_trajets = cyclistes.groupBy(["id", "numero_de_trajet"])\
                                        .agg(   F.min(cyclistes.date) , 
                                                F.max(cyclistes.date) )


In [None]:
trajets = cyclistes
debut_fin_trajets = trajets.groupBy(["id", "numero_de_trajet"]).agg(F.min(cyclistes.date),F.max(cyclistes.date))
debut_fin_trajets.take(4)

In [None]:
trajets = trajets.groupBy(["id", "numero_de_trajet"])
trajets = trajets.agg(F.unix_timestamp(F.max(cyclistes.date)) - F.unix_timestamp(F.min(cyclistes.date)))
trajets = trajets.sort("id")

In [None]:
trajets.printSchema()

In [None]:
trajets = trajets.withColumnRenamed("(unix_timestamp(max(date), yyyy-MM-dd HH:mm:ss) - unix_timestamp(min(date), yyyy-MM-dd HH:mm:ss))", "duree")
trajets.take(4)


# 7) datavisualisation

Convertissez votre dataframe pyspark en dataframe pandas.

In [None]:
df_pandas = trajets.toPandas()
df_pandas.head()

In [None]:
import seaborn as sns
%matplotlib inline

A l'aide de la librairie seaborn, réalisez un graphique en barre montrant la durée de tout les trajets.

In [None]:
df_pandas.sort_values(["id", "numero_de_trajet"]).plot(kind="bar", x="numero_de_trajet", y="duree", title="Durée des trajets ")
sns.despine()

Faire le même graphique mais cette fois-ci, faire en sorte qu'on puisse choisir un id et afficher seulement les trajets de cet id.

In [None]:
numero_cycliste = [40]
filtre = df_pandas.id.isin(numero_cycliste)
df_pandas[filtre].plot(kind="bar", x="numero_de_trajet", y="duree", title="Durée des trajets pour le cycliste %s" %numero_cycliste)
sns.despine()

Sauvegardez votre dataset trajets au format csv dans le dossier data.

In [None]:
trajets.write.format("csv").save("./data/trajets1.csv")

In [None]:
!uname -a
