# Run all

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, avg, to_date, count, desc, month, year, row_number, expr
from pyspark.sql.window import Window
import os

In [2]:
# середня тривалість поїздок на день
def avg_trip_duration_per_day(df) -> DataFrame:
    # Виділяємо дату
    df = df.withColumn("start_date", to_date(col("start_time"), "MM/dd/yyyy HH:mm"))

    # групуємо за start_date, агрегую(agg) визначаючи середнє тривалості поїздок, результат в нову колонку average_trip_duration
    result = df.groupBy("start_date").agg(avg("tripduration").alias("average_trip_duration"))

    # замість Hadoop, вирішив конвертувати в Pandas і вже за допомогою нього зберігати в csv
    result.toPandas().to_csv("./out-local/avg_trip_duration_per_day.csv", index=False)
    return result

In [3]:
# кількість поїздок на день
def trips_per_day(df) -> DataFrame:
    # Виділяємо дату
    df = df.withColumn("start_date", to_date(col("start_time"), "MM/dd/yyyy HH:mm"))

    # групуємо за start_date, агрегуємо(agg) визначаючи кількість поїздок на день, результат в нову колонку trip_count
    result = df.groupBy("start_date").agg(count("trip_id").alias("trip_count"))

    # збереження в csv
    result.toPandas().to_csv("./out-local/trips_per_day.csv", index=False)
    return result

In [4]:
# найпопулярніша початкова станція для кожного місяця
def most_popular_start_station_by_month(df):
    # Виділяємо рік і місяць
    df = df.withColumn("month", month(to_date(col("start_time"), "MM/dd/yyyy HH:mm")))
    df = df.withColumn("year", year(to_date(col("start_time"), "MM/dd/yyyy HH:mm")))

    # Рахуємо кількість поїздок з кожної станції в кожному місяці
    grouped = df.groupBy("year", "month", "from_station_name").agg(count("*").alias("trip_count"))
    
    # Вікно для ранжування поїздок по кожному місяцю
    window_spec = Window.partitionBy("year", "month").orderBy(col("trip_count").desc())

    # Додаємо ранг і відбираємо лише топ-1 для кожного місяця
    result = grouped.withColumn("rank", row_number().over(window_spec)).filter(col("rank") == 1).drop("rank")
    
    # збереження в csv
    result.toPandas().to_csv("./out-local/popular_stations_by_month.csv", index=False)

    return result

In [5]:
def top_3_stations_for_day_since_two_week(df) -> DataFrame:
    # Виділяємо дату
    df = df.withColumn("start_date", to_date(col("start_time"), "MM/dd/yyyy HH:mm"))

    # визначаємо останню дату з датасету
    latest_date = df.select(col("start_date")).distinct().orderBy(desc("start_date")).limit(1).collect()[0][0]

    # визначаємо дату, яка на 14 днів раніше від latest_date
    last_two_weeks = df.filter(col("start_date") >= latest_date - expr("INTERVAL 14 DAYS")) 
    
    # групуємо по start_date, from_station_name, обчислюємо кількість поїздок (trip_id) для кожної станції за кожен день
    top_3_stations = last_two_weeks.groupBy("start_date", "from_station_name").agg(count("trip_id").alias("count"))

     # використовуємо той самий прийом з вікном і розділом на групи по даті, потім ті групи сортуємо за спаданням по кількості поїздок count
    top_3_stations = top_3_stations.withColumn("popularity_top", row_number().over(Window.partitionBy("start_date").orderBy(desc("count"))))

    # перші три по популярності
    result = top_3_stations.filter(col("popularity_top") <= 3)

    # збереження в csv
    result.toPandas().to_csv("./out-local/top_3_stations_for_day_since_two_week.csv", index=False)
    return result

In [6]:
# середня тривалість поїздок за статтю
def avg_duration_by_gender(df) -> DataFrame:
    # групуємо за статтю і визначаємо середнє поїздок
    result = df.groupBy("gender").agg(avg("tripduration").alias("average_duration"))

    # збереження в csv
    result.toPandas().to_csv("./out-local/avg_duration_by_gender.csv", index=False)
    return result

In [9]:
spark = SparkSession.builder \
    .appName("Journey Info") \
    .getOrCreate()
    
