In [None]:
!pip install pyspark
!pip install graphframes


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=6365f02a7674a3f8c3429d64c7b5e56affd158b3be85b70b139948db1fc13c9f
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m3.4 MB/s[0m eta [

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config('spark.jars.packages', 'graphframes:graphframes:0.8.3-spark3.5-s_2.12').appName("tp5").getOrCreate()


In [None]:
bikeStations = spark.read.option("header","true").csv("/content/station_data.csv")
tripData = spark.read.option("header","true").csv("/content/trip_data.csv")

In [None]:
tripData

DataFrame[Trip ID: string, Duration: string, Start Date: string, Start Station: string, Start Terminal: string, End Date: string, End Station: string, End Terminal: string, Bike #: string, Subscriber Type: string, Zip Code: string]

In [None]:
from pyspark.sql.functions import col

tripData = tripData.withColumn("Duration", col("Duration").cast("int"))

In [None]:
tripData

DataFrame[Trip ID: string, Duration: int, Start Date: string, Start Station: string, Start Terminal: string, End Date: string, End Station: string, End Terminal: string, Bike #: string, Subscriber Type: string, Zip Code: string]

In [None]:
stationVertices = bikeStations.withColumnRenamed("name", "id").distinct()
tripEdges = tripData.withColumnRenamed("Start Station", "src").withColumnRenamed("End Station", "dst")

In [None]:
from graphframes import GraphFrame

stationGraph = GraphFrame(stationVertices, tripEdges)

In [None]:
print("Total Number of Stations: " + str(stationGraph.vertices.count()))
print("Total Number of Trips in Graph: " + str(stationGraph.edges.count()))
print("Total Number of Trips in Original Data: " + str(tripData.count()))

Total Number of Stations: 70
Total Number of Trips in Graph: 99
Total Number of Trips in Original Data: 99


In [None]:
# 5 / Nombre de voyages entre chaque source et destination triés par ordre décroissant
trips_count = stationGraph.edges.groupBy("src", "dst").count().orderBy("count", ascending=False)
trips_count.show()

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th|    4|
|       5th at Howard|San Francisco Cal...|    3|
|San Francisco Cal...|  Powell Street BART|    2|
|     2nd at Townsend|   Market at Sansome|    2|
|     Spear at Folsom|     2nd at Townsend|    2|
|   Market at Sansome|Broadway St at Ba...|    2|
|    Davis at Jackson|Embarcadero at Sa...|    2|
|San Francisco Cal...|   2nd at South Park|    2|
|   Steuart at Market|San Francisco Cal...|    2|
|       Market at 4th|San Francisco Cal...|    2|
|Embarcadero at Fo...|Embarcadero at Sa...|    2|
|Mountain View Cal...|Rengstorff Avenue...|    1|
|    Davis at Jackson|Temporary Transba...|    1|
|       5th at Howard|     Townsend at 7th|    1|
|   Market at Sansome|South Van Ness at...|    1|
|     Beale at Market|Temporary Transba...|    1|
|     2nd at Townsend|Powell at Post (U...|    1|


In [None]:
# 6) Nombre de voyages commençant ou terminant à 'Townsend at 7th' triés par ordre décroissant
townsend_trips = stationGraph.edges.filter("src = 'Townsend at 7th' or dst = 'Townsend at 7th'").groupBy("src", "dst").count().orderBy("count", ascending=False)
townsend_trips.show()

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th|    4|
|       5th at Howard|     Townsend at 7th|    1|
|     Townsend at 7th|     Spear at Folsom|    1|
|     Townsend at 7th|Harry Bridges Pla...|    1|
|     Spear at Folsom|     Townsend at 7th|    1|
|       Howard at 2nd|     Townsend at 7th|    1|
+--------------------+--------------------+-----+



In [None]:
# 7) Sommets qui n'ont jamais été une destination d'un voyage commençant à partir de 'Spear at Folsom'
SpearFolsomDestinations = stationGraph.edges \
    .where("src = 'Spear at Folsom'") \
    .select("dst") \
    .distinct()

otherStations = stationGraph \
    .edges \
    .select("dst") \
    .distinct() \
    .subtract(SpearFolsomDestinations)

# Affichage des résultats
otherStations.show(1000, False)

+---------------------------------------------+
|dst                                          |
+---------------------------------------------+
|Powell at Post (Union Square)                |
|Market at Sansome                            |
|Spear at Folsom                              |
|San Salvador at 1st                          |
|Rengstorff Avenue / California Street        |
|Civic Center BART (7th at Market)            |
|Mountain View City Hall                      |
|Market at 4th                                |
|Temporary Transbay Terminal (Howard at Beale)|
|Steuart at Market                            |
|San Francisco City Hall                      |
|South Van Ness at Market                     |
|Embarcadero at Bryant                        |
|Mountain View Caltrain Station               |
|Powell Street BART                           |
|Clay at Battery                              |
|Market at 10th                               |
|Broadway St at Battery St              

In [None]:
#8 -  Retourner la station qui a le nombre maximum de voyages entrants.

from pyspark.sql.functions import desc

stationGraph.inDegrees.orderBy(desc("inDegree")).limit(1)\
.show(1, False)

+---------------------------------------+--------+
|id                                     |inDegree|
+---------------------------------------+--------+
|San Francisco Caltrain 2 (330 Townsend)|9       |
+---------------------------------------+--------+



In [None]:
# 9- Retourner le voyage qui a la plus grande durée.
stationGraph.edges.orderBy(desc("Duration")).limit(1).show()

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|                 src|Start Terminal|       End Date|                 dst|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913386|    1808|8/31/2015 20:23|Embarcadero at Br...|            54|8/31/2015 20:53|Harry Bridges Pla...|          50|   524|     Subscriber|   94105|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+



In [None]:
# 10 - Créer un sous-graphe qui ne contient que les voyages qui se commencent ou se terminent à ‘Townsend at 7th’.

townAnd7thEdges = stationGraph.edges.where("src = 'Townsend at 7th' OR dst = 'Townsend at 7th'")

subgraph = GraphFrame(stationGraph.vertices, townAnd7thEdges)


In [None]:
subgraph.edges.show()
subgraph.vertices.show()

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|                 src|Start Terminal|       End Date|                 dst|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913434|     283|8/31/2015 21:19|San Francisco Cal...|            69|8/31/2015 21:24|     Townsend at 7th|          65|   521|     Subscriber|   94107|
| 913404|     273|8/31/2015 20:39|San Francisco Cal...|            69|8/31/2015 20:44|     Townsend at 7th|          65|   287|     Subscriber|   94107|
| 913382|     645|8/31/2015 20:20|     Townsend at 7th|            65|8/31/2015 20:31|Harry Bridges Pla...|          50|   270|     Subscriber|   94945|
| 913379|     803|8/31/2015 20:15|     Spear at Folsom|            49|8/31/2015 20

In [None]:
# 11- Retourner tous les chemins qui forment un motif en "triangle" entre trois stations.

motifs = stationGraph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)")
motifs.show(1000, False)

+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|a                                                                                               |ab                                                                                                                                         |b                                                

In [None]:
# 12) Retourner tous les chemins qui passent par trois sommets et qui commencent à partir de 'Townsend at 7th'.

paths = stationGraph.find("(a)-[]->(b); (b)-[]->(c)")

# Filtrage des chemins qui commencent à partir de 'Townsend at 7th'
filtered_paths = paths.filter("a.id = 'Townsend at 7th'")

# Affichage des résultats
filtered_paths.show(truncate=False)

+---------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|a                                                                          |b                                                                                               |c                                                                                                  |
+---------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|{65, Townsend at 7th, 37.771058, -122.402717, 15, San Francisco, 8/22/2013}|{49, Spear at Folsom, 37.790302, -122.390637, 19, San Francisco, 8/20/2013}                     |{