# Preprocessing information

In [None]:
%pip install dask


In [210]:
import pandas as pd
def clean_is_installed(value):
    """Normalise a raw ``is_installed`` value to a number.

    The strings ``'true'`` and ``'false'`` (in any letter case) map to ``1``
    and ``0``. Every other value is handed to ``float()``.

    Args:
        value: The raw cell value (typically a string read from CSV).

    Returns:
        ``1`` or ``0`` for boolean-like strings, a ``float`` for anything
        ``float()`` accepts, or ``None`` when the value cannot be converted.
    """
    if isinstance(value, str):
        value = value.lower()
        if value in ('true', 'false'):
            return 1 if value == 'true' else 0
    try:
        return float(value)
    except (TypeError, ValueError):
        # float() raises TypeError (not ValueError) for None, pd.NA and other
        # non-numeric objects; treat those as missing instead of crashing.
        return None


def to_bool(x):
    """Interpret a raw value as a boolean flag.

    Missing values (as judged by ``pd.isna``) are ``False``. Strings are
    ``True`` only when they equal ``'true'`` (any letter case) or ``'1'``.
    Any other value falls back to Python truthiness.
    """
    if pd.isna(x):
        return False
    return x.lower() in ('true', '1') if isinstance(x, str) else bool(x)


def clean_is_renting_returning(value):
    """Normalise a raw ``is_renting`` / ``is_returning`` value to ``0`` or ``1``.

    Args:
        value: The raw cell value (typically a string read from CSV).

    Returns:
        ``0`` or ``1`` when ``value`` is exactly the string ``'0'`` or ``'1'``;
        ``None`` for everything else, including missing values.
    """
    # Check the type first: comparing pd.NA against the candidates yields NA,
    # whose truth value is ambiguous, so a bare ``in`` test raises TypeError
    # for missing cells.
    if isinstance(value, str) and value in ('0', '1'):
        return int(value)
    return None


