In [0]:
from pyspark.sql.functions import *
import pyspark.sql.types as T
from pyspark.sql import Window

In [0]:
# Read every 2022 daily SpeedRoadType parquet partition, sorted by `dateentry`.
df_2022 = (
    spark.read
    .parquet("/mnt/processed-prod/Daily/SpeedRoadType/2022/*/*")
    .orderBy("dateentry")
)

In [0]:
# Number of rows loaded for 2022.
df_2022.count()

In [0]:
# def node_selection_f(node_list):
#   """This function going to select the second node from all node (The most common one)"""  
#   if len(node_list) < 2:
#     x= node_list[0]
#   else:
#     x= node_list[1]
#   return x

# #The UDF to apply it on a pyspark dataframe
# node_selection = udf(node_selection_f, T.LongType())
# def parse_embedding_from_string(x):
#     res = json.loads(x)
#     return res

# #LongType no Integer type.... 
# retrieve_embedding = udf(parse_embedding_from_string, T.ArrayType(T.LongType()))

In [0]:
def node_selection_f(node_list):
  """Return the last node of `node_list`, or None when there is nothing to pick.

  The previous loop-based implementation overwrote its result on every
  iteration, so it always ended up returning the final element (not the
  second one, as its docstring claimed) and raised UnboundLocalError on an
  empty list. This version keeps that effective result and makes the
  empty/None case explicit.
  NOTE(review): the commented-out earlier version returned the second node;
  confirm which node is actually wanted before changing the selection.

  Args:
    node_list: Sequence of node ids; may be None or empty.

  Returns:
    The last element of `node_list`, or None if it is None or empty.
  """
  if not node_list:
    return None
  return node_list[-1]

# Spark UDF wrapper around `node_selection_f`, declared to return a long.
node_selection = udf(node_selection_f, returnType=T.LongType())
def parse_embedding_from_string(x):
    """Decode a JSON-encoded string into the corresponding Python object.

    Args:
        x: JSON text, or None.

    Returns:
        The decoded value, or None when `x` is None.

    Raises:
        json.JSONDecodeError: if `x` is not valid JSON.
    """
    # Imported locally: `json` is not imported anywhere at the top of the
    # notebook, so the original body failed with NameError when called.
    import json

    if x is None:
        return None
    return json.loads(x)

# Elements are declared as LongType rather than IntegerType.
retrieve_embedding = udf(
    parse_embedding_from_string,
    returnType=T.ArrayType(T.LongType()),
)

In [0]:
# Keep only the road and link-road `highway` classes listed below.
df_2022 = df_2022.filter(
    col("highway").isin(
        "motorway",
        "trunk",
        "primary",
        "secondary",
        "tertiary",
        "residential",
        "unclassified",
        "motorway_link",
        "trunk_link",
        "primary_link",
        "secondary_link",
        "tertiary_link",
    )
)

In [0]:
# Add `nodeId`: the node chosen by the `node_selection` UDF from each row's `allnodes`.
df_2022 = df_2022.withColumn(
    "nodeId",
    node_selection(col("allnodes")),
)

In [0]:
# Inspect column names and types after adding `nodeId`.
df_2022.dtypes

In [0]:
# Retrieve the node ids and way ids.
def get_ways(year, spark):
  """Read the `france-<yy>0101` OSM ways parquet file for `year`.

  Args:
    year: Year as a string; everything after its first two characters goes
      into the file name (e.g. "2022" -> "22").
    spark: Object exposing `read.parquet` (the Spark session).

  Returns:
    Whatever `spark.read.parquet` returns for the computed path.
  """
  year_suffix = year[2:]
  path = "".join(("/mnt/externaldata/france-", year_suffix, "0101.osm.pbf.way.parquet"))
  return spark.read.parquet(path)
# Load the 2022 ways, then produce one row per (way, node) pair.
df_ways = get_ways("2022", spark)
df_with_way = df_ways.select(
    "id",
    explode("nodes").alias("nodes"),
)
# Flatten the exploded `nodes` entry into its `nodeId` and `index` fields.
df_with_nodes = df_with_way.select(
    col("id").alias("id_way"),
    col("nodes")["nodeId"].alias("nodeId"),
    col("nodes")["index"].alias("index"),
)

In [0]:
# Add the intersection flag: count the distinct ways each node belongs to.
df_nodes_count = (
    df_with_nodes
    .groupby(col("nodeId"))
    .agg(count_distinct("id_way").alias("count_way"))
)
# A node is an intersection when it is shared by MORE than one way. The
# previous condition (`count_way <= 1`) flagged exactly the opposite set, so
# downstream logic that reassigns the way on `node_intersection` rows was
# applied to non-intersection nodes.
df_nodes_count = df_nodes_count.withColumn(
    "node_intersection",
    col("count_way") > 1,
)

