In [0]:
from math import radians, cos, sin, asin, sqrt
from pyspark.sql.functions import isnan, when, count, col,round
from pyspark.sql import Window
import pyspark.sql.functions as F
import numpy as np
import matplotlib.pyplot as plt
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *

In [0]:
df_dico = {}
for n in [2018,2019,2020,2021]:  
    path = f"/mnt/datalake/tmp/guillaume/profiling/production/raw_data/{n}_raw_data_user"
    df_dico[n] = spark.read.option("header",True).option("inferSchema",True).csv(path)
df_dico[2022]=df_2022

In [0]:
def get_distance(longit_a, latit_a, longit_b, latit_b):
  # Transform to radians
  longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a, latit_a, longit_b, latit_b])
  dist_longit = longit_b - longit_a
  dist_latit = latit_b - latit_a
  # Calculate area
  area = sin(dist_latit / 2) ** 2 + cos(latit_a) * cos(latit_b) * sin(dist_longit / 2) ** 2
  # Calculate the central angle
  central_angle = 2 * asin(sqrt(area))
  radius = 6371
  # Calculate Distance 
  distance = central_angle * radius
  return abs(distance)

In [0]:
for n in [2018,2019,2020,2021]: 
  w = Window().partitionBy("rideid").orderBy(col("deviceid").asc(), col("dateentry").asc())
  df_dico[n] = df_dico[n].select("*", lag("dateentry").over(w).alias("previousDateentry")).na.drop()
  df_dico[n]= df_dico[n].select("*", lag("deviceid").over(w).alias("previousDeviceid")).na.drop()
  df_dico[n] = df_dico[n].select("*", lag("latitude").over(w).alias("previousLatitude")).na.drop()
  df_dico[n] = df_dico[n].select("*", lag("longitude").over(w).alias("previousLongitude")).na.drop()
  df_dico[n] = df_dico[n].select("*", lag("speed").over(w).alias("previousSpeed")).na.drop()
  df_dico[n]= df_dico[n].withColumn("distance",get_distance(df_dico[n].longitude,df_dico[n].latitude, df_dico[n].previousLongitude, df_dico[n].previousLatitude))
 
  

In [0]:
df_dico.keys()

In [0]:
def all_data():
  df1=df_dico[2018].union(df_dico[2019])
  df2=df1.union(df_dico[2020])
  df3=df2.union(df_dico[2021])
  df4=df3.union(df_dico[2022])
  return df4
df=all_data()

In [0]:
df.write.option('header',True).mode('overwrite').parquet('/mnt/datalake/tmp/amani/ride_data/all_data')

In [0]:
data=df.select("*")

In [0]:
#Convert dateentry to date
#add month and week
def get_date(data):
  data=(data.withColumn("date", F.to_date(F.from_unixtime(F.col('dateentry')/1000)))
                     .withColumn("month",F.month("date"))
                     .withColumn("week",F.weekofyear("date"))
                     .withColumn("year",F.year("date"))
                     )
  return data
data_date=get_date(data)

In [0]:
data_date.groupby(F.col("year")).count().display()

year,count
2018,22085962
2019,116215042
2020,73547493
2021,88247623
2022,5837433
1999,399


In [0]:
data_date.filter(F.col("year")==2018).groupby(F.col('month')).count().display()

month,count
12,3411590
6,1811238
3,107387
5,2781580
9,2048537
4,591475
8,1205226
7,1782503
10,4570599
11,3662103


In [0]:
data_date.filter(F.col("year")==2019).groupby(F.col('month')).count().display()

month,count
12,9211268
1,2543915
6,15380062
3,2347998
5,17569262
9,12354407
4,4148194
8,13887572
7,15968544
10,11902924


In [0]:
data_date.filter(F.col("year")==2020).groupby(F.col('month')).count().display()

month,count
12,7166039
1,8687321
6,4486570
3,4497297
5,3077598
9,8015044
4,1463472
8,6310172
7,7000627
10,9609383


In [0]:
data_date.filter(F.col("year")==2021).groupby(F.col('month')).count().display()

month,count
12,7255888
1,6405108
6,6975385
3,7596965
5,7214109
9,7104728
4,6059603
8,5498532
7,5963981
10,11662326


In [0]:
data_date.filter(F.col("year")==2022).groupby(F.col('month')).count().display()

