In [0]:
pip install geopandas

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window
#recuperer la fonction de distance
import fonctions 
import geocode
from datetime import date, timedelta, datetime

#### data

In [0]:
#Get Variables
#recuperer la date d'aujourd'hui
today = date.today()
#recuperer la date d'hier 
yesterday = today - timedelta(days = 28)
#à partier de la date d'hier on recuper l'année ,le mois, et le jour 
year = yesterday.year
month = yesterday.month
day = yesterday.day

In [0]:
#on recupere que les données d'hier pour l'automatisation 
path = f"/mnt/processed-prod/Daily/SpeedRoadType/"+str(year)+'/'+'{:02d}'.format(month)+'/'+'{:02d}'.format(day)+'/'
data= spark.read.option("header",True).option("inferSchema",True).parquet(path)

#### Les fonctions

In [0]:
#data_lag pour calculer la distance
def ajouter_distance(data_lag):
  #calculer la fonction lag pour calculer la distance de chaque trajet
  w = Window().partitionBy("rideid").orderBy(col("deviceid").asc(), col("dateentry").asc())
  data_lag = data_lag.select("*", lag("dateentry").over(w).alias("previousDateentry")).na.drop()
  data_lag= data_lag.select("*", lag("deviceid").over(w).alias("previousDeviceid")).na.drop()
  data_lag = data_lag.select("*", lag("latitude").over(w).alias("previousLatitude")).na.drop()
  data_lag = data_lag.select("*", lag("longitude").over(w).alias("previousLongitude")).na.drop()
  data_lag = data_lag.select("*", lag("speed").over(w).alias("previousSpeed")).na.drop()
  data_lag= data_lag.withColumn("distance",fonctions.get_distance(data_lag.longitude,data_lag.latitude, data_lag.previousLongitude, data_lag.previousLatitude))
  return data_lag


def processing_data(data):
  #explode la colonne allnodes pour calculer le nombre de nodes 
  data_processing=data.select("*",explode(data.allnodes).alias("nodes"))
  #transformer la dateentry en date
  data_processing=data_processing.withColumn("date", to_date(from_unixtime(col('dateentry')/1000)))
  data_processing=data_processing.withColumn('token', split(data_processing['deviceid'], '#').getItem(1))
  return data_processing


def groupby_token(data):
  DF=data.groupby("token").agg( min("date").alias('date_premiere_cap'),
                               max("date").alias('date_derniere_cap'),
                              ((max("dateentry")-min("dateentry"))/(1000*3600) ).alias("Age_token"),
                               count("rideid").alias('nombre_trajet'),
                               #count_distinct("deviceid").alias('nombre_device'),
                               sum("distance").alias('nombre_km'),
                               count("nodes").alias('nombre_nodes') ,
                                first("longitude").alias("long_start"),
                                first("latitude").alias("lat_start"),
                              )
  return DF

def info_token(data):
  #enlever le token qui a 12 device
  #data=data.filter(col("nombre_device")!=12)
  #colonne fréquence
  data=(data.withColumn("frequence",col('nombre_trajet')/col("Age_token"))
  # temps_inactivite : derniere cap current_date()
           .withColumn("temps_inactivite",datediff(current_date(),col('date_derniere_cap')))
  #age de token en jour 
           .withColumn("Age_token_jour",col("Age_token")/24)
  #age de token en mois
           .withColumn("Age_token_mois",col("Age_token_jour")/30)
  #temps d'inactivité en mois
           .withColumn("temps_inactivite_mois",col("temps_inactivite")/30))
  return data


def temps_inactivite(data):
  #les differents temps_inactivite pour chaque token
  windows = Window.partitionBy("token","rideid").orderBy(col("dateentry").asc())
  data_trajet = (data.withColumn('temps_cap_debut_trajet', min("dateentry").over(windows))
                     .withColumn('temps_cap_fin_trajet', max("dateentry").over(windows))
                      .withColumn('lead_temps_cap_debut_trajet', lead('temps_cap_debut_trajet').over(windows)).dropna()
                   )
    #duree entre les trajets
  data_trajet= data_trajet.withColumn("temps_inactivite_entre_trajet",(col('temps_cap_fin_trajet')- col('lead_temps_cap_debut_trajet'))/(1000*3600))
  #temps d'innactivité maximal pour chaque token entre deux trajets différents
  data_trajet=data_trajet.groupby("token").agg(max("temps_inactivite_entre_trajet").alias("temps_inactivite_max_entre_2trajets"))
  return data_trajet


#creer un udf pour recuperer le code de dep
#geocode fichier.py pour recuper la fonction findCity
@udf
def getDepartment(longitude, latitude):
  postalCode, cityname = geocode.findCity(longitude, latitude)
  return postalCode[:2]


#Ajouter le code de departement
def add_code(data):
  data_dep= data.withColumn("dpt", getDepartment(col("long_start"), col("lat_start")))
  #enlever les villes que se trouvent pas en france 
  data_dep=data_dep.filter((col("dpt")!= "-1")) 
  #remplacer 2A et 2B par  20
  data_dep= data_dep.withColumn("dpt",when((col("dpt")=='2A')| (col("dpt")=='2B'),"20").otherwise(col("dpt")))
  return data_dep

#### Apl des fonctions

In [0]:
#apl la fonction processing_data
data_distance=ajouter_distance(data)
#preparer les données
data_processing=processing_data(data_distance)
#agg pour recuperer les informations de chaque token
data_token=groupby_token(data_processing)
#ajouter des infos sur le token
data_inf_token = info_token(data_token)
#ajouter la colonne de date d'incativité maximale en é trajets differents
data_temps_inactivite=temps_inactivite(data_processing)

#### Jointure

In [0]:
#jointure pour ajouter au data qui contient tt les informations concerant chaque token , la date max entre deux trajets
data_final = (data_inf_token.join(data_temps_inactivite, on= data_inf_token.token == data_temps_inactivite.token,how = "left")
                            .drop(data_temps_inactivite.token))

#### Ajouter le code de departement

In [0]:
data_dep= add_code(data_final)

#### Enregistrement des données processés pour le jour précédent

In [0]:
#data_final.write.format("delta").mode("append").saveAsTable("users_db.info_token")