Feature Engineering
---
Diogo Pessoa

## Objective

Run the `data_collection` notebook to load the Divvy dataset, then apply feature engineering to it.

### Adding new features

As discussed in the [Technical Report](Reports/out/Technical_Report_Diogo_Pessoa.pdf), I'll add two categorical fields: time of day and day of week. Later I'll use these fields to analyze the most popular stations by time of day and day of week.
Both categorical fields simplify the data and make it easier to analyze, while also labeling each trip and opening an avenue for further exploration with regression models:
* Time of day: Morning, Afternoon, Evening, Night
* Day of week: Workday, non-Working
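The real `categorize_time_of_day` and `categorize_day_of_week` helpers live in `divvy_bike_share_data_analysis.bike_stations` and are not shown in this notebook. The sketch below is only a guess at their shape: the hour boundaries (morning 5 to 11, afternoon 12 to 16, evening 17 to 20, night otherwise), the label strings, and treating only Saturday and Sunday as non-working days are all assumptions, and the `_sketch` function names are hypothetical. The day-of-week input follows Spark's `dayofweek` convention, which returns 1 for Sunday through 7 for Saturday.

```python
# Hypothetical sketches of the two helpers; the real implementations in
# divvy_bike_share_data_analysis.bike_stations may use other boundaries/labels.


def categorize_time_of_day_sketch(hour):
    """Map an hour of the day (0-23) to a period; boundaries are assumed."""
    if hour is None:  # Spark passes null through to Python UDFs as None
        return None
    if 5 <= hour < 12:
        return "Morning"
    if 12 <= hour < 17:
        return "Afternoon"
    if 17 <= hour < 21:
        return "Evening"
    return "Night"  # 21:00 to 04:59


def categorize_day_of_week_sketch(day_of_week):
    """Map Spark's dayofweek value (1 = Sunday ... 7 = Saturday) to a day type.

    Assumes Saturday and Sunday are the only non-working days (no holidays).
    """
    if day_of_week is None:
        return None
    return "Non-working" if day_of_week in (1, 7) else "Workday"


print([categorize_time_of_day_sketch(h) for h in (6, 13, 18, 23)])
# ['Morning', 'Afternoon', 'Evening', 'Night']
print(categorize_day_of_week_sketch(1), categorize_day_of_week_sketch(4))
# Non-working Workday
```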

In [19]:
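# Work on a 1% random sample (without replacement) of divvy_df, which is loaded by the data_collection notebook
sampled_df = divvy_df.sample(withReplacement=False, fraction=0.01)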
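`DataFrame.sample(withReplacement=False, fraction=0.01)` keeps each row independently with probability 0.01, so the sample holds only approximately 1% of the rows and differs between runs unless a `seed` is passed. The pure-Python sketch below imitates that per-row (Bernoulli) sampling on hypothetical ride ids; `bernoulli_sample` is an illustrative helper, not Spark's implementation.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`.

    The output size is random, centred on len(rows) * fraction,
    which is why a fractional sample is not an exact 1%.
    """
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rides = [f"ride_{i}" for i in range(100_000)]  # hypothetical ride ids
sample_a = bernoulli_sample(rides, 0.01, seed=42)
sample_b = bernoulli_sample(rides, 0.01, seed=42)

print(len(sample_a))         # close to 1,000 but rarely exactly 1,000
print(sample_a == sample_b)  # True: same seed, same sample
```

In the notebook, passing `seed=` to `divvy_df.sample(...)` would make the 1% sample reproducible across runs.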

In [None]:
from divvy_bike_share_data_analysis.bike_stations import get_unique_bike_stations_ids

# Remove duplicate rides and rows with a null ride_id
sampled_df = sampled_df.dropDuplicates(subset=['ride_id']).dropna(subset=['ride_id'])

# Drop rows with null station ids before deriving the station list
sampled_df = sampled_df.dropna(subset=['start_station_id', 'end_station_id'])
sampled_df.show(10)

# Get the unique bike stations from the start/end station columns
bike_stations = get_unique_bike_stations_ids(sampled_df.select(['start_station_id',
                                                                'start_station_name',
                                                                'end_station_id',
                                                                'end_station_name']))
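`get_unique_bike_stations_ids` comes from the project package and its body is not shown. Judging from how its result is used later (columns `station_id` and `station_name`), it reduces the start/end columns to one name per station id. A pure-Python sketch of that idea follows; the function name, the "most frequent name wins" rule, the alphabetical tie-break, and the sample data are all hypothetical.

```python
from collections import Counter, defaultdict


def unique_station_names_sketch(rides):
    """Return {station_id: station_name}, one name per id.

    `rides` is an iterable of dicts with start/end station id and name keys.
    For each id the most common name wins; ties go to the alphabetically
    first name (an assumption, not necessarily the project's rule).
    """
    name_counts = defaultdict(Counter)
    for ride in rides:
        for prefix in ("start", "end"):
            station_id = ride[f"{prefix}_station_id"]
            name = ride[f"{prefix}_station_name"]
            if station_id is not None and name is not None:
                name_counts[station_id][name] += 1

    return {
        station_id: min(counts, key=lambda n: (-counts[n], n))
        for station_id, counts in name_counts.items()
    }


# Made-up rides: station A1 appears under two spellings
rides = [
    {"start_station_id": "A1", "start_station_name": "Clark St & Lake St",
     "end_station_id": "B2", "end_station_name": "State St & Madison St"},
    {"start_station_id": "A1", "start_station_name": "Clark St & Lake St (*)",
     "end_station_id": "A1", "end_station_name": "Clark St & Lake St"},
]
print(unique_station_names_sketch(rides))
# {'A1': 'Clark St & Lake St', 'B2': 'State St & Madison St'}
```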


### Applying unified station names by ID back to the sampled dataset

In [None]:
# Drop rows with a null started_at or ended_at timestamp
sampled_df = sampled_df.dropna(subset=['started_at', 'ended_at'])


In [None]:
from pyspark.sql.functions import udf, hour, dayofweek
from pyspark.sql.types import StringType

# TODO refactor into optimized queries and avoid duplicated columns after joins.
# [Note to self] check bike_stations.get_unique_ids function for similar approach.

# Prepare start and end stations DataFrames from bike_stations
start_stations = bike_stations.selectExpr("station_id as start_station_id", "station_name as new_start_station_name")
end_stations = bike_stations.selectExpr("station_id as end_station_id", "station_name as new_end_station_name")

# Drop existing name columns in sampled_df before joining
sampled_df = sampled_df.drop("start_station_name", "end_station_name")

# Join with start_stations to add new_start_station_name
sampled_df = sampled_df.join(start_stations, on="start_station_id", how="left").withColumnRenamed("new_start_station_name", "start_station_name")

# Join with end_stations to add new_end_station_name
sampled_df = sampled_df.join(end_stations, on="end_station_id", how="left").withColumnRenamed("new_end_station_name", "end_station_name")
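The two left joins above replace each ride's station names with the unified name for its id. Because the joins are `left`, a ride whose id has no match in `bike_stations` is kept with a null name instead of being dropped; and, assuming `bike_stations` holds exactly one row per `station_id`, no ride is duplicated (several rows per id would multiply the matching rides). The dictionary-based sketch below mimics those semantics in plain Python; `relabel_station_names` and the data are hypothetical.

```python
def relabel_station_names(rides, canonical_names):
    """Return copies of `rides` with start/end names taken from `canonical_names`.

    Behaves like the two left joins: ids missing from `canonical_names`
    keep the ride but get None as the name (Spark would produce null).
    """
    relabeled = []
    for ride in rides:
        row = dict(ride)  # copy so the input rows are left untouched
        row["start_station_name"] = canonical_names.get(ride["start_station_id"])
        row["end_station_name"] = canonical_names.get(ride["end_station_id"])
        relabeled.append(row)
    return relabeled


canonical_names = {"A1": "Clark St & Lake St", "B2": "State St & Madison St"}
rides = [
    {"ride_id": "r1", "start_station_id": "A1", "start_station_name": "old name",
     "end_station_id": "B2", "end_station_name": "old name"},
    {"ride_id": "r2", "start_station_id": "Z9", "start_station_name": "unknown",
     "end_station_id": "A1", "end_station_name": "old name"},
]
for row in relabel_station_names(rides, canonical_names):
    print(row["ride_id"], row["start_station_name"], "->", row["end_station_name"])
# r1 Clark St & Lake St -> State St & Madison St
# r2 None -> Clark St & Lake St
```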


In [16]:
""" rideable_type - 0: classic_bike, 1: docked_bike, 2: electric_bike
We'll discard the docked_bike type as it's not relevant for the predictive analysis of the number of bikes needed at each station at different times of the day."""

sampled_df = sampled_df.filter(sampled_df['rideable_type'] != 'docked_bike')
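One subtlety of `filter(col != 'docked_bike')`: Spark SQL uses three-valued logic, so for a row whose `rideable_type` is null the comparison evaluates to null, and `filter` keeps only rows where the predicate is true. Null rideable types are therefore dropped as a side effect. The plain-Python emulation below uses None for SQL null; the helper names are hypothetical.

```python
def sql_not_equal(left, right):
    """Emulate SQL `<>`: the result is None (unknown) if either side is None."""
    if left is None or right is None:
        return None
    return left != right


def sql_filter(rows, predicate):
    """Keep only rows whose predicate is exactly True, as Spark's filter does."""
    return [row for row in rows if predicate(row) is True]


types = ["classic_bike", "docked_bike", None, "electric_bike"]
kept = sql_filter(types, lambda t: sql_not_equal(t, "docked_bike"))
print(kept)  # ['classic_bike', 'electric_bike']
```

If rows with a null rideable type should be kept, the null-safe comparison `~sampled_df['rideable_type'].eqNullSafe('docked_bike')` evaluates to true for them.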

In [None]:
# Add categorical fields for time of day and day of week.
from divvy_bike_share_data_analysis.bike_stations import categorize_time_of_day, categorize_day_of_week

# Wrap the Python helpers as Spark UDFs that return strings
week_day_udf = udf(categorize_day_of_week, StringType())
time_of_day_udf = udf(categorize_time_of_day, StringType())

# hour() returns 0-23; dayofweek() returns 1 (Sunday) to 7 (Saturday)
sampled_df_with_added_features = (
    sampled_df
    .withColumn('day_period', time_of_day_udf(hour('started_at')))
    .withColumn('week_day', week_day_udf(dayofweek('started_at')))
)


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Encode each categorical column of sampled_df_with_added_features as a numeric index
indexer_start_station_id = StringIndexer(inputCol="start_station_id", outputCol="start_station_id_index")
indexer_end_station_id = StringIndexer(inputCol="end_station_id", outputCol="end_station_id_index")
indexer_member_casual = StringIndexer(inputCol="member_casual", outputCol="member_casual_index")
indexer_day_period = StringIndexer(inputCol="day_period", outputCol="day_period_index")
indexer_week_day = StringIndexer(inputCol="week_day", outputCol="week_day_index")
indexer_rideable_type = StringIndexer(inputCol="rideable_type", outputCol="rideable_type_index")

# Combine all indexers into a Pipeline so they are fitted and applied together
pipeline = Pipeline(stages=[
    indexer_start_station_id,
    indexer_end_station_id,
    indexer_member_casual,
    indexer_day_period,
    indexer_week_day,
    indexer_rideable_type
])

# Fit the indexers on the data, then add the *_index columns
pipeline_model = pipeline.fit(sampled_df_with_added_features)
sampled_df_with_added_features_indexed = pipeline_model.transform(sampled_df_with_added_features)
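By default `StringIndexer` uses `stringOrderType='frequencyDesc'`: the most frequent label gets index 0.0, the next 1.0, and so on, with equal frequencies ordered alphabetically. The resulting codes therefore depend on label frequencies in the fitted data rather than on any predefined list. A pure-Python sketch of that ordering rule follows; `frequency_desc_index` and the labels are hypothetical.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Build a label -> index mapping like StringIndexer's default ordering.

    Most frequent label gets 0.0; equal frequencies are ordered alphabetically.
    Indices are floats, matching the double-typed output column in Spark.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


day_periods = ["Evening", "Morning", "Afternoon", "Morning", "Night", "Afternoon"]
mapping = frequency_desc_index(day_periods)
print(mapping)
# {'Afternoon': 0.0, 'Morning': 1.0, 'Evening': 2.0, 'Night': 3.0}
print([mapping[p] for p in day_periods])
# [2.0, 1.0, 0.0, 1.0, 3.0, 0.0]
```

Because the default `handleInvalid='error'`, applying the fitted pipeline to data containing a label not seen during fitting (for example a new station id) raises an error; `handleInvalid='keep'` assigns such labels an extra index instead.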

In [None]:
# Each count() triggers a Spark job over the DataFrame
print(f'Loaded sampled_df_with_added_features with {sampled_df_with_added_features.count()} rows\nand columns {sampled_df_with_added_features.columns}')

print(f'Loaded sampled_df_with_added_features_indexed with {sampled_df_with_added_features_indexed.count()} rows\nand columns {sampled_df_with_added_features_indexed.columns}')