def clean_num_docks_available(value):
    """Convert a raw ``num_docks_available`` value to ``float``.

    Args:
        value: The raw cell value (typically a string read from CSV).

    Returns:
        The value as a ``float``, or ``None`` when ``float()`` cannot
        convert it.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        # float() raises TypeError (not ValueError) for None, pd.NA and other
        # non-numeric objects; treat those as missing instead of crashing.
        return None


def cleanColumns(df):
    """Normalise the raw status columns of ``df`` and return it.

    Each column below is replaced on ``df`` itself by mapping the matching
    element-wise cleaner over every partition:

    * ``is_installed``               -> ``clean_is_installed`` (0/1 or float)
    * ``is_renting``, ``is_returning`` -> ``clean_is_renting_returning`` (0/1)
    * ``num_docks_available``        -> ``clean_num_docks_available`` (float)
    * ``is_charging_station``        -> ``to_bool`` (bool)

    Values a cleaner cannot interpret come back as ``None``.
    """
    def _clean(column, cleaner):
        # Run the element-wise cleaner over each partition of the column.
        df[column] = df[column].map_partitions(
            lambda part: part.map(cleaner))

    _clean('is_installed', clean_is_installed)
    _clean('is_renting', clean_is_renting_returning)
    _clean('is_returning', clean_is_renting_returning)
    _clean('num_docks_available', clean_num_docks_available)
    _clean('is_charging_station', to_bool)

    return df

In [211]:
import dask.dataframe as dd
import pandas as pd
import glob
from datetime import  datetime

# Collect the input CSV files. glob returns matches in arbitrary (filesystem)
# order, so sort them to keep the partition order -- and therefore the rows
# shown by df.head() -- reproducible between runs.
csv_files = sorted(glob.glob('bicing_data/*.csv'))
if not csv_files:
    # Fail early with a clear message instead of an opaque read_csv error.
    raise FileNotFoundError("No CSV files found matching 'bicing_data/*.csv'")

# Read all files into a single Dask DataFrame, treating all columns as strings initially
df = dd.read_csv(csv_files, assume_missing=True, dtype=str)


# last_updated column has unnecessary information
columns_to_drop = ['last_updated']
df = df.drop(columns=columns_to_drop)

# Column names of the combined frame.
# NOTE: a loop that followed this line ("ensure all columns are present") was
# removed. It iterated over set(df.columns) and tested membership in
# df.columns, so its body could never execute.
all_columns = set(df.columns)


# Normalise the raw string values of the status/flag columns.
df = cleanColumns(df)

# Convert columns to their appropriate data types
df = df.astype({
    'is_charging_station': 'Int64', 
    'is_installed': 'Int64',
    'is_renting': 'Int64',
    'is_returning': 'Int64',
    'num_bikes_available': 'Int64',
    'num_bikes_available_types.ebike': 'Int64',
    'num_bikes_available_types.mechanical': 'Int64',
    'num_docks_available': 'Int64',
    'station_id': 'Int64',
    'status': 'object',
    'ttl': 'float64'
})



In [212]:

# Function to convert Unix timestamp to datetime
def convert_unix_to_datetime(s):
    """Parse Unix timestamps (seconds since the epoch) into datetimes.

    ``s`` is first coerced to numbers; entries that are not numeric, or that
    cannot be turned into a datetime, become ``NaT`` rather than raising.
    """
    return pd.to_datetime(
        pd.to_numeric(s, errors='coerce'),
        unit='s',
        errors='coerce',
    )

def define_last_reported_column(df):
    """Replace ``df['last_reported']`` with its datetime form and return ``df``.

    The conversion is applied partition by partition through
    ``convert_unix_to_datetime``; ``meta`` declares the resulting column as
    ``datetime64[ns]``.
    """
    parsed = df['last_reported'].map_partitions(
        convert_unix_to_datetime,
        meta=('last_reported', 'datetime64[ns]'),
    )
    df['last_reported'] = parsed
    return df

# Function to extract datetime components
def extract_datetime_components(df):
    """Add ``year``, ``month``, ``day`` and ``hour`` columns to ``df``.

    The values are read from the datetime column ``last_reported``. The
    columns are assigned on ``df`` itself, which is also returned.
    """
    reported = df['last_reported']
    for component in ('year', 'month', 'day', 'hour'):
        df[component] = getattr(reported.dt, component)
    return df



In [213]:
# Turn the raw 'last_reported' values into datetimes.
df = define_last_reported_column(df)
# Derive the year/month/day/hour columns within each partition.
df = df.map_partitions(extract_datetime_components)

# Preview the first rows of the result.
df.head()

Unnamed: 0,station_id,num_bikes_available,num_bikes_available_types.mechanical,num_bikes_available_types.ebike,num_docks_available,is_installed,is_renting,is_returning,last_reported,is_charging_station,status,ttl,year,month,day,hour
0,1,9,9,0,35,1,1,1,2020-05-31 21:58:55,1,IN_SERVICE,2.0,2020,5,31,21
1,2,22,22,0,3,1,1,1,2020-05-31 21:56:31,1,IN_SERVICE,2.0,2020,5,31,21
2,3,12,12,0,15,1,1,1,2020-05-31 22:00:04,1,IN_SERVICE,2.0,2020,5,31,22
3,4,9,9,0,12,1,1,1,2020-05-31 22:00:10,1,IN_SERVICE,2.0,2020,5,31,22
4,5,31,31,0,7,1,1,1,2020-05-31 21:59:53,1,IN_SERVICE,2.0,2020,5,31,21


### Add a capacity column by merging in the station information from Informacio_Estacions_Bicing.csv

In [214]:
def read_capacity_data(df):
    """Attach station information, including ``capacity``, to ``df``.

    The station file is read with ``station_id`` and ``capacity`` as
    nullable integers, and stations without a capacity are discarded. The
    remaining stations are left-joined onto ``df`` by ``station_id``; rows
    that end up without a capacity are dropped and ``capacity`` is cast to
    float.

    Every column of the station file takes part in the merge, so a column
    name present on both sides comes back with the default ``_x`` / ``_y``
    suffixes.

    Returns:
        The merged DataFrame.
    """
    stations = dd.read_csv(
        'estaciones/Informacio_Estacions_Bicing.csv',
        dtype={'station_id': 'Int64', 'capacity': 'Int64'},
    )

    # Keep only stations with a known capacity and materialise the table.
    has_capacity = stations['capacity'].notnull()
    stations = stations[has_capacity].compute()

    # Left join on the station id, then drop rows with no matching capacity.
    merged = df.merge(stations, on='station_id', how='left')
    merged = merged[merged['capacity'].notnull()]

    # Float capacity for any subsequent arithmetic.
    merged['capacity'] = merged['capacity'].astype(float)

    return merged


# Merge the station information (including 'capacity') into the status data.
df = read_capacity_data(df)

# Preview the first rows of the merged result.
df.head()

Unnamed: 0,station_id,num_bikes_available,num_bikes_available_types.mechanical,num_bikes_available_types.ebike,num_docks_available,is_installed,is_renting,is_returning,last_reported,is_charging_station_x,...,lon,altitude,address,post_code,capacity,is_charging_station_y,nearby_distance,_ride_code_support,rental_uris,cross_street
0,1,9,9,0,35,1,1,1,2020-05-31 21:58:55,1,...,2.180107,16.0,"GRAN VIA CORTS CATALANES, 760",8013.0,45.0,True,1000.0,True,,
1,2,22,22,0,3,1,1,1,2020-05-31 21:56:31,1,...,2.177198,17.0,"C/ ROGER DE FLOR, 126",8013.0,29.0,True,1000.0,True,,
2,3,12,12,0,15,1,1,1,2020-05-31 22:00:04,1,...,2.181331,11.0,"C/ NÀPOLS, 82",8013.0,27.0,True,1000.0,True,,
3,4,9,9,0,12,1,1,1,2020-05-31 22:00:10,1,...,2.181248,8.0,"C/ RIBES, 13",8013.0,21.0,True,1000.0,True,,
4,5,31,31,0,7,1,1,1,2020-05-31 21:59:53,1,...,2.180176,7.0,"PG. LLUIS COMPANYS, 11 (ARC TRIOMF)",8018.0,39.0,True,1000.0,True,,


### Add percentage availability column

In [216]:
import numpy as np


def add_percentage_docks_available(df):
    """Add ``percentage_docks_available`` to ``df`` and return ``df``.

    The new column is ``num_docks_available / capacity``, i.e. a ratio rather
    than a value scaled by 100. ``num_docks_available`` is cast to float
    first. Missing results and infinite results (division by zero) are
    replaced with ``0``. Both columns are assigned on ``df`` itself.
    """
    df['num_docks_available'] = df['num_docks_available'].astype(float)

    ratio = df['num_docks_available'] / df['capacity']
    # NaN first (missing operands, 0/0), then +/-inf (non-zero / 0).
    ratio = ratio.fillna(0).replace([np.inf, -np.inf], 0)
    df['percentage_docks_available'] = ratio

    return df

# add_percentage_docks_available assigns its columns on the frame it receives
# and returns that same frame, so this name refers to the same object as df.
df_percentage_docks_available = add_percentage_docks_available(df)
# Preview the first rows, now including 'percentage_docks_available'.
df_percentage_docks_available.head()

Unnamed: 0,station_id,num_bikes_available,num_bikes_available_types.mechanical,num_bikes_available_types.ebike,num_docks_available,is_installed,is_renting,is_returning,last_reported,is_charging_station_x,...,altitude,address,post_code,capacity,is_charging_station_y,nearby_distance,_ride_code_support,rental_uris,cross_street,percentage_docks_available
0,1,9,9,0,35.0,1,1,1,2020-05-31 21:58:55,1,...,16.0,"GRAN VIA CORTS CATALANES, 760",8013.0,45.0,True,1000.0,True,,,0.777778
1,2,22,22,0,3.0,1,1,1,2020-05-31 21:56:31,1,...,17.0,"C/ ROGER DE FLOR, 126",8013.0,29.0,True,1000.0,True,,,0.103448
2,3,12,12,0,15.0,1,1,1,2020-05-31 22:00:04,1,...,11.0,"C/ NÀPOLS, 82",8013.0,27.0,True,1000.0,True,,,0.555556
3,4,9,9,0,12.0,1,1,1,2020-05-31 22:00:10,1,...,8.0,"C/ RIBES, 13",8013.0,21.0,True,1000.0,True,,,0.571429
4,5,31,31,0,7.0,1,1,1,2020-05-31 21:59:53,1,...,7.0,"PG. LLUIS COMPANYS, 11 (ARC TRIOMF)",8018.0,39.0,True,1000.0,True,,,0.179487