month,count
1,4512197
2,1325236


In [0]:
data_date=data_date.filter(F.col("year")!=1999)

In [0]:
#le nombre de km qui ont fait tt  utilisateurs , chaque mois de tt les années
data_group_mois=data_date.sort(data_date["date"].asc()).groupby("year","month").agg(F.count("rideid").alias('nombre_trajet'), F.avg("speed").alias('vitesse_moyenne'), F.avg("acceleration").alias('acceleration_moyenne'),F.sum("distance").alias('distance_totale'))

In [0]:
data_group_mois.count()

In [0]:
data_group_mois.display()

year,month,nombre_trajet,vitesse_moyenne,acceleration_moyenne,distance_totale
2018,8,1205226,12.557313261672364,-0.0095924154553284,24011.44233651541
2018,9,2048537,14.8256915088252,-0.0445139401431782,77144.17659856532
2018,11,3662103,11.09088239474113,-0.018484770612614,312212.45671821054
2019,1,2543915,11.276647377424103,-0.0195920590395736,308256.4557960545
2019,4,4148194,11.18286367197584,0.0244864929004995,105872.83663160302
2019,5,17569262,11.887574156232544,0.0353746195456093,327625.3884468386
2019,6,15380062,11.94688006426731,0.0380609085594971,277400.9113236109
2019,7,15968544,12.594470186257212,0.0393664700084374,294604.14560322696
2019,8,13887572,13.487897264576986,0.0420057750928988,276196.8432015655
2019,9,12354407,11.919281155455032,0.0362805899404851,221903.2420182675


In [0]:
data_group_semaine=data_date.sort(data_date["date"].asc()).groupby("year","week").agg(F.count("rideid").alias('nombre_trajet'),F.count("deviceid").alias('nombre_device'), F.avg("speed").alias('vitesse_moyenne'), F.avg("acceleration").alias('acceleration_moyenne'),F.sum("distance").alias('distance_totale'))

In [0]:
data_group_semaine.display()

year,week,nombre_trajet,nombre_device,vitesse_moyenne,acceleration_moyenne,distance_totale
2018,39,1095159,1095159,14.92390686488734,-0.0576520836137405,49009.5989947312
2018,40,1148522,1148522,14.36049366725732,-0.0275435637727733,28611.27635134165
2018,43,923972,923972,12.493780691657363,-0.0073145030967778,56223.895074240645
2018,45,906320,906320,13.192763237078488,-0.0532024106265775,75938.11312810106
2018,44,869885,869885,11.2418232400849,-0.0071697193227029,68132.57114889384
2018,50,884353,884353,9.95120222791907,-0.0547535336797947,51002.52838101194
2018,48,772419,772419,10.390315264860371,-0.0087353271691758,53861.96726926807
2018,49,848878,848878,9.542771492634676,-0.0040680216528074,43423.03262876294
2019,5,200516,200516,11.366885508850087,-0.0235357298697646,10202.324693246916
2019,8,540623,540623,10.531438948783562,0.0071581187644734,38998.45203327469


In [0]:
data_group_semaine.count()

In [0]:
#dataset=data_group_mois.select("nombre_trajet","vitesse_moyenne","distance_totale","acceleration_moyenne")
dataset=data_group_semaine.select("nombre_device","vitesse_moyenne","distance_totale","acceleration_moyenne")

