Dans ce TP, nous considérons des trajets en vélo partagé (similaire au vélib) en Californie. Deux jeux de données sont fournis : l'un qui contient les stations de vélo, l'autre, les trajets à vélo. Les déplacements à vélo se font d'une station à l'autre.

Charger le fichier stationData.csv dans un DataFrame station_df et le fichier tripData.csv dans un DataFrame trip_df. Pour chaque Dataframe, il vous faudra demander une inférence des schémas et indiquer que la première ligne est un en-tête.

In [0]:
spark

In [0]:
station_df = spark.read.option(
        "header", "true"
    ).option(
        "inferSchema", "true"
    ).csv(
        "/FileStore/tables/stationData.csv"
    )

trip_df = spark.read.option(
        "header", "true"
    ).option(
        "inferSchema", "true"
    ).csv(
        "/FileStore/tables/tripData.csv"
    )

Afficher les schémas des 2 DataFrames.

In [0]:
station_df.printSchema()
trip_df.printSchema()

root
 |-- station_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: integer (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: date (nullable = true)

root
 |-- TripID: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- StartStation: string (nullable = true)
 |-- StartTerminal: integer (nullable = true)
 |-- EndDate: string (nullable = true)
 |-- EndStation: string (nullable = true)
 |-- EndTerminal: integer (nullable = true)
 |-- Bike#: integer (nullable = true)
 |-- SubscriberType: string (nullable = true)
 |-- ZipCode: string (nullable = true)



In [0]:
# Votre code ici

Créer une vue pour chaque DataFrame.

In [0]:
station_df.createOrReplaceTempView("station_view")
trip_df.createOrReplaceTempView("trip_view")

Trouver deux façons de calculer le nombre de trajets, l'une en appelant une méthode sur trip_df directement, l'autre en rédigeant une requête SQL de la vue correspondant au DataFrame tripData.

In [0]:
print(trip_df.count())
spark.sql("SELECT count(*) FROM trip_view").show()

354152
+--------+
|count(1)|
+--------+
|  354152|
+--------+



Ecrire une requête permettant de compter le nombre de trajets qui démarrent et se terminent à la même station.

In [0]:
spark.sql("SELECT count(*) FROM trip_view WHERE StartStation=EndStation").show()

+--------+
|count(1)|
+--------+
|   10276|
+--------+



On souhaite désormais obtenir l’id des stations associées à ces trajets. Ecrire une requête renvoyant la liste des terminaux concernés ainsi que le nombre de trajets pour chacun de ces terminaux. Trier le résultat par ordre décroissant de nombre de trajets.
<br>Exemple de sortie :
<br>+--------+--------+
<br>|terminal|count(1)|
<br>+--------+--------+
<br>| 60| 850|
<br>| 50| 708|
<br>| 35| 348|
<br>| 76| 320|
<br>| 74| 307|
<br>(La station 60 est la plus concernée par ces trajets cycliques, avec 850 de ces trajets.)

In [0]:
spark.sql(
  """
  SELECT count(*), StartTerminal as terminal
  FROM trip_view
  WHERE StartStation=EndStation
  GROUP BY StartTerminal
  ORDER BY count(*) DESC
  """
).show()

+--------+--------+
|count(1)|terminal|
+--------+--------+
|     850|      60|
|     708|      50|
|     348|      35|
|     320|      76|
|     307|      74|
|     296|      39|
|     280|      61|
|     277|      67|
|     268|      71|
|     260|      70|
|     254|      28|
|     248|      48|
|     230|      54|
|     227|      69|
|     213|      42|
|     200|      73|
|     197|      57|
|     194|      64|
|     189|       3|
|     181|      72|
+--------+--------+
only showing top 20 rows



Dans la requête précédente, nous avons oublié un élément qui nous importe. Nous souhaitons compléter le résultat en indiquant le nombre de docks (dockcount) des stations concernées.
<br>Exemple de sortie :
<br>+--------+---------+--------+
<br>|terminal|dockcount|count(1)|
<br>+--------+---------+--------+
<br>| 60| 15| 850|
<br>| 50| 23| 708|
<br>| 35| 11| 348|
<br>| 76| 19| 320|
<br>| 74| 23| 307|
<br>Mettre à jour la requête.

In [0]:
display(station_df)

station_id,name,lat,long,dockcount,landmark,installation
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,2013-08-06
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,2013-08-05
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,2013-08-06
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,2013-08-05
6,San Pedro Square,37.336721,-121.894074,15,San Jose,2013-08-07
7,Paseo de San Antonio,37.333798,-121.886943,15,San Jose,2013-08-07
8,San Salvador at 1st,37.330165,-121.885831,15,San Jose,2013-08-05
9,Japantown,37.348742,-121.894715,15,San Jose,2013-08-05
10,San Jose City Hall,37.337391,-121.886995,15,San Jose,2013-08-06
11,MLK Library,37.335885,-121.88566,19,San Jose,2013-08-06


In [0]:
spark.sql(
  """
  SELECT count(*), StartTerminal as terminal, dockcount
  FROM trip_view, station_view
  WHERE StartStation=EndStation AND StartTerminal=station_id
  GROUP BY StartTerminal, dockcount
  ORDER BY count(*) DESC
  """
).show()

+--------+--------+---------+
|count(1)|terminal|dockcount|
+--------+--------+---------+
|     850|      60|       15|
|     708|      50|       23|
|     348|      35|       11|
|     320|      76|       19|
|     307|      74|       23|
|     296|      39|       19|
|     280|      61|       27|
|     277|      67|       27|
|     268|      71|       19|
|     260|      70|       19|
|     254|      28|       23|
|     248|      48|       15|
|     230|      54|       15|
|     227|      69|       23|
|     213|      42|       15|
|     200|      73|       15|
|     197|      57|       15|
|     194|      64|       15|
|     189|       3|       15|
|     181|      72|       23|
+--------+--------+---------+
only showing top 20 rows



Rédiger les 2 requêtes précédentes avec le DSL de DataFrame.

In [0]:
from pyspark.sql.functions import col, desc

In [0]:
spark.sql(
  """
  SELECT count(*), StartTerminal as terminal
  FROM trip_view
  WHERE StartStation=EndStation
  GROUP BY StartTerminal
  ORDER BY count(*) DESC
  """
).show()

In [0]:
trip_df.filter(
    col("StartStation") == col("EndStation")
).groupBy(
    "StartTerminal"
).count(
).orderBy(
    "count", ascending=False
).show()

+-------------+-----+
|StartTerminal|count|
+-------------+-----+
|           60|  850|
|           50|  708|
|           35|  348|
|           76|  320|
|           74|  307|
|           39|  296|
|           61|  280|
|           67|  277|
|           71|  268|
|           70|  260|
|           28|  254|
|           48|  248|
|           54|  230|
|           69|  227|
|           42|  213|
|           73|  200|
|           57|  197|
|           64|  194|
|            3|  189|
|           72|  181|
+-------------+-----+
only showing top 20 rows



In [0]:
final_count_df = trip_df.filter(
    col("StartStation") == col("EndStation")
).join(
    station_df,
    trip_df["StartTerminal"] == station_df["station_id"],
).groupBy(
    "StartTerminal",
    "dockcount",
).count(
).orderBy(
    "count", ascending=False
)

Observer le plan d’exécution des requêtes (avec df.explain()).

In [0]:
final_count_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Current Plan ==
   Sort [count#1058L DESC NULLS LAST], true, 0
   +- ShuffleQueryStage 2, Statistics(sizeInBytes=12.8 KiB, rowCount=544, isRuntime=true)
      +- Exchange rangepartitioning(count#1058L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=2252]
         +- *(3) Project [StartTerminal#200, dockcount#169, count(1)#1057L AS count#1058L]
            +- *(3) HashAggregate(keys=[StartTerminal#200, dockcount#169], functions=[finalmerge_count(merge count#1063L) AS count(1)#1057L])
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 1, Statistics(sizeInBytes=17.0 KiB, rowCount=544, isRuntime=true)
                     +- Exchange hashpartitioning(StartTerminal#200, dockcount#169, 200), ENSURE_REQUIREMENTS, [plan_id=2216]
                        +- *(2) HashAggregate(keys=[StartTerminal#200, dockcount#169], functions=[merge_count(merge count#1063L) AS count#1063L])
                         

In [0]:
final_count_df.show()
final_count_df.explain()


+-------------+---------+-----+
|StartTerminal|dockcount|count|
+-------------+---------+-----+
|           60|       15|  850|
|           50|       23|  708|
|           35|       11|  348|
|           76|       19|  320|
|           74|       23|  307|
|           39|       19|  296|
|           61|       27|  280|
|           67|       27|  277|
|           71|       19|  268|
|           70|       19|  260|
|           28|       23|  254|
|           48|       15|  248|
|           54|       15|  230|
|           69|       23|  227|
|           42|       15|  213|
|           73|       15|  200|
|           57|       15|  197|
|           64|       15|  194|
|            3|       15|  189|
|           72|       23|  181|
+-------------+---------+-----+
only showing top 20 rows

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   Sort [count#1058L DESC NULLS LAST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 2, Statistics(sizeInBytes=