In [99]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, weekofyear, year

In [100]:
spark = (SparkSession
    .builder
    .master('local')
    .config("spark.jars", "/usr/local/spark/resources/jars/postgresql-42.3.3.jar")
    .getOrCreate()
)

In [101]:
df_trips = spark.read.format("jdbc")\
    .option("driver", "org.postgresql.Driver")\
        .option("url", "jdbc:postgresql://postgres/postgres")\
        .option("user", "airflow")\
        .option("password", "airflow")\
    .option("dbtable", "trips")\
    .load()

In [102]:
df_trips.show()

+---+---------+-----------------+-----------------+-------------------+-------------------+-------------------+---------+
| id|id_region|   origin_coord_x|   origin_coord_y|destination_coord_x|destination_coord_y|           datetime|id_source|
+---+---------+-----------------+-----------------+-------------------+-------------------+-------------------+---------+
| 80|        1|14.47912398994219|50.11781434726048|  14.54894833778109|  50.01480664304733|2018-05-19 09:07:17|        4|
| 78|        1|14.59545724636152| 50.0524554235424|  14.48321912885263|  50.06509529578676|2018-05-09 04:23:26|        4|
| 74|        1|14.65754935346634|50.11820315220623|  14.48378121129139|  50.09770540993485|2018-05-10 09:38:12|        4|
| 65|        1|14.34611560109357|50.02251748571683|  14.34790601885638|  50.11666616118271|2018-05-14 23:16:56|        4|
| 54|        1|14.37338615486802|50.06537556739956|  14.54081985518606|  50.08790619445585|2018-05-10 23:04:58|        4|
| 43|        1|14.495810

In [None]:
# Trips with similar origin, destination, and time of day should be grouped togethe

In [103]:
df_group_trips = (df_trips
                  .select(
                      "origin_coord_x", 
                      "origin_coord_y", 
                      "destination_coord_x", 
                      "destination_coord_y", 
                      "datetime"
                  ).groupBy(
                      "origin_coord_x", 
                      "origin_coord_y", 
                      "destination_coord_x", 
                      "destination_coord_y", 
                      "datetime"
                  ).agg(
                      count('*')
                      .alias("trips_quantity")
                  )
                 )

In [104]:
df_group_trips.show()

+-----------------+-----------------+-------------------+-------------------+-------------------+--------------+
|   origin_coord_x|   origin_coord_y|destination_coord_x|destination_coord_y|           datetime|trips_quantity|
+-----------------+-----------------+-------------------+-------------------+-------------------+--------------+
|14.31840390580184|50.08466755143189|  14.55762488630385|  50.08502385365859|2018-05-19 23:08:09|             1|
|10.04884628340258|53.51430547831718|  9.805977128944804|  53.56924626552725|2018-05-21 19:13:24|             1|
|7.558864701837288|45.11540031667295|  7.770328470181719|  45.05433872971169|2018-05-01 17:42:05|             1|
|7.541509189114433|45.09160503827746|   7.74528653441973|  45.02628598341506|2018-05-06 09:49:16|             1|
|14.53300650375239|50.12502233477432|  14.66560032530535|   50.0869602482622|2018-05-23 01:07:33|             1|
|10.17378993097742| 53.5467336774148|  10.21529787231755|  53.50485266884467|2018-05-21 04:06:11

In [None]:
# weekly average number of trips for an area, defined by a region.


In [117]:
df_trips_time = (df_trips
                 .withColumn("datetime", 
                             col("datetime")
                             .cast("timestamp")
                            )
                )
df_trips_week = (df_trips_time
                .withColumn("week",
                           weekofyear(col("datetime"))
                           )
                .withColumn("year",
                           year(col("datetime"))
                           )
                )


In [155]:
df_trips_week.show()

+---+---------+-----------------+-----------------+-------------------+-------------------+-------------------+---------+----+----+
| id|id_region|   origin_coord_x|   origin_coord_y|destination_coord_x|destination_coord_y|           datetime|id_source|week|year|
+---+---------+-----------------+-----------------+-------------------+-------------------+-------------------+---------+----+----+
| 80|        1|14.47912398994219|50.11781434726048|  14.54894833778109|  50.01480664304733|2018-05-19 09:07:17|        4|  20|2018|
| 78|        1|14.59545724636152| 50.0524554235424|  14.48321912885263|  50.06509529578676|2018-05-09 04:23:26|        4|  19|2018|
| 74|        1|14.65754935346634|50.11820315220623|  14.48378121129139|  50.09770540993485|2018-05-10 09:38:12|        4|  19|2018|
| 65|        1|14.34611560109357|50.02251748571683|  14.34790601885638|  50.11666616118271|2018-05-14 23:16:56|        4|  20|2018|
| 54|        1|14.37338615486802|50.06537556739956|  14.54081985518606|  50.

In [153]:
df_count_trips_week = (df_trips_week
                       .select(
                           "week", 
                           "year", 
                           "id_region", 
                           "datetime"
                       ).groupBy(
                           "week", 
                           "year", 
                           "id_region"
                       ).agg(
                           count("datetime").alias("week_trips")
                       )
                      )

In [154]:
df_count_trips_week.show()

+----+----+---------+----------+
|week|year|id_region|week_trips|
+----+----+---------+----------+
|  22|2018|        1|         3|
|  20|2018|        1|         8|
|  21|2018|        3|        14|
|  19|2018|        2|         6|
|  19|2018|        3|         6|
|  21|2018|        2|         7|
|  20|2018|        3|         6|
|  18|2018|        3|         8|
|  22|2018|        2|         5|
|  20|2018|        2|         5|
|  19|2018|        1|         8|
|  21|2018|        1|         5|
|  18|2018|        2|         5|
|  22|2018|        3|         4|
|  18|2018|        1|        10|
+----+----+---------+----------+



In [151]:
df_avg_trips_week = (df_count_trips_week
                     .select(
                         "year", 
                         "id_region", 
                         "week_trips"
                     ).groupBy(
                         "year", 
                         "id_region"
                     ).agg(
                         avg("week_trips").alias("avg_week_trips")
                     )
                    )

In [152]:
df_avg_trips_week.show(truncate=False)

+----+---------+--------------+
|year|id_region|avg_week_trips|
+----+---------+--------------+
|2018|3        |7.6           |
|2018|1        |6.8           |
|2018|2        |5.6           |
+----+---------+--------------+

