# Imports

In [1]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, TimestampType

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W


# Loading data

First, we define the data schema:

In [2]:
# Columns of the hire CSV in file order, with their Spark types. Every field is
# declared nullable; rows missing values we actually need are dropped later.
schema_columns = [
    ('rental_id', IntegerType()),
    ('duration', IntegerType()),
    ('bike_id', IntegerType()),
    ('end_date', TimestampType()),
    ('end_station_id', IntegerType()),
    ('end_station_name', StringType()),
    ('start_date', TimestampType()),
    ('start_station_id', IntegerType()),
    ('start_station_name', StringType()),
    ('end_station_logical_terminal', IntegerType()),
    ('start_station_logical_terminal', IntegerType()),
    ('end_station_priority_id', IntegerType()),
]

schema = StructType([StructField(column, spark_type, True) for column, spark_type in schema_columns])


Next, we load the data:

In [3]:
# Reader settings: the header row is validated against `schema` instead of being
# ignored (enforceSchema='false'), and timestamps carry a trailing zone token.
csv_options = {
    'header': 'true',
    'enforceSchema': 'false',
    'timestampFormat': "yyyy-MM-dd HH:mm:ss zzz",
}

data = (
    spark.read
    .schema(schema)
    .options(**csv_options)
    .csv('/input/assignments/london_bicycles/london_bicycles_hire.csv')
)

data.printSchema()


root
 |-- rental_id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- end_station_logical_terminal: integer (nullable = true)
 |-- start_station_logical_terminal: integer (nullable = true)
 |-- end_station_priority_id: integer (nullable = true)



# Filtering data

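Now we filter out records that do not meet the task's requirements: only trips in 2016, no missing duration or station data, a duration of at least 10 minutes, start and end on the same day, and different start and end stations.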

In [4]:
# Trip-level constraints required by the task.
target_year = 2016
min_duration = 10 * 60  # duration has to be at least 10 minutes (10 * 60, so the column is in seconds)
not_nullable_columns = ['duration', 'start_station_id',
                        'start_station_name', 'end_station_id', 'end_station_name']

filtered_data = (
    data
    # only trips that both start and end in the target year
    .where((F.year(data.start_date) == target_year) & (F.year(data.end_date) == target_year))
    # drop rows missing any value needed downstream
    .na.drop(subset=not_nullable_columns)
    .where(data.duration >= min_duration)
    # the trip must start and end on the same calendar date
    .where(F.to_date(data.start_date) == F.to_date(data.end_date))
    # trips returning to their start station are excluded
    .where(data.start_station_id != data.end_station_id)
)

# Now we can drop unnecessary columns and keep only the month and the day of the
# week of the trip start instead of full timestamps, since this is all we need.
# day_of_week uses ISO numbering (1 = Monday ... 7 = Sunday). F.dayofweek numbers
# days 1 = Sunday ... 7 = Saturday, so it is shifted. This replaces
# date_format(..., "u"), a week-based pattern that Spark 3 no longer supports.
filtered_data = filtered_data.select(
    F.month(filtered_data.start_date).alias('month'),
    (((F.dayofweek(filtered_data.start_date) + 5) % 7) + 1).alias('day_of_week'),
    filtered_data.duration,
    filtered_data.start_station_id,
    filtered_data.start_station_name,
    filtered_data.end_station_id,
    filtered_data.end_station_name
)


Now we have to organize ordered start/end stations into unordered routes.

To do this, we store the two stations as columns A and B.
Column A contains the station with the smaller id, and column B contains the other one.

In [5]:
# Normalise each directed trip to an undirected route: station_A is always the
# lower id and station_B the higher one, so A->B and B->A trips coincide.
route_data = (
    filtered_data
    .select(
        'month',
        'day_of_week',
        'duration',
        F.least('start_station_id', 'end_station_id').alias('station_A_id'),
        F.greatest('start_station_id', 'end_station_id').alias('station_B_id'),
    )
    .cache()
)

We also collect the station names and their ids, so that we can later join them with the final result to translate station ids into names.

In [11]:
# Every (id, name) pair seen as a start or an end station. Used at the end to map
# the route ids in the final result back to station names.
start_stations = filtered_data.select(
    filtered_data.start_station_id.alias('id'),
    filtered_data.start_station_name.alias('name')
)
end_stations = filtered_data.select(
    filtered_data.end_station_id.alias('id'),
    filtered_data.end_station_name.alias('name')
)

# A plain distinct() over (id, name) keeps several rows for one id if a station
# appears under more than one name. That would duplicate routes in the two
# id -> name joins. Collapse to exactly one name per id (min() makes the choice
# deterministic), and cache the small lookup table since it is joined twice.
station_data = start_stations.union(end_stations) \
    .groupBy('id') \
    .agg(F.min('name').alias('name')) \
    .cache()

Now we can filter out routes with fewer than 100 trips in a given month. The threshold is applied per month, so a route may be kept for some months and dropped for others.

In [12]:
# Attach each route's trip count for that month to every trip row (window count),
# then keep only route-months with at least `min_monthly_trips` trips.
# NOTE(review): this rebinds `filtered_data` (previously the trip-level frame with
# station names), so re-running the station-name cell after this one will fail.
min_monthly_trips = 100
monthly_route_window = W.partitionBy('month', 'station_A_id', 'station_B_id')

counted_monthly_route_data = route_data.withColumn(
    'trips_this_month', F.count('*').over(monthly_route_window))

filtered_data = (
    counted_monthly_route_data
    .where(F.col('trips_this_month') >= min_monthly_trips)
    .drop('trips_this_month')
)

The data is now filtered, so we can cache the result.

In [13]:
# Persist the route-month-filtered trips; the queries below all start from `route_data`.
# Note: this rebinds `route_data`. The earlier cached (unfiltered) route frame is not
# unpersisted here, so it stays cached as well.
route_data = filtered_data.cache()

# Queries

Our initial filtered data for querying looks as follows:

In [14]:
# Show the schema of both query inputs, each preceded by a label line.
for frame_label, frame in (('route_data', route_data), ('station_data', station_data)):
    print(f'{frame_label} schema:')
    frame.printSchema()

route_data schema:
root
 |-- month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- station_A_id: integer (nullable = true)
 |-- station_B_id: integer (nullable = true)

station_data schema:
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



Now we calculate the average trip durations separately for weekdays (day_of_week 1–5, Monday–Friday) and for weekends (day_of_week 6–7, Saturday–Sunday).

In [15]:
# Average trip duration per (month, route), computed separately for weekdays
# (day_of_week 1-5) and weekends (day_of_week 6-7).
route_month_key = ['month', 'station_A_id', 'station_B_id']
is_weekend = F.col('day_of_week') >= 6

weekdays_monthly_avgs = (
    route_data
    .where(~is_weekend)
    .groupBy(*route_month_key)
    .agg(F.avg('duration').alias('weekday_avg'))
)
weekend_monthly_avgs = (
    route_data
    .where(is_weekend)
    .groupBy(*route_month_key)
    .agg(F.avg('duration').alias('weekend_avg'))
)

print(f"Weekdays averages count: {weekdays_monthly_avgs.count()}")
print(f"Weekend averages count: {weekend_monthly_avgs.count()}")

Weekdays averages count: 1556
Weekend averages count: 1406


We can see that not every route has an average for both weekdays and weekends

*(perhaps data for those days is missing, or some stations are closed on weekends)*,

so we drop every route that lacks either average. This is done with an inner join.

In [16]:
# An inner join keeps only route-months that have both a weekday and a weekend average.
avg_join_key = ['month', 'station_A_id', 'station_B_id']
monthly_avgs = weekdays_monthly_avgs.join(weekend_monthly_avgs, on=avg_join_key, how='inner')
monthly_avgs.count()

1406

Now we can calculate the **weekday_avg / weekend_avg ratio** for each route and rank the routes within each month by ratio, in descending order.

In [17]:
# Weekday/weekend duration ratio per route-month, ranked within each month from
# the highest ratio down (rank() gives tied rows the same rank).
ratio_rank_window = W.partitionBy('month').orderBy(F.col('ratio').desc())

monthly_ratios = (
    monthly_avgs
    .select(
        'month',
        'station_A_id',
        'station_B_id',
        (F.col('weekday_avg') / F.col('weekend_avg')).alias('ratio'),
    )
    .withColumn('rank', F.rank().over(ratio_rank_window))
)

Because we used `rank` rather than `dense_rank`, tied rows share a rank and the following rank is skipped. So we can simply keep the rows with **rank <= 2**. This keeps every row ranked 1 and, only when a month has a single row ranked 1, also the row(s) ranked 2.

In [40]:
# rank() leaves gaps after ties, so rank <= 2 keeps all rows tied for first and,
# only when a month has a single first-place row, the row(s) ranked second.
top_monthly = monthly_ratios.filter(F.col('rank') <= 2).cache()

Next, we replace the station ids with station names.

In [41]:
def _attach_station_name(routes, side):
    """Inner-join the name of station `side` ('A' or 'B') onto `routes`.

    The id/name lookup is renamed to station_<side>_id / station_<side>_name, so
    the join needs no cross-DataFrame column references. The result keeps
    `routes`' columns in their original order, followed by the new name column.
    """
    id_column = f'station_{side}_id'
    name_column = f'station_{side}_name'
    lookup = station_data.select(
        F.col('id').alias(id_column),
        F.col('name').alias(name_column),
    )
    joined = routes.join(lookup, on=id_column, how='inner')
    # A using-column join moves the key to the front; restore the original layout.
    return joined.select(*routes.columns, name_column)


top_monthly_with_names = _attach_station_name(_attach_station_name(top_monthly, 'A'), 'B')


Finally, we format the result so that it contains only the necessary output.

In [44]:
# Order by month, then by rank within the month, and keep only the reported columns.
output_columns = ['month', 'station_A_name', 'station_B_name', 'ratio']
final_result = (
    top_monthly_with_names
    .orderBy(F.col('month').asc(), F.col('rank').asc())
    .select(*output_columns)
)

print("The final result:")
final_result.show(n=50, truncate=False)


The final result:
+-----+----------------------------------------------+-------------------------------------------------+------------------+
|month|station_A_name                                |station_B_name                                   |ratio             |
+-----+----------------------------------------------+-------------------------------------------------+------------------+
|1    |Aquatic Centre, Queen Elizabeth Olympic Park  |Lee Valley VeloPark, Queen Elizabeth Olympic Park|1.268274812602153 |
|1    |Eccleston Place, Victoria                     |Eastbourne Mews, Paddington                      |1.1965811965811965|
|2    |Albert Gate, Hyde Park                        |Queen's Gate, Kensington Gardens                 |1.7529734183563805|
|2    |Finsbury Circus, Liverpool Street             |Waterloo Station 1, Waterloo                     |1.5393258426966292|
|3    |Crosswall, Tower                              |Waterloo Station 1, Waterloo                     |1.53880266

In [33]:
# Release the Spark session's resources. Keep this as the last cell: any Spark
# call after it would need a new session.
spark.stop()