In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

#### data+distance

In [0]:
#recuper le table distance_tmp, sous format dataframe pour preparer les données 
data= sqlContext.table("token_db.distance_tmp")

#### processing data

In [0]:
def processing_data(data_processing):
  #explode la colonne allnodes pour calculer le nombre de nodes 
  data_processing=data_processing.select("*",explode(data_processing.allnodes).alias("nodes"))
  #transformer la dateentry en date
  data_processing=data_processing.withColumn("date", to_date(from_unixtime(col('dateentry')/1000)))
  #ajouter la colonne year 
  data_processing=data_processing.withColumn("year",year("date"))
  #filtrer data
  data_processing=data_processing.filter(col("year").isin([2019,2020,2021,2022]))
  #ajouter la colonne de token à partir la colonne de deviceid
  data_processing=data_processing.withColumn('token', split(data_processing['deviceid'], '#').getItem(1))
  return data_processing

  

In [0]:
#apl la fonction processing_data
data_processing=processing_data(data)

#### data token

In [0]:
def groupby_token(data):
  DF=data.groupby("token").agg( min("date").alias('date_premiere_cap'),
                               max("date").alias('date_derniere_cap'),
                              ((max("dateentry")-min("dateentry"))/(1000*3600) ).alias("Age_token"),
                               count("rideid").alias('nombre_trajet'),
                               count_distinct("deviceid").alias('nombre_device'),
                               sum("distance").alias('nombre_km'),
                               count("nodes").alias('nombre_nodes') ,
                              )
  return DF

In [0]:
data_token=groupby_token(data_processing)

In [0]:
def info_token(data):
  #enlever le token qui a 12 device
  data=data.filter(col("nombre_device")!=12)
  #colonne fréquence
  data=(data.withColumn("frequence",col('nombre_trajet')/col("Age_token"))
  # temps_inactivite : derniere cap current_date()
           .withColumn("temps_inactivite",datediff(current_date(),col('date_derniere_cap')))
  #age de token en jour 
           .withColumn("Age_token_jour",col("Age_token")/24)
  #age de token en mois
           .withColumn("Age_token_mois",col("Age_token_jour")/30)
  #temps d'inactivité en mois
           .withColumn("temps_inactivite_mois",col("temps_inactivite")/30))
  return data

In [0]:
data_inf_token = info_token(data_token)

#### data_trajet

In [0]:
def temps_inactivite(data):
  #les differents temps_inactivite pour chaque token
  windows = Window.partitionBy("token","rideid").orderBy(col("dateentry").asc())
  data_trajet = (data.withColumn('temps_cap_debut_trajet', min("dateentry").over(windows))
                     .withColumn('temps_cap_fin_trajet', max("dateentry").over(windows))
                      .withColumn('lead_temps_cap_debut_trajet', lead('temps_cap_debut_trajet').over(windows)).dropna()
                   )
    #duree entre les trajets
  data_trajet= data_trajet.withColumn("temps_inactivite_entre_trajet",(col('temps_cap_fin_trajet')- col('lead_temps_cap_debut_trajet'))/(1000*3600))
  #temps d'innactivité maximal pour chaque token entre deux trajets différents
  data_trajet=data_trajet.groupby("token").agg(max("temps_inactivite_entre_trajet").alias("temps_inactivite_max_entre_2trajets"))
  return data_trajet

In [0]:
data_temps_inactivite=temps_inactivite(data_processing)

#### Jointure pour ajouter le temps d'innactivité maximal entre deux trajet pour chaque token

In [0]:
data_final = (data_inf_token.join(data_temps_inactivite, on= data_inf_token.token == data_temps_inactivite.token,how = "left")
                            .drop(data_temps_inactivite.token))

In [0]:
#data_final.write.option('header',True).mode('overwrite').parquet('/mnt/datalake/tmp/amani/data_token/data_info_token3')

####  stocker  data_final dans la base token_db , le table est info_token

In [0]:
data_final.write.format("delta").mode("overwrite").saveAsTable("token_db.info_token")

In [0]:
data_dep= data_trajet.withColumn("dpt", getDepartment(col("long_start"), col("lat_start")))