In [1]:
# DECLARATIONS #
################

import pandas as _pd
import dask.bag as _DaskBag
import dask.dataframe as _DaskDataFrame

import json

import geopandas as _gpd

from shapely.geometry import Polygon
from shapely.geometry import Point
from math import radians, cos, sin, asin, sqrt

import math
import numpy as _np

from datetime import datetime
import time


import folium
from folium.plugins import HeatMap

import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
# RELOAD DATA PREVIOUSLY COMPUTED #
###################################

# 2024-01-06 - Cell execution time : 12.879 seconds

start_time = time.time()
print ("Current Time :", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Reload the Pandas Dataframe
drivers_gps_pandas_df = _pd.read_parquet('drivers_gps_pandas_df.parquet')
drivers_pandas_df = _pd.read_parquet('drivers_pandas_df.parquet')
zones_pandas_df = _pd.read_csv('zones_pandas_df.csv')

# Reload and sort the Dask Dataframe
drivers_gps_dask_df = _DaskDataFrame.from_pandas(drivers_gps_pandas_df, npartitions=4)
drivers_gps_dask_df = drivers_gps_dask_df.sort_values(by=['driver', 'timestamp'])
drivers_dask_df = _DaskDataFrame.from_pandas(drivers_pandas_df, npartitions=4)
drivers_dask_df = drivers_dask_df.sort_values(by=['driver'])
zones_dask_df = _DaskDataFrame.from_pandas(zones_pandas_df, npartitions=4)

# Refresh zones_geodataframe
zones_dask_df = zones_dask_df.compute()
#zones_geodataframe = _gpd.GeoDataFrame(zones_dask_df, geometry='geometry')
#zones_geodataframe.set_crs(epsg=4326, inplace=True)

print ("Current Time :", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
elapsed_time = round(time.time() - start_time, 3)
print(f"\nCell execution time : {elapsed_time} seconds")  

Current Time : 2024-01-19 15:28:57
Current Time : 2024-01-19 15:29:09

Cell execution time : 11.555 seconds


In [7]:
drivers_dask_df.head() 

Unnamed: 0,driver,first_date,last_date,mean_speed,total_distance,favorite_zone_by_time,favorite_zone_by_values,gps_count
0,000aa4d2,2017-08-31 14:50:31+00:00,2017-08-31 16:59:53+00:00,15.602668,30.887439,22,22,286
1,001b6172,2017-08-31 15:32:06+00:00,2017-08-31 16:59:41+00:00,15.268061,20.722036,22,22,220
2,00219be2,2017-08-31 16:21:42+00:00,2017-08-31 16:59:54+00:00,19.230185,12.496524,14,14,53
3,0022cc1b,2017-08-31 16:23:46+00:00,2017-08-31 16:51:00+00:00,10.560539,4.380521,22,22,62
4,0022f5ae,2017-08-31 12:00:16+00:00,2017-08-31 16:59:39+00:00,45.675928,40.787504,22,22,395


In [9]:
# Tri du DataFrame Dask par la colonne 'gps_count'
sorted_zones_dask_df = zones_dask_df.sort_values(by='gps_count', ascending=False)

# Afficher les premières lignes du DataFrame trié
print(sorted_zones_dask_df.head(10))


                                              polygon  is_valid  zone_id  \
21  [(-77.10249019404858, -12.044106631207434), (-...      True       22   
20  [(-77.01038495147722, -12.039787447923926), (-...      True       21   
13  [(-77.01038495147722, -12.129320541365702), (-...      True       14   
24  [(-77.10249019404858, -11.954619512552217), (-...      True       25   
12  [(-77.10249019404858, -12.133669908128265), (-...      True       13   
19  [(-77.01038495147722, -11.95033052922269), (-7...      True       20   
22  [(-77.19452474207226, -12.048392180939453), (-...      True       23   
23  [(-77.19452474207226, -11.95887509948719), (-7...      True       24   
25  [(-77.10249019404858, -11.86520920024759), (-7...      True       26   
14  [(-77.01038495147722, -12.218929159905016), (-...      True       15   

                                             geometry  gps_count  \
21  POLYGON ((-77.10249019404858 -12.0441066312074...    1956633   
20  POLYGON ((-77.010384951

In [10]:
# Tri du DataFrame Dask par la colonne 'gps_count'
sorted_zones_dask_df = zones_dask_df.sort_values(by='unique_drivers', ascending=False)

# Afficher les premières lignes du DataFrame trié
print(sorted_zones_dask_df.head(10))

                                              polygon  is_valid  zone_id  \
21  [(-77.10249019404858, -12.044106631207434), (-...      True       22   
20  [(-77.01038495147722, -12.039787447923926), (-...      True       21   
13  [(-77.01038495147722, -12.129320541365702), (-...      True       14   
24  [(-77.10249019404858, -11.954619512552217), (-...      True       25   
12  [(-77.10249019404858, -12.133669908128265), (-...      True       13   
19  [(-77.01038495147722, -11.95033052922269), (-7...      True       20   
22  [(-77.19452474207226, -12.048392180939453), (-...      True       23   
23  [(-77.19452474207226, -11.95887509948719), (-7...      True       24   
25  [(-77.10249019404858, -11.86520920024759), (-7...      True       26   
14  [(-77.01038495147722, -12.218929159905016), (-...      True       15   

                                             geometry  gps_count  \
21  POLYGON ((-77.10249019404858 -12.0441066312074...    1956633   
20  POLYGON ((-77.010384951

In [5]:
zones_dask_df

Unnamed: 0,polygon,is_valid,zone_id,geometry,gps_count,unique_drivers
0,"[(-77.28648789023627, -11.784234676000215), (-...",True,1,POLYGON ((-77.28648789023627 -11.7842346760002...,0,0
1,"[(-76.82596521434296, -12.479157220410556), (-...",True,2,POLYGON ((-76.82596521434296 -12.4791572204105...,22,3
2,"[(-76.64127121444726, -12.201069396462607), (-...",True,3,POLYGON ((-76.64127121444726 -12.2010693964626...,0,0
3,"[(-76.7336521407365, -12.205585634694005), (-7...",True,4,POLYGON ((-76.7336521407365 -12.20558563469400...,0,0
4,"[(-76.7336521407365, -12.116068939612953), (-7...",True,5,POLYGON ((-76.7336521407365 -12.11606893961295...,0,0
5,"[(-76.82596521434296, -12.120520083794023), (-...",True,6,POLYGON ((-76.82596521434296 -12.1205200837940...,0,0
6,"[(-76.82596521434296, -12.210067657646714), (-...",True,7,POLYGON ((-76.82596521434296 -12.2100676576467...,16,1
7,"[(-76.82596521434296, -12.299690140508726), (-...",True,8,POLYGON ((-76.82596521434296 -12.2996901405087...,749,12
8,"[(-76.7336521407365, -12.295177256777961), (-7...",True,9,POLYGON ((-76.7336521407365 -12.29517725677796...,0,0
9,"[(-76.7336521407365, -12.384843153818448), (-7...",True,10,POLYGON ((-76.7336521407365 -12.38484315381844...,0,0


In [None]:
# CREATE A HEAT MAP #
#####################

# 2024-01-15 - Cell execution time : 12.879 seconds

start_time = time.time()
print ("Current Time :", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

mean_lat, mean_lon = drivers_gps_pandas_df['latitude'].mean(), drivers_gps_pandas_df['longitude'].mean()
m = folium.Map(location=[mean_lat, mean_lon], zoom_start=12)

HeatMap(data=drivers_gps_pandas_df[['latitude', 'longitude']], radius=10).add_to(m)

m

In [None]:
print ("Current Time :", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
elapsed_time = round(time.time() - start_time, 3)
print(f"\nCell execution time : {elapsed_time} seconds")

In [None]:
plt.figure(figsize=(10, 6))
sns.histplot(drivers_pandas_df['mean_speed'], bins=30, kde=True)
plt.title('Distribution des Vitesses Moyennes')
plt.xlabel('Vitesse Moyenne (km/h)')
plt.ylabel('Nombre de Conducteurs')
plt.show()

plt.figure(figsize=(10, 6))
sns.histplot(drivers_pandas_df['total_distance'], bins=30, kde=True)
plt.title('Distribution des Distances Totales Parcourues')
plt.xlabel('Distance Totale (km)')
plt.ylabel('Nombre de Conducteurs')
plt.show()

In [None]:
plt.figure(figsize=(10, 6))
sns.histplot(drivers_pandas_df['gps_count'], bins=30, kde=True)
plt.title('Distribution des Comptes GPS')
plt.xlabel('Nombre d\'Enregistrements GPS')
plt.ylabel('Nombre de Conducteurs')
plt.show()


plt.figure(figsize=(10, 6))
sns.scatterplot(x='mean_speed', y='total_distance', data=drivers_pandas_df)
plt.title('Link Speed / Total Distance')
plt.xlabel('Mean Speed (km/h)')
plt.ylabel('Total Distance (km)')
plt.show()


In [None]:
plt.figure(figsize=(12, 6))
sns.countplot(data=drivers_pandas_df, x='favorite_zone_by_time')
plt.title('Zones Favorites des Conducteurs par Temps Passé')
plt.xlabel('Zone ID')
plt.ylabel('Nombre de Conducteurs')
plt.xticks(rotation=45)
plt.show()

plt.figure(figsize=(12, 6))
sns.countplot(data=drivers_pandas_df, x='favorite_zone_by_values')
plt.title('Zones Favorites des Conducteurs par Nombre d\'Enregistrements')
plt.xlabel('Zone ID')
plt.ylabel('Nombre de Conducteurs')
plt.xticks(rotation=45)
plt.show()

In [None]:
plt.figure(figsize=(12, 6))
sns.barplot(data=zones_pandas_df, x='zone_id', y='gps_count')
plt.title('Nombre de Lignes GPS par Zone')
plt.xlabel('Zone ID')
plt.ylabel('Nombre de Lignes GPS')
plt.xticks(rotation=45)
plt.show()

plt.figure(figsize=(12, 6))
sns.barplot(data=zones_pandas_df, x='zone_id', y='unique_drivers')
plt.title('Nombre de Conducteurs Uniques par Zone')
plt.xlabel('Zone ID')
plt.ylabel('Nombre de Conducteurs Uniques')
plt.xticks(rotation=45)
plt.show()

In [None]:
#zones_dask_df['geometry'] = zones_dask_df['polygon'].apply(lambda x: Polygon(x))
zones_geodataframe = _gpd.GeoDataFrame(zones_pandas_df, geometry='geometry')
zones_geodataframe.set_crs(epsg=4326, inplace=True)

In [None]:
zones_geodataframe

In [None]:
def get_color_and_weight(unique_drivers):
    max_drivers = 10000

    if unique_drivers < 10:
        return '#d3d3d3', 1  # Gris pour aucune utilisation
    elif unique_drivers < 500:
        return '#ffcccc', 2  
    elif unique_drivers < 2500:
        return '#ff9999', 3    
    elif unique_drivers < 6000:
        return '#ff6666', 4   
    elif unique_drivers >= max_drivers:
        color = '#ff0000'  # Rouge foncé
        weight = 5
    else:
        normalized_value = unique_drivers / max_drivers
        red = int(255 * normalized_value)
        color = f'#{red:02x}3333'
        weight = 2 + 3 * normalized_value

    return color, weight

# Compute centroid to center the map
zones_geodataframe['centroid'] = zones_geodataframe['geometry'].centroid
mean_latitude = zones_geodataframe['centroid'].y.mean()
mean_longitude = zones_geodataframe['centroid'].x.mean()

oMap = folium.Map(location=[mean_latitude, mean_longitude], zoom_start=12)

for _, row in zones_geodataframe.iterrows():
    color, weight = get_color_and_weight(row['unique_drivers'])
    simpl_geo = row['geometry'].simplify(tolerance=0.001, preserve_topology=True)

    def style_function(x, color=color, weight=weight):
        return {'color': color, 'weight': weight}

    folium.GeoJson(simpl_geo, style_function=style_function).add_to(oMap)

    centroid = row['geometry'].centroid
    folium.Marker(
        [centroid.y, centroid.x],
        icon=folium.DivIcon(
            icon_size=(150,36),
            icon_anchor=(7,20),
            html=f'<div style="font-size: 12pt; color : black">{row["zone_id"]}</div>'
        )
    ).add_to(oMap)

oMap

In [None]:
import folium

m = folium.Map(location=[mean_latitude, mean_longitude], zoom_start=9.5) 

max_drivers = zones_geodataframe['unique_drivers'].max()
zones_geodataframe['normalized_drivers'] = zones_geodataframe['unique_drivers'] / max_drivers


def get_color(normalized_value):
    return f'#{int(normalized_value * 255):02x}0000'  

for _, row in zones_geodataframe.iterrows():
    folium.Polygon(
        locations=[(lon, lat) for lat, lon in row['geometry'].exterior.coords],
        color=get_color(row['normalized_drivers']),
        fill=True,
        fill_color=get_color(row['normalized_drivers']),
        fill_opacity=0.7
    ).add_to(m)

m

In [None]:
drivers_gps_pandas_df['hour'] = drivers_gps_pandas_df['timestamp'].dt.hour

plt.figure(figsize=(10, 6))
plt.hist(drivers_gps_pandas_df['hour'], bins=24, range=(8, 21), edgecolor='black')
plt.title('Répartition des Données GPS par Heure')
plt.xlabel('Heure du Jour')
plt.ylabel('Nombre de Points GPS')
plt.xticks(range(0, 25))
plt.grid(axis='y', alpha=0.75)
plt.show()

In [11]:
pip install sedona-spark-shaded

Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.



ERROR: Could not find a version that satisfies the requirement sedona-spark-shaded (from versions: none)
ERROR: No matching distribution found for sedona-spark-shaded


In [12]:
pip install apache-sedona[spark]

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [1]:
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.register import SedonaRegistrator
from sedona.core.SpatialRDD import PointRDD, PolygonRDD
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.formatMapper import WktReader
from sedona.sql.types import GeometryType


In [2]:
# LET'S PLAY WITH SPARX... #
############################

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from pyspark.sql.functions import col
#from sedona.core.SedonaContext import SedonaContext


spark = SparkSession.builder \
    .appName("GeoSpatialSpark") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.1-incubating,org.apache.sedona:sedona-core-3.0_2.12:1.0.1-incubating") \
    .getOrCreate()


sedona_context = SedonaContext(spark)


SedonaRegistrator.registerAll(spark)




Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:1139)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:1125)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:489)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1795)
	at org.apache.spark.SparkContext.$anonfun$new$16(SparkContext.scala:533)
	at org.apache.spark.SparkContext.$anonfun$new$16$adapted(SparkContext.scala:533)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:533)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
	at java.lang.reflect.Constructor.newInstance(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [15]:
# Charger les données conducteurs et zones dans des DataFrames Spark
drivers_df = spark.read.csv("drivers_gps_pandas_df.csv", header=True, inferSchema=True)
zones_df = spark.read.csv("zones_pandas_df.csv", header=True, inferSchema=True)

# Supposons que zones_df ait des colonnes 'zone_id', 'latitude', 'longitude'


NameError: name 'spark' is not defined

In [16]:
from sedona.utils import KryoSerializer, SedonaKryoRegistrator
from sedona.core.SpatialRDD import PointRDD, PolygonRDD
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.formatMapper import WktReader
from sedona.sql.types import GeometryType
from sedona.register import SedonaRegistrator
from sedona.core.spatialOperator import KNNQuery
from sedona.core.enums import FileDataSplitter, IndexType
from sedona.core.spatialOperator import JoinQuery
from sedona.core.formatMapper import GeoJsonReader

# Convertir les colonnes de coordonnées en géométries Point pour les conducteurs
drivers_df = drivers_df.withColumn("geometry", expr("ST_Point(CAST(longitude AS Decimal(24,20)), CAST(latitude AS Decimal(24,20)))"))

# Supposons que zones_df ait déjà une colonne 'geometry' de type Polygon
# Si ce n'est pas le cas, vous devrez la créer à partir de vos données

# Jointure spatiale
joined_df = drivers_df.join(zones_df, expr("ST_Within(drivers_df.geometry, zones_df.geometry)"))

# Sélectionner et renommer les colonnes nécessaires
result_df = joined_df.select(col("drivers_df.*"), col("zones_df.zone_id").alias("zone_id"))

# Afficher le résultat
result_df.show()


NameError: name 'drivers_df' is not defined