# Importation des librairies et démarrage de la session

In [52]:
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as F
# Reuse the active Spark session if one already exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Importation des données et formatage des colonnes

In [2]:
# Load both CSV datasets (header row, inferred column types).
# "month" and "year" are dropped from trips because they duplicate another column.
df_trips = spark.read.csv(
    "data/Austin bikes/austin_bikeshare_trips.csv", header=True, inferSchema=True
).drop("month", "year")
df_stations = spark.read.csv(
    "data/Austin bikes/austin_bikeshare_stations.csv", header=True, inferSchema=True
)

# Cast the identifier columns to integer (most of them are inferred as double).
for id_col in ("bikeid", "trip_id", "start_station_id", "end_station_id"):
    df_trips = df_trips.withColumn(id_col, F.col(id_col).cast("integer"))

# Replace the raw "start_time" column with a proper timestamp column.
df_trips = (
    df_trips
    .withColumn("start_datetime", F.to_timestamp(F.col("start_time")))
    .drop("start_time")
)

# Keep a plain "date" column next to the timestamp so both granularities are available.
df_trips = df_trips.withColumn("date", F.to_date(F.col("start_datetime")))

In [3]:
# Preview the first five rows of the reformatted trips table.
df_trips.show(5)

+------+-------------+----------------+--------------+--------------------+----------------+--------------------+--------------------+----------+-------------------+----------+
|bikeid|checkout_time|duration_minutes|end_station_id|    end_station_name|start_station_id|  start_station_name|     subscriber_type|   trip_id|     start_datetime|      date|
+------+-------------+----------------+--------------+--------------------+----------------+--------------------+--------------------+----------+-------------------+----------+
|     8|     19:12:00|              41|          2565|Trinity & 6th Street|            2536|    Waller & 6th St.|             Walk Up|1310148290|2015-03-19 19:12:00|2015-03-19|
|   141|      2:06:04|               6|          2570|South Congress & ...|            2494|      2nd & Congress|            Local365|  12617682|2016-10-30 02:06:04|2016-10-30|
|   578|     16:28:27|              13|          2498|Convention Center...|            2538|Bullock Museum @ ...|  

In [16]:
# Print the column names and types of the trips table.
df_trips.printSchema()

