In [1]:
import sys
sys.path.append("..") # Adds higher directory to python modules path.
from utils import *

### Task 1
Generating new data and attributes from large datasets, such as the bike data below, is often time-consuming, so it is important to seek out approaches that optimise and streamline this processing. The Pool object in Python's multiprocessing module offers a way to parallelise the execution of a function across multiple input values by distributing those inputs between processes. This data parallelism is used in this notebook to calculate usage attributes for the bike data. By approaching the problem non-sequentially, the work can be completed more efficiently.

 - 1. Read in tables as dataframes.

In [2]:
# Load both tables. query_db() is called without arguments for the bikes data
# (presumably its default table -- confirm in utils) and with
# "live_weather_data" for the weather data.
core_bike_df, core_weather_df = query_db(), query_db("live_weather_data")

# Work on copies so the frames as loaded remain available unmodified.
bike_df, weather_df = core_bike_df.copy(), core_weather_df.copy()

 - 2. Print the number of rows in each table.

In [3]:
# Report the size of each frame, using "," as the thousands separator.
print("\tBikes dataframe contains", format(len(bike_df.index), ","), "rows.")
print("\tWeather dataframe contains", format(len(weather_df.index), ","), "rows.")

	Bikes dataframe contains 1,077,375 rows.
	Weather dataframe contains 41,628 rows.


 - 3. Remove any rows related to stations that are not currently in use (only rows with a `status` of 0 are kept).

In [4]:
# Keep only the rows whose status is 0 (presumably "station in use" -- confirm
# against the data dictionary). The explicit .copy() makes bike_df an
# independent frame rather than a slice of the previous one, so the column
# assignments made in later cells do not raise pandas' SettingWithCopyWarning.
bike_df = bike_df.loc[bike_df['status'] == 0].copy()

# Sanity check: exactly one distinct status value should remain.
print("\t", bike_df['status'].nunique())

	 1


 - 4. Generate the supporting attribute `dd_mm_yy` (the date of each record formatted as dd-mm-yy).

In [5]:
# Day-level key for every row: 'dt' is parsed as seconds since the Unix epoch
# and rendered as a day-month-year string with a two-digit year.
bike_df['dd_mm_yy'] = (
    pd.to_datetime(bike_df['dt'], unit='s')
    .dt.strftime('%d-%m-%y')
)

 - 5. Generate a city indicator attribute for each dataframe.

In [6]:
# City code for each bike row, derived from the station id range:
#   station_id > 4000 -> 4, station_id < 3000 -> 2, anything else -> 3
# (ids of exactly 3000 or 4000 therefore fall into city 3).
bike_df['city_id'] = np.select(
    [bike_df['station_id'] > 4000, bike_df['station_id'] < 3000],
    [4, 2],
    default=3,
)

# Matching city code for each weather row, derived from the location name:
#   "Cork" -> 2, "Limerick" -> 3, any other name -> 4.
weather_df['city_id'] = np.select(
    [weather_df['name'] == "Cork", weather_df['name'] == "Limerick"],
    [2, 3],
    default=4,
)

In [7]:
# Argument tuples for pool_process: (df, station_id, date) for every
# (day, station) pair and (df, city_id, date) for every (day, city) pair.
#
# The unique values are computed once up front. Inside a comprehension the
# iterable of the inner `for` clause is re-evaluated on every outer iteration,
# so calling .unique() there would rescan the whole frame once per day.
unique_days = bike_df['dd_mm_yy'].unique()
unique_stations = bike_df['station_id'].unique()
unique_cities = bike_df['city_id'].unique()

# NOTE(review): every tuple carries a reference to the full bike_df. If
# pool_process pickles each tuple for its workers, the whole frame is
# serialised once per task -- confirm in utils.
stations_data = [
    (bike_df, station, day)
    for day in unique_days
    for station in unique_stations
]
city_data = [
    (bike_df, city, day)
    for day in unique_days
    for city in unique_cities
]

 - 6. Generate a bikes available citywide attribute that represents the total number of bikes available within each city for all recorded moments.

In [8]:
%%time
# Add the column in place, defaulting every row to 0. The tuples in city_data
# hold references to this same bike_df object, so the column is also visible
# through them; rebinding bike_df to a new frame here would break that link.
bike_df['bikes_available_citywide'] = 0
# Run calculate_available_bikes_citywide over every (df, city, day) tuple.
# pool_process comes from utils; presumably it maps func over data using
# pool_size worker processes -- confirm there. The result is consumed in the
# next cell as an iterable of dicts.
ba_citywide_output = pool_process(func=calculate_available_bikes_citywide, data=city_data, pool_size=8)

CPU times: user 22.8 s, sys: 5.49 s, total: 28.3 s
Wall time: 2min


In [9]:
%%time
# Each dict's keys are used as bike_df row labels and its values are written
# into 'bikes_available_citywide' for those rows, one dict at a time.
for dictionary in ba_citywide_output:
    row_labels = list(dictionary.keys())
    new_values = list(dictionary.values())
    bike_df.loc[row_labels, 'bikes_available_citywide'] = new_values

CPU times: user 3.22 s, sys: 539 ms, total: 3.76 s
Wall time: 3.8 s


 - 7. Generate a count attribute named count_1 that cumulatively tracks bike usage at each station across every day.

In [10]:
# Add count_1 in place with a default of 0 for every row. The assignment
# mutates the same bike_df object that the stations_data tuples reference.
# Rows whose labels are absent from the worker output keep this default.
bike_df['count_1'] = 0

In [11]:
%%time
# Run track_station_usage_by_date over every (df, station, day) tuple.
# pool_process comes from utils; presumably it maps func over data using
# pool_size worker processes -- confirm there. The result is consumed in the
# next cell as an iterable of dicts.
count_1_output = pool_process(func=track_station_usage_by_date, data=stations_data, pool_size=8)

CPU times: user 24.6 s, sys: 5.86 s, total: 30.5 s
Wall time: 7min


In [12]:
%%time
# Each dict's keys are used as bike_df row labels and its values are written
# into 'count_1' for those rows, one dict at a time.
for dictionary in count_1_output:
    row_labels = list(dictionary.keys())
    new_values = list(dictionary.values())
    bike_df.loc[row_labels, 'count_1'] = new_values

CPU times: user 1min 34s, sys: 18.7 s, total: 1min 53s
Wall time: 1min 53s


 - 8. Generate a count attribute named count_2 that displays the total bike usage in each city across every day.

In [13]:
# Add count_2 in place with a default of 0 for every row. The assignment
# mutates the same bike_df object that the city_data tuples reference.
# Rows whose labels are absent from the worker output keep this default.
bike_df['count_2'] = 0

In [14]:
%%time
# Run track_city_usage_by_date over every (df, city, day) tuple.
# pool_process comes from utils; presumably it maps func over data using
# pool_size worker processes -- confirm there. The result is consumed in the
# next cell as an iterable of dicts.
count_2_output = pool_process(func=track_city_usage_by_date, data=city_data, pool_size=8)

CPU times: user 22.4 s, sys: 6.31 s, total: 28.7 s
Wall time: 2min 3s


In [15]:
%%time
# Each dict's keys are used as bike_df row labels and its values are written
# into 'count_2' for those rows, one dict at a time.
for dictionary in count_2_output:
    row_labels = list(dictionary.keys())
    new_values = list(dictionary.values())
    bike_df.loc[row_labels, 'count_2'] = new_values

CPU times: user 2.69 s, sys: 239 ms, total: 2.93 s
Wall time: 2.93 s