In [0]:
# Number of distinct nodes found in the ways extract.
df_nodes_count.count()

In [0]:
# Join to obtain the three columns nodeId, id_way and node_intersection.
# `df_nodes_count` is derived from `df_with_nodes`, so joining on the two
# DataFrames' `nodeId` column objects (and dropping one of them afterwards)
# relies on Spark resolving ambiguous self-join references. Joining on the
# column NAME keeps a single, unambiguous `nodeId` column.
# Resulting column order: nodeId, id_way, node_intersection.
data_node_way_count = (
    df_with_nodes
    .join(
        df_nodes_count.select("nodeId", "node_intersection"),
        on="nodeId",
        how="left",
    )
    .drop("index")
)

In [0]:
#data_node_way_count.display()

In [0]:
# Final join: attach `id_way` and `node_intersection` to every 2022 record
# through its `nodeId`; the right-hand `nodeId` copy is dropped afterwards.
data_final_node = (
    df_2022
    .join(
        data_node_way_count.select("nodeId", "id_way", "node_intersection"),
        on=(df_2022.nodeId == data_node_way_count.nodeId),
        how="left",
    )
    .drop(data_node_way_count.nodeId)
)

In [0]:
# Row count after the left join (one row per matching way for each record).
data_final_node.count()

In [0]:
# Number of distinct nodes present in the joined data.
data_final_node.select("nodeId").distinct().count()

In [0]:
# Records whose node matched no way (null `id_way` after the left join).
id_null = data_final_node.where(col("id_way").isNull())
id_null.count()

In [0]:
# Percentage of distinct nodes that have at least one record without a way.
(
    id_null.select("nodeId").distinct().count()
    / data_final_node.select("nodeId").distinct().count()
) * 100

In [0]:
# Drop the nodes that do not belong to any way: about 0.43% of the nodes (717869),
# i.e. 3108 nodes without an id_way.
# Restrict the null check to `id_way`: a bare dropna() would also discard rows
# that merely have a null in some unrelated column.
data_final_node = data_final_node.dropna(subset=["id_way"])

In [0]:
# Persist the joined data as parquet, replacing any previous output.
# (`header` is a CSV option and has no effect on parquet, so it is not set.)
data_final_node.write.mode('overwrite').parquet('/mnt/datalake/tmp/amani/data_node_way/way_2022')

### data

In [0]:
# Reload the persisted node/way join, sorted by `dateentry`.
data = (
    spark.read
    .parquet('/mnt/datalake/tmp/amani/data_node_way/way_2022')
    .orderBy("dateentry")
)

In [0]:
# Number of rows reloaded from the persisted join.
data.count()

In [0]:
# Keys (deviceid, rideId, dateentry) that appear on more than one row.
duplicate = (
    data
    .groupBy("deviceid", "rideId", "dateentry")
    .count()
    .where(col("count") > 1)
)

In [0]:
# Number of (deviceid, rideId, dateentry) keys that occur more than once.
duplicate.count()

In [0]:
# Largest number of rows sharing a single key.
# `max` here is the Spark column function brought in by the wildcard import, not the builtin.
duplicate.select(max("count")).display()

In [0]:
# Tag every duplicated key, then bring the tag back onto the full data set.
# Rows whose key is not duplicated get a null `duplicate` from the left join.
duplicate = duplicate.withColumn("duplicate", lit(True))
join_date = data.join(
    duplicate.select("deviceid", "rideId", "dateentry", "duplicate"),
    on=["deviceid", "rideId", "dateentry"],
    how="left",
)

In [0]:
# Turn the null `duplicate` values left by the join into an explicit False.
join_data = join_date.withColumn(
    "duplicate",
    coalesce(col("duplicate"), lit(False)),
)

In [0]:
# Inspect the data with the boolean `duplicate` flag.
join_data.display()

In [0]:
# `true_way` keeps `id_way` only for rows whose key is unique; rows with a
# duplicated key get null so they can be filled from neighbouring rows later.
join_data = join_data.withColumn(
    "true_way",
    when(col("duplicate") == True, lit(None)).otherwise(col("id_way")),
)