root
 |-- bikeid: integer (nullable = true)
 |-- checkout_time: string (nullable = true)
 |-- duration_minutes: integer (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- subscriber_type: string (nullable = true)
 |-- trip_id: integer (nullable = true)
 |-- start_datetime: timestamp (nullable = true)
 |-- date: date (nullable = true)



# Traitements (Group by)

In [17]:
# Average trip duration for each (date, checkout_time, subscriber_type) combination.
# Note: despite the variable name, the grouping is finer than one row per day.
avg_duration_keys = ("date", "checkout_time", "subscriber_type")
df_avg_duration_per_day = (
    df_trips
    .groupBy(*avg_duration_keys)
    .avg("duration_minutes")
)
df_avg_duration_per_day.show(5)

+----------+-------------+---------------+---------------------+
|      date|checkout_time|subscriber_type|avg(duration_minutes)|
+----------+-------------+---------------+---------------------+
|2015-03-17|     11:12:00|        Walk Up|    42.24242424242424|
|2015-01-30|     17:12:00|        Local30|                 13.5|
|2015-10-02|     15:12:19|       Local365|                 17.0|
|2016-08-19|     18:47:49|        Walk Up|                 14.0|
|2017-01-01|     16:29:22|        Walk Up|                 28.0|
+----------+-------------+---------------+---------------------+
only showing top 5 rows



In [18]:
# Number of trips for each (date, start station, checkout_time, subscriber_type) combination.
start_station_keys = ("date", "start_station_name", "checkout_time", "subscriber_type")
df_trips_per_start_station = (
    df_trips
    .groupBy(*start_station_keys)
    .count()
)
df_trips_per_start_station.show(5)

+----------+--------------------+-------------+--------------------+-----+
|      date|  start_station_name|checkout_time|     subscriber_type|count|
+----------+--------------------+-------------+--------------------+-----+
|2015-12-31|Riverside @ S. Lamar|     15:12:58|             Walk Up|    1|
|2015-10-02|Toomey Rd @ South...|     15:12:19|            Local365|    1|
|2017-07-24|      4th & Congress|     14:05:21|           Weekender|    1|
|2016-05-20|Riverside @ S. Lamar|     20:32:12|             Walk Up|    1|
|2014-03-15|      2nd & Congress|     23:12:00|24-Hour Kiosk (Au...|   13|
+----------+--------------------+-------------+--------------------+-----+
only showing top 5 rows



In [19]:
# Number of trips for each (date, end station, checkout_time, subscriber_type) combination.
end_station_keys = ("date", "end_station_name", "checkout_time", "subscriber_type")
df_trips_per_end_station = (
    df_trips
    .groupBy(*end_station_keys)
    .count()
)
df_trips_per_end_station.show(5)

+----------+--------------------+-------------+--------------------+-----+
|      date|    end_station_name|checkout_time|     subscriber_type|count|
+----------+--------------------+-------------+--------------------+-----+
|2015-11-13|      2nd & Congress|     17:12:31|             Walk Up|    1|
|2015-04-02|Barton Springs @ ...|     20:12:00|             Walk Up|    5|
|2015-05-17|UT West Mall @ Gu...|      2:12:00|             Walk Up|    4|
|2014-05-17|      4th & Congress|     16:12:00|24-Hour Kiosk (Au...|    1|
|2015-03-15|East 6th at Rober...|     17:12:00|            Local365|    1|
+----------+--------------------+-------------+--------------------+-----+
only showing top 5 rows



# Output

In [28]:
def _export_single_csv(frame, target):
    """Write `frame` as CSV with a header row under `target`, replacing any previous output.

    The frame is collapsed to a single partition first so that one part file is produced.
    NOTE: Spark treats the save path as an output directory, so the '.csv' names
    below end up being folders that contain the part file.
    """
    frame.repartition(1).write.format('csv').mode('overwrite').option("header", "true").save(target)


# Export the reformatted and aggregated datasets as CSV.
_export_single_csv(df_trips, 'data/Output/df_trip.csv')
_export_single_csv(df_avg_duration_per_day, 'data/Output/df_avg_duration_day.csv')
_export_single_csv(df_trips_per_start_station, 'data/Output/df_trips_start_station.csv')
_export_single_csv(df_trips_per_end_station, 'data/Output/df_trips_end_station.csv')
_export_single_csv(df_stations, 'data/Output/df_station.csv')

# Tables de valeurs uniques (date, heure, type d'abonné)


In [29]:
## Table of unique dates
# Show the raw "date" column first (one row per trip, so it contains repeats).
trip_dates_raw = df_trips.select("date")
trip_dates_raw.show(5)

# Keep the de-duplicated result in df_Date. Previously dropDuplicates() was only
# applied to the preview and then discarded, so df_Date (and any export of it)
# still held one row per trip instead of one row per distinct date.
df_Date = trip_dates_raw.dropDuplicates()
df_Date.show(10)

+----------+
|      date|
+----------+
|2015-03-19|
|2016-10-30|
|2016-03-11|
|2014-11-23|
|2017-04-16|
+----------+
only showing top 5 rows

+----------+
|      date|
+----------+
|2014-09-26|
|2016-03-01|
|2015-05-19|
|2014-11-12|
|2015-03-09|
|2017-01-06|
|2015-03-06|
|2016-07-26|
|2015-04-09|
|2016-10-03|
+----------+
only showing top 10 rows



In [30]:
## Table of unique checkout times
# Show the raw "checkout_time" column first (one row per trip, so it contains repeats).
checkout_times_raw = df_trips.select("checkout_time")
checkout_times_raw.show(5)

# Keep the de-duplicated result in df_heure. Previously dropDuplicates() was only
# applied to the preview and then discarded, so df_heure (and any export of it)
# still held one row per trip instead of one row per distinct checkout time.
df_heure = checkout_times_raw.dropDuplicates()
df_heure.show(10)

+-------------+
|checkout_time|
+-------------+
|     19:12:00|
|      2:06:04|
|     16:28:27|
|     15:12:00|
|     15:39:13|
+-------------+
only showing top 5 rows

+-------------+
|checkout_time|
+-------------+
|     18:54:06|
|     12:45:09|
|     19:15:02|
|     10:12:05|
|     14:06:41|
|     16:10:56|
|     19:43:39|
|     15:08:58|
|     18:09:52|
|     15:00:39|
+-------------+
only showing top 10 rows



In [31]:
## Table of unique subscriber types
# Show the raw "subscriber_type" column first (one row per trip, so it contains repeats).
subscriber_types_raw = df_trips.select("subscriber_type")
subscriber_types_raw.show(5)

# Keep the de-duplicated result in df_sub. Previously dropDuplicates() was only
# applied to the preview and then discarded, so df_sub (and any export of it)
# still held one row per trip instead of one row per distinct subscriber type.
df_sub = subscriber_types_raw.dropDuplicates()
df_sub.show(10)

+--------------------+
|     subscriber_type|
+--------------------+
|             Walk Up|
|            Local365|
|            Local365|
|24-Hour Kiosk (Au...|
|             Walk Up|
+--------------------+
only showing top 5 rows

+--------------------+
|     subscriber_type|
+--------------------+
|              Annual|
|24-Hour Membershi...|
| Semester Membership|
|Annual (Madison B...|
|Annual (Cincy Red...|
|Annual (Nashville...|
|Annual Membership...|
|24-Hour-Online (A...|
|     Founding Member|
|Membership: pay o...|
+--------------------+
only showing top 10 rows



# Outputs

In [34]:
## Export the date, checkout-time and subscriber-type tables as CSV
# Each frame is collapsed to one partition so a single headered part file is written;
# existing output at the same path is overwritten.
for lookup_df, target_dir in (
    (df_Date, 'data/Output/df_Date.csv'),
    (df_heure, 'data/Output/df_heure.csv'),
    (df_sub, 'data/Output/df_sub.csv'),
):
    single_part = lookup_df.repartition(1)
    single_part.write.format('csv').mode('overwrite').option("header", "true").save(target_dir)

# Weather

In [46]:
# Load the full weather CSV (header row, inferred column types) and preview all of its columns.
df_weather = spark.read.csv("data/Austin weather/austin_weather.csv", header=True, inferSchema=True)
df_weather.show(5)

+----------+---------+--------+--------+-------------+------------+------------+-------------------+------------------+------------------+--------------------------+-------------------------+-------------------------+-------------------+------------------+------------------+-----------+----------+-----------+----------------------+-------------------+
|      Date|TempHighF|TempAvgF|TempLowF|DewPointHighF|DewPointAvgF|DewPointLowF|HumidityHighPercent|HumidityAvgPercent|HumidityLowPercent|SeaLevelPressureHighInches|SeaLevelPressureAvgInches|SeaLevelPressureLowInches|VisibilityHighMiles|VisibilityAvgMiles|VisibilityLowMiles|WindHighMPH|WindAvgMPH|WindGustMPH|PrecipitationSumInches|             Events|
+----------+---------+--------+--------+-------------+------------+------------+-------------------+------------------+------------------+--------------------------+-------------------------+-------------------------+-------------------+------------------+------------------+-----------+-----

In [47]:
# Reload the weather CSV, keeping only the columns used by the analysis below.
weather_columns = ["Date", "TempAvgF", "PrecipitationSumInches", "Events"]
df_weather = spark.read.csv(
    "data/Austin weather/austin_weather.csv", header=True, inferSchema=True
).select(*weather_columns)
df_weather.show(5)

+----------+--------+----------------------+-------------------+
|      Date|TempAvgF|PrecipitationSumInches|             Events|
+----------+--------+----------------------+-------------------+
|2013-12-21|      60|                  0.46|Rain , Thunderstorm|
|2013-12-22|      48|                     0|                   |
|2013-12-23|      45|                     0|                   |
|2013-12-24|      46|                     0|                   |
|2013-12-25|      50|                     T|                   |
+----------+--------+----------------------+-------------------+
only showing top 5 rows



In [48]:
# Convert Fahrenheit to Celsius and inches to centimetres, then drop the imperial columns.
# NOTE(review): "PrecipitationSumInches" contains non-numeric markers such as 'T' in the
# preview; these presumably come out as null in "Precipitations_cm" - confirm that is intended.
# NOTE: this cell reassigns df_weather and drops its own inputs, so re-running it alone
# fails; re-run the weather load cell first.
fahrenheit_to_celsius = (F.col("TempAvgF") - 32) * (5 / 9)
inches_to_cm = F.col("PrecipitationSumInches") * 2.54
df_weather = (
    df_weather
    .withColumn("Temp_Avg_Celsius", fahrenheit_to_celsius)
    .withColumn("Precipitations_cm", inches_to_cm)
    .drop("PrecipitationSumInches", "TempAvgF")
)

In [49]:
# Make column-name resolution case-sensitive for the whole session so that the weather
# "Date" column and the trips "date" column are treated as two different names.
spark.conf.set('spark.sql.caseSensitive', True)

# One row per day with the number of trips started that day.
df_trips_per_day = df_trips.groupBy("date").count()

# Inner join: only days present in both the weather data and the trips data are kept.
same_day = df_weather["Date"] == df_trips_per_day["date"]
# Drop the weather-side "Date" key; the lowercase trips "date" column remains.
df_join = df_weather.join(df_trips_per_day, same_day, "inner").drop("Date")
df_join.show(10)

+-------------------+-------------------+-------------------+----------+-----+
|             Events|   Temp_Avg_Celsius|  Precipitations_cm|      date|count|
+-------------------+-------------------+-------------------+----------+-----+
|                   | 24.444444444444446|                0.0|2014-09-26|  503|
|                   |  21.11111111111111|                0.0|2016-03-01|  411|
|               Rain|  27.77777777777778|               null|2015-05-19|  438|
|                   |  6.666666666666667|                0.0|2014-11-12|  164|
|               Rain|  11.11111111111111|             5.5118|2015-03-09|   66|
|                   |-1.6666666666666667|                0.0|2017-01-06|  107|
|                   |  5.555555555555555|                0.0|2015-03-06|  354|
|Rain , Thunderstorm|  28.88888888888889|0.20320000000000002|2016-07-26|  439|
|                   |  26.11111111111111|                0.0|2015-04-09|  503|
|                   |  23.88888888888889|           

In [50]:
# Export the joined weather / trips-per-day table as a single headered CSV part,
# overwriting any previous output at that path.
(
    df_join
    .repartition(1)
    .write
    .format('csv')
    .mode('overwrite')
    .option("header", "true")
    .save('data/Output/df_weather.csv')
)