df = spark.read.csv("../jobs/Divvy_Trips_2019_Q4.csv", header=True, inferSchema=True)
os.makedirs("./out-local", exist_ok=True)

In [10]:
result_avg_trip_duration_per_day = avg_trip_duration_per_day(df)
result_avg_trip_duration_per_day.show()

+----------+---------------------+
|start_date|average_trip_duration|
+----------+---------------------+
|2019-10-05|    529.6975832789027|
|2019-10-01|    519.1034563470391|
|2019-10-04|    502.3697794462694|
|2019-10-02|    489.1325637447672|
|2019-10-06|    540.2795057520239|
|2019-10-07|    515.5357023690357|
|2019-10-03|    507.1099269445638|
|2019-10-08|    515.4543694020819|
|2019-10-14|   497.98104315039797|
|2019-10-12|    500.3075485799701|
|2019-10-10|   505.07017693819984|
|2019-10-13|    522.7091085584857|
|2019-10-11|    475.8004881025015|
|2019-10-09|    512.1497682738434|
|2019-10-21|    496.8957780725022|
|2019-10-15|   495.01938976377954|
|2019-10-20|    519.4521739130435|
|2019-10-17|     492.546112804878|
|2019-10-16|    489.1441823744202|
|2019-10-18|   501.65576186265844|
+----------+---------------------+
only showing top 20 rows



In [11]:
result_trips_per_day = trips_per_day(df)
result_trips_per_day.show()

+----------+----------+
|start_date|trip_count|
+----------+----------+
|2019-10-05|     10452|
|2019-10-01|     18425|
|2019-10-04|     14570|
|2019-10-02|      9882|
|2019-10-06|     13396|
|2019-10-07|     17256|
|2019-10-03|     15647|
|2019-10-08|     17537|
|2019-10-14|     13785|
|2019-10-12|      8702|
|2019-10-10|     15795|
|2019-10-13|     10533|
|2019-10-11|      8016|
|2019-10-09|     17226|
|2019-10-21|     11757|
|2019-10-15|     13297|
|2019-10-20|     10419|
|2019-10-17|     13635|
|2019-10-16|     12886|
|2019-10-18|     14096|
+----------+----------+
only showing top 20 rows



In [12]:
result_most_popular_start_station_by_month = most_popular_start_station_by_month(df)
result_most_popular_start_station_by_month.show()

+----+-----+-------------------+----------+
|year|month|  from_station_name|trip_count|
+----+-----+-------------------+----------+
|2019|   10|Canal St & Adams St|      6564|
|2019|   11|Canal St & Adams St|      3445|
|2019|   12|Canal St & Adams St|      2928|
+----+-----+-------------------+----------+



In [13]:
result_top_3_stations_for_day_since_two_week = top_3_stations_for_day_since_two_week(df)
result_top_3_stations_for_day_since_two_week.show()

+----------+--------------------+-----+--------------+
|start_date|   from_station_name|count|popularity_top|
+----------+--------------------+-----+--------------+
|2019-12-17| Canal St & Adams St|  153|             1|
|2019-12-17|Clinton St & Madi...|  144|             2|
|2019-12-17|Clinton St & Wash...|  124|             3|
|2019-12-18| Canal St & Adams St|  123|             1|
|2019-12-18|Clinton St & Madi...|  115|             2|
|2019-12-18|Clinton St & Wash...|   94|             3|
|2019-12-19| Canal St & Adams St|  133|             1|
|2019-12-19|Clinton St & Madi...|  123|             2|
|2019-12-19|Clinton St & Wash...|   95|             3|
|2019-12-20| Canal St & Adams St|  131|             1|
|2019-12-20|Clinton St & Wash...|  109|             2|
|2019-12-20|Clinton St & Madi...|   94|             3|
|2019-12-21|Streeter Dr & Gra...|   63|             1|
|2019-12-21|Kingsbury St & Ki...|   47|             2|
|2019-12-21|Wells St & Concor...|   46|             3|
|2019-12-2

In [14]:
result_avg_duration_by_gender = avg_duration_by_gender(df)
result_avg_duration_by_gender.show()

+------+-----------------+
|gender| average_duration|
+------+-----------------+
|  NULL|611.8951609483015|
|Female|509.8082474784837|
|  Male|478.6205057415161|
+------+-----------------+



In [15]:
spark.stop()