In [0]:
import sys
# Forward fill: for each ride, in ascending `dateentry` order, take the most
# recent non-null `true_way` seen up to and including the current row.
# The frame uses Spark's named boundaries instead of the magic values
# -sys.maxsize / 0.
w = (
    Window.partitionBy("rideId")
    .orderBy(col("dateentry").asc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
join_data_2 = join_data.withColumn(
    "true_way_2",
    last("true_way", ignorenulls=True).over(w),
)

In [0]:
# Inspect the forward-filled `true_way_2` column.
join_data_2.display()

In [0]:
# Backward fill: same window logic over descending `dateentry`, so rows that
# are still null after the forward fill take the next non-null way of the ride.
# The frame uses Spark's named boundaries instead of the magic values
# -sys.maxsize / 0.
w = (
    Window.partitionBy("rideId")
    .orderBy(col("dateentry").desc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
join_data_3 = join_data_2.withColumn(
    "true_way_3",
    last("true_way_2", ignorenulls=True).over(w),
)

In [0]:
# One sample ride, in ascending `dateentry` order, used for spot checks.
ride1 = (
    join_data_3
    .where(col("rideid") == "0002b29b-67d6-46cc-9e5f-9a58d608bdd2_unknown_1620334414#33553055")
    .orderBy(col("dateentry").asc())
)

In [0]:
# Keys of the sample ride that occur on more than one row.
ride1.groupBy("deviceid","rideId","dateentry").count().filter(col("count")>1).display()

In [0]:
# Same sample ride before the backward fill (only `true_way_2` is available here).
join_data_2.filter(col("rideid")=="0002b29b-67d6-46cc-9e5f-9a58d608bdd2_unknown_1620334414#33553055").orderBy(col("dateentry").asc()).display()

In [0]:
# Spot check of another ride after both fills.
join_data_3.filter(col("rideId")=="01759867-7507-4e9b-9f38-66bcb1d9b99f_400CA7AC-B653-42B1-A89C-66F6C3F4AB0C#62026869").display()

In [0]:
# Rows still without a way after both fills, counted per ride.
df_null = join_data_3.where(col("true_way_3").isNull())
df_null.groupBy(col("rideId")).count().display()

In [0]:
# Distribution of `true_way_3` values within one specific ride.
rideid_1 = join_data_3.where(
    col("rideId") == "7a86ecf5-6909-4104-9b53-c70b6373c6ef_03BB6063-1C07-4450-8B5B-228D2020A051#ANONYMOUSTOKEN2934"
)
rideid_1.groupBy(col("true_way_3")).count().display()

In [0]:
# Spot check of a single ride (same ride id as an earlier check cell).
join_data_3.filter(col("rideId")=="01759867-7507-4e9b-9f38-66bcb1d9b99f_400CA7AC-B653-42B1-A89C-66F6C3F4AB0C#62026869").display()

In [0]:
# Spot check of a single ride.
join_data_3.filter(col("rideId")=="039569e3-5cf5-4a9d-baac-d0945cd70877_2014DFDC-3786-4BF3-A5B2-09ED0DAC0DDB#89387561").display()

In [0]:
# Spot check of a single ride.
join_data_3.filter(col("rideId") == "039a4712-4505-4903-a312-1f9739a65b7a_60D1ACC8-AFA1-4F89-9F62-D6F4D66FEEC2#ANONYMOUSTOKEN4369").display()

In [0]:
#Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()

In [0]:
# Keep a single row per (deviceid, rideId, dateentry) key.
# NOTE(review): nothing here controls which of the duplicated rows is kept — confirm that is acceptable.
data_drop_duplicate=join_data_3.dropDuplicates(["deviceid","rideId","dateentry"])

In [0]:
# Row count after de-duplication.
data_drop_duplicate.count()

In [0]:
# Remove the intermediate fill columns; only `true_way_3` is used from here on.
data_drop_duplicate=data_drop_duplicate.drop("true_way","true_way_2")

In [0]:
# Number of distinct (deviceid, rideId, dateentry) keys; compare with the row count above.
data_drop_duplicate.groupBy("deviceid","rideId","dateentry").count().count()

In [0]:

# Rows that still have no way id after both fills.
data_drop_duplicate.filter(col("true_way_3").isNull()).count()

In [0]:
# Per-ride window in ascending `dateentry` order.
# The window was previously assigned to `true_way_3windows` while the next
# statement referenced `windows`, which raised NameError on a fresh kernel.
windows = Window.partitionBy("rideid").orderBy(col("dateentry").asc())
# `lag_wayid` is the `true_way_3` of the previous row of the same ride
# (null for the first row of each ride).
df_lag = data_drop_duplicate.withColumn(
    "lag_wayid",
    lag("true_way_3").over(windows),
)
             

In [0]:
#df_lag=df_lag.withColumn("nodeId2",when(df_lag.node_intersection==True,df_lag.lag_node_end).otherwise(df_lag.nodeId))
# Rows flagged `node_intersection` take the previous row's way (`lag_wayid`);
# every other row keeps its own `true_way_3`.
df_lag = df_lag.withColumn(
    "id_way2",
    when(col("node_intersection") == True, col("lag_wayid")).otherwise(col("true_way_3")),
)

In [0]:
# Row count after adding `lag_wayid` and `id_way2`.
df_lag.count()

In [0]:
# Spot check of one ride, in ascending `dateentry` order.
df_lag.filter(col("rideid")=="34d013fb-6c02-4afd-a443-61cbd66ae12d_FB987EAF-AD5A-4840-B8CB-CCD56D1A0797#04065471").orderBy(col("dateentry").asc()).display()

In [0]:
# Number of distinct reassigned ways (`id_way2`) observed for each node.
df_count = (
    df_lag
    .groupBy(col("nodeId"))
    .agg(count_distinct("id_way2").alias("count_way"))
)

In [0]:
# Highest number of distinct ways attached to a single node.
# `max` here is the Spark column function brought in by the wildcard import, not the builtin.
df_count.select(max("count_way")).display()

In [0]:
#df_count.filter(col("count_way")==42).display()

In [0]:
#ride1=df_lag.filter(col("rideId")=="34d013fb-6c02-4afd-a443-61cbd66ae12d_FB987EAF-AD5A-4840-B8CB-CCD56D1A0797#04065471").orderBy(col("dateentry").asc())

In [0]:
#ride1.select("nodeId","node_intersection","id_way","id_way2").display()

In [0]:
# Number of distinct `id_way2` values.
df_lag.select(col("id_way2")).distinct().count()

In [0]:
# Gather, for every `id_way2`, the list of node ids observed on it.
df_way = (
    df_lag
    .groupBy("id_way2")
    .agg(collect_list("nodeId").alias("list_nodes"))
)

In [0]:
# Give each way row a generated id in the `tronçon_physique` column.
# NOTE(review): monotonically_increasing_id() values are presumably not stable across runs — confirm nothing persists them.
df_way=df_way.withColumn("tronçon_physique", monotonically_increasing_id()) 

In [0]:
# Inspect the way -> node list table with its generated ids.
df_way.display()

In [0]:
# Final join: attach `list_nodes` and `tronçon_physique` to every record.
# `df_way` is an aggregate of `df_lag`, so joining on the two DataFrames'
# `id_way2` column objects (and dropping one of them afterwards) relies on
# Spark resolving ambiguous self-join references. Joining on the column NAME
# keeps a single `id_way2` column, taken from `df_lag` (it becomes the first column).
data_tronçons = df_lag.join(df_way, on="id_way2", how="left")

In [0]:
# Inspect the records enriched with their segment id.
data_tronçons.display()

In [0]:
# Spot check of one ride, in ascending `dateentry` order.
data_tronçons.filter(col("rideId")=="34d013fb-6c02-4afd-a443-61cbd66ae12d_FB987EAF-AD5A-4840-B8CB-CCD56D1A0797#04065471").orderBy(col("dateentry").asc()).display()

In [0]:
# One row per (ride, device, segment) with its start/end point and mean
# speed / acceleration.
# first()/last() inside a groupby depend on the order in which rows reach the
# aggregation, which Spark does not guarantee, so the start and end points were
# not reliably the chronological ones. Taking the min / max of a struct whose
# first field is `dateentry` selects the earliest / latest point explicitly.
# `min` / `max` are the Spark column functions from the wildcard import.
ride_point = struct("dateentry", "nodeId", "lat", "lng")
data_final = (
    data_tronçons
    .groupby("rideId", "deviceid", "tronçon_physique")
    .agg(
        min(ride_point).alias("start_point"),
        max(ride_point).alias("end_point"),
        avg("speed").alias("vitesse_moy"),
        avg("acceleration").alias("acceleration_moy"),
    )
    # Unpack the structs, keeping the original output column names and order.
    .select(
        "rideId",
        "deviceid",
        "tronçon_physique",
        col("start_point.nodeId").alias("node_start"),
        col("end_point.nodeId").alias("node_end"),
        col("start_point.lat").alias("lat_start"),
        col("start_point.lng").alias("lng_start"),
        col("end_point.lat").alias("lat_end"),
        col("end_point.lng").alias("lng_end"),
        "vitesse_moy",
        "acceleration_moy",
    )
)

In [0]:
# Inspect the per-ride, per-segment summary.
data_final.display()