In [0]:
import six
for i in dataset.columns:
    if not( isinstance(dataset.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to distance for ", i, dataset.stat.corr("distance_totale",i))

In [0]:
# def correlation(df):
#   assembler = VectorAssembler(inputCols=df.columns, outputCol="features",handleInvalid='keep')
#   df_assembler = assembler.transform(df).select("features")
#   # correlation will be in Dense Matrix
#   correlation = Correlation.corr(df_assembler ,"features","pearson").collect()[0][0]

#   # To convert Dense Matrix into DataFrame
#   rows = correlation.toArray().tolist()
#   df_assembler = spark.createDataFrame(rows,df.columns)
#   return df_assembler

In [0]:
# df_corr=correlation(dataset)
# df_corr.display()

In [0]:
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count()) # Cache because accessing training data multiple times
print(testDF.count())

In [0]:
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml.feature import StandardScaler
features = dataset.drop('distance_totale').columns
vector = VectorAssembler(inputCols=features, outputCol='features')
scale=StandardScaler(inputCol='features',outputCol='standardized')


In [0]:
from pyspark.ml.regression import LinearRegression 
# lr = LinearRegression(featuresCol ='features', labelCol ='distance_totale') 
lr = LinearRegression(featuresCol = 'features', labelCol='distance_totale', maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [0]:
from pyspark.ml import Pipeline
 
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[vector,scale, lr ])
 
# Define the pipeline model.
pipelineModel = pipeline.fit(trainDF)
 
# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)

In [0]:
predDF .display()

nombre_trajet,vitesse_moyenne,distance_totale,acceleration_moyenne,features,standardized,prediction
24810,7.115424419010222,391.575320857469,-0.0078946563743487,"Map(vectorType -> dense, length -> 3, values -> List(24810.0, 7.115424419010222, -0.00789465637434872))","Map(vectorType -> dense, length -> 3, values -> List(0.02524853758563222, 3.6493509291320936, -0.385670143602081))",1259.2009964302924
31311,8.567968949477258,489.7150445518909,-0.0039010767091389,"Map(vectorType -> dense, length -> 3, values -> List(31311.0, 8.567968949477258, -0.0039010767091389523))","Map(vectorType -> dense, length -> 3, values -> List(0.03186444822022291, 4.394330345637931, -0.19057559230885096))",3761.391542089783
31770,5.640765236695341,614.932538936557,-0.018826932217404,"Map(vectorType -> dense, length -> 3, values -> List(31770.0, 5.640765236695341, -0.01882693221740406))","Map(vectorType -> dense, length -> 3, values -> List(0.03233156143069471, 2.89302937468537, -0.9197342237041791))",971.4609520494196
200516,11.366885508850164,10202.324693246854,-0.0235357298697647,"Map(vectorType -> dense, length -> 3, values -> List(200516.0, 11.366885508850165, -0.023535729869764724))","Map(vectorType -> dense, length -> 3, values -> List(0.20406028869490653, 5.829835544628092, -1.1497686394742885))",19192.808375706976
265527,10.41113688431276,4078.643474926213,-0.012785056097437,"Map(vectorType -> dense, length -> 3, values -> List(265527.0, 10.41113688431276, -0.01278505609743703))","Map(vectorType -> dense, length -> 3, values -> List(0.27022041271665326, 5.339652257506974, -0.6245761927118677))",14709.05224323125
287213,13.32213473467826,5964.592371321811,-0.0070869631447599,"Map(vectorType -> dense, length -> 3, values -> List(287213.0, 13.32213473467826, -0.007086963144759914))","Map(vectorType -> dense, length -> 3, values -> List(0.2922897309787258, 6.8326415838429835, -0.3462126740085876))",20544.99465988312
358532,11.806647296464524,6938.092065068921,0.0082992593719288,"Map(vectorType -> dense, length -> 3, values -> List(358532.0, 11.806647296464524, 0.008299259371928864))","Map(vectorType -> dense, length -> 3, values -> List(0.36486935419798033, 6.055380079110055, 0.40543582924808097))",13413.587655494097
409318,10.559068399735938,6962.042194214638,-0.006941494136947,"Map(vectorType -> dense, length -> 3, values -> List(409318.0, 10.559068399735938, -0.0069414941369470495))","Map(vectorType -> dense, length -> 3, values -> List(0.4165530393984607, 5.415523207919317, -0.3391062148452566))",15666.56101114134
540623,10.531438948783563,38998.45203327452,0.0071581187644733,"Map(vectorType -> dense, length -> 3, values -> List(540623.0, 10.531438948783565, 0.0071581187644733394))","Map(vectorType -> dense, length -> 3, values -> List(0.5501789655444276, 5.401352645972925, 0.34968877186157815))",13548.898917388444
553349,12.992028659327262,18941.78569302243,-0.0238469464823444,"Map(vectorType -> dense, length -> 3, values -> List(553349.0, 12.992028659327262, -0.02384694648234447))","Map(vectorType -> dense, length -> 3, values -> List(0.5631299082818221, 6.663337148597239, -1.1649722088221548))",28983.195913023577


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol='distance_totale',metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predDF))

In [0]:
# test_result =lr_evaluator.evaluate(testDF)
# print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

#### Pandas

In [0]:
df_pandas=data_group_semaine.toPandas()

In [0]:
df=df_pandas.copy()