In [None]:
import logging
import os
import urllib
import zipfile
from io import StringIO
from re import search, match, compile, sub
from typing import Dict, List, Optional, Tuple

import requests
import unicodecsv
from bs4 import BeautifulSoup
from pandas import DataFrame, read_csv, concat, set_option, to_datetime

In [None]:
# Widen pandas' notebook display: show up to 1000 rows and 50 columns.
# set_option accepts several (option, value) pairs in a single call.
set_option(
    'display.max_rows', 1000,
    'display.max_columns', 50,
)

# Historical rides data pipeline

Historical rides data is made available through bixi's site https://bixi.com/en/open-data/. The underlying HTML structure of the page contains one S3 endpoint for each year of data. The following is an example of such an endpoint:
- https://s3.ca-central-1.amazonaws.com/cdn.bixi.com/wp-content/uploads/2024/03/DonneesOuvertes2024_010203.zip

In order to reliably access this information, an ETL pipeline object will be designed in the next steps. See below the pseudo code and context for each one of the ETL steps. In addition, see below the desired schema for this dataset:

     - name_start_station: str
     - latitude_start_station: float
     - longitude_start_station: float
     - name_end_station: str
     - latitude_end_station: float
     - longitude_end_station: float
     - start_date: datetime64[ns]
     - end_date: datetime64[ns]

**Extract**:

Ride data files need to be extracted from bixi's website and persisted in a temporary folder. The following lays out the logic for the extract step:
- Turn bixi's website into a `BeautifulSoup` object using `BeautifulSoup(webpage.content)` logic.
- Find all the S3 endpoints and select the endpoints for which there is not yet data.
- For the selected endpoints, use the `requests` library to get the underlying zip file(s).
- Save each zip file in the designated folder structure based on the year.

**Transform**:

Once the rides data zip files are extracted, the data needs to be transformed and compiled into a single dataset covering all years for which data is available. A preliminary exploration of the rides data identified two distinct schemas: before and after the year 2022. The following lays out the logic for the transform step:
- Retrieve the names of all files contained inside the zip file using the method `zipfile.ZipFile('<file_name>').namelist()`.
- If the data dates from 2021 or earlier:
    - open and concatenate all `.csv` files, excluding the file matching the regex pattern `.*[sS]tations.*`
    - open the file matching the regex pattern `.*[sS]tations.*` and merge it with the concatenated files from the previous step in order to denormalize the station names. The fields for the merge vary depending on the year and will be hardcoded.
    - conform the data to the desired schema
- Else:
    - open the `.csv` file and conform it to the desired schema.

In [None]:
# Configure the root logger for the notebook: INFO level and above, each
# record prefixed with its timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

In [None]:
class RidesDataPipeline:
    """
    Extract, transform and load (ETL) pipeline object for bixi rides data.

    Zip files are downloaded to
    ``<temporary_directory>/raw_data/rides_data/<year>/<year>_hist_rides_data.zip``,
    combined into one dataframe with standard column names and written to csv.
    """

    # First year whose rides files already carry station names/coordinates and
    # epoch-millisecond timestamps; earlier years need a merge with a stations file.
    _NEW_SCHEMA_YEAR = 2022

    def __init__(
        self,
        url: str,
        temporary_directory: str,
        years: Tuple[int, int],
        column_name_mapping: Dict[str, str],
        ride_data_columns: List[str],
        request_timeout: float = 60,
    ):
        """
        Parameters
        ----------
        url: bixi open data web page listing the s3 endpoints.
        temporary_directory: root folder under which raw zip files are stored.
        years: inclusive (first_year, last_year) range of years to process.
        column_name_mapping: lowercased raw column name -> standard column name.
        ride_data_columns: columns kept in the transformed dataframe.
        request_timeout: seconds before an HTTP request is abandoned.
        """
        self.url = url
        self.raw_data_directory = temporary_directory
        self.years = list(range(years[0], years[1] + 1))
        self.column_name_mapping = column_name_mapping
        self.ride_data_columns = ride_data_columns
        self.request_timeout = request_timeout

    def _zip_file_path(self, year: int) -> str:
        """Return the path where the raw zip file of ``year`` is stored."""
        return f"{self.raw_data_directory}/raw_data/rides_data/{year}/{year}_hist_rides_data.zip"

    def _get_s3_endpoints(self) -> Dict[int, str]:
        """Webscrape bixi's website and map each year to its s3 endpoint."""
        response = requests.get(self.url, timeout=self.request_timeout)
        response.raise_for_status()
        buttons = BeautifulSoup(response.content, 'html.parser').find_all(
            'a', href=True, attrs={'class': 'button button-primary'}
        )

        s3_endpoint_by_year = {}
        for button in buttons:
            endpoint = button['href']
            if 's3.ca' not in endpoint:
                continue
            # Keep only endpoints whose file name contains a year
            year_match = search(r'2[0-9]{3}', endpoint.split('/')[-1])
            if year_match is not None:
                s3_endpoint_by_year[int(year_match[0])] = endpoint

        return s3_endpoint_by_year

    def _save_rides_data_files(self, s3_endpoint_by_year: Dict[int, str]) -> None:
        """Download and save the rides data zip file of every selected year.

        Years without an s3 endpoint are skipped with a warning.
        Raises ``requests.HTTPError`` when a download fails.
        """
        for year in self.years:
            endpoint = s3_endpoint_by_year.get(year)
            if endpoint is None:
                logging.warning(f'No s3 endpoint found for year {year}; skipping.')
                continue

            response = requests.get(endpoint, timeout=self.request_timeout)
            # Fail loudly instead of saving an error page as a zip file
            response.raise_for_status()

            file_name = self._zip_file_path(year)
            os.makedirs(os.path.dirname(file_name), exist_ok=True)
            with open(file_name, "wb") as f:
                f.write(response.content)
            logging.info(f'Data for year {year} saved successfully.')

    def _retrive_csv_files_path(self) -> Dict[int, List[str]]:
        """Return, per selected year, the names of the csv files inside its zip file."""
        csv_files_path = {}
        for year in self.years:
            with zipfile.ZipFile(self._zip_file_path(year)) as zip_file_object:
                csv_files_path[year] = [
                    file for file in zip_file_object.namelist()
                    if file.lower().endswith('.csv')
                ]
        return csv_files_path

    @staticmethod
    def _standarize_columns(df: DataFrame, column_name_mapping: Dict[str, str], year: int) -> DataFrame:
        """
        Standarize columns by lowercasing, stripping, renaming and parsing dates.

        When a ``start_date`` column exists, ``start_date``/``end_date`` are parsed
        as epoch milliseconds from 2022 onwards and as date strings before.
        The input dataframe is left unmodified.
        """
        standard_df = (
            df.rename(columns=lambda col: str(col).lower().strip())
            .rename(columns=column_name_mapping)
        )
        if 'start_date' not in standard_df:
            return standard_df

        unit = 'ms' if year >= RidesDataPipeline._NEW_SCHEMA_YEAR else None
        return standard_df.assign(
            start_date=to_datetime(standard_df['start_date'], unit=unit),
            end_date=to_datetime(standard_df['end_date'], unit=unit),
        )

    def _read_year(self, year: int, csv_files: List[str]) -> Tuple[DataFrame, DataFrame]:
        """
        Read the rides and stations csv files of ``year`` with standard columns.

        The stations dataframe is only read before 2022 and is empty otherwise.
        Raises FileNotFoundError when a required csv file is missing.
        """
        stations_file_path = next(
            (file for file in csv_files if search(r'[sS]tations', file)), None
        )
        rides_files_path = [file for file in csv_files if file != stations_file_path]
        if not rides_files_path:
            raise FileNotFoundError(f'No rides csv file found for year {year}.')

        with zipfile.ZipFile(self._zip_file_path(year)) as zip_file_object:
            if year < self._NEW_SCHEMA_YEAR:
                if stations_file_path is None:
                    raise FileNotFoundError(f'No stations csv file found for year {year}.')
                with zip_file_object.open(stations_file_path) as file:
                    stations_dataframe = read_csv(file)
            else:
                stations_dataframe = DataFrame()

            rides_dataframes_list = []
            for file_path in rides_files_path:
                with zip_file_object.open(file_path) as file:
                    rides_dataframes_list.append(read_csv(file))

        rides_dataframe = self._standarize_columns(
            concat(rides_dataframes_list, ignore_index=True), self.column_name_mapping, year
        )
        stations_dataframe = self._standarize_columns(
            stations_dataframe, self.column_name_mapping, year
        )
        return rides_dataframe, stations_dataframe

    @staticmethod
    def _denormalize_stations(rides_dataframe: DataFrame, stations_dataframe: DataFrame) -> DataFrame:
        """
        Attach start and end station attributes to each ride.

        Every stations column ``<col>`` is merged in twice: as
        ``<col>_start_station`` joined on ``code_start_station`` and as
        ``<col>_end_station`` joined on ``code_end_station``. Rides without a
        matching station keep NaN values (left join).
        """
        start_stations = stations_dataframe.rename(columns=lambda col: f'{col}_start_station')
        end_stations = stations_dataframe.rename(columns=lambda col: f'{col}_end_station')
        return (
            rides_dataframe
            .merge(start_stations, how='left', on='code_start_station')
            .merge(end_stations, how='left', on='code_end_station')
        )

    def extract(self) -> None:
        """
        Extract and save rides data available through bixi S3 endpoints.

        Errors are logged with their traceback rather than raised.
        """
        try:
            s3_endpoints = self._get_s3_endpoints()
            logging.info(f'The following s3 endpoints were detected: {s3_endpoints}')

            self._save_rides_data_files(s3_endpoints)
            logging.info(f'Data for endpoints {s3_endpoints} was sucessfully extracted and saved.')

        except Exception as e:
            logging.error(f'Extraction was not successfull given the following excpetion: {e}', exc_info=True)

    def transform(self) -> DataFrame:
        """
        Combine rides data of every selected year into a single dataframe.

        Before 2022, station names and coordinates come from merging the
        year's stations file. Only ``self.ride_data_columns`` are kept, and the
        result has a fresh 0..n-1 index.
        """
        csv_paths = self._retrive_csv_files_path()

        dfs = []
        for year in self.years:
            logging.info(f'Transforming data for year {year}.')
            rides_dataframe, stations_dataframe = self._read_year(year, csv_paths[year])
            if year < self._NEW_SCHEMA_YEAR:
                rides_dataframe = self._denormalize_stations(rides_dataframe, stations_dataframe)
            dfs.append(rides_dataframe[self.ride_data_columns])

        if not dfs:
            # Empty year range: return an empty frame with the expected columns
            return DataFrame(columns=self.ride_data_columns)
        return concat(dfs, ignore_index=True)

    def load(self, df: DataFrame, target_directory: str = '', file_name: str = 'historical_rides_data') -> None:
        """
        Write ``df`` to ``<target_directory><file_name>.csv``.

        ``target_directory`` is prepended as-is, so it should end with a path
        separator (e.g. ``'data/'``). Missing parent folders are created.
        """
        path = f'{target_directory}{file_name}.csv'
        parent_directory = os.path.dirname(path)
        if parent_directory:
            os.makedirs(parent_directory, exist_ok=True)
        df.to_csv(path)
        logging.info(f'Rides data sucessfully loaded on {path}')

    def run_pipeline(self, steps: Optional[List[str]] = None, **kwargs) -> Optional[DataFrame]:
        """
        Run the selected steps, always in the order extract -> transform -> load.

        Parameters
        ----------
        steps: subset of ['extract', 'transform', 'load']; defaults to all three.
        **kwargs: forwarded to ``load`` (``target_directory``, ``file_name``).

        Returns
        -------
        The transformed dataframe when 'transform' runs, otherwise None.

        Raises
        ------
        ValueError: if 'load' is requested without 'transform'.
        """
        if steps is None:
            steps = ['extract', 'transform', 'load']
        if 'load' in steps and 'transform' not in steps:
            raise ValueError("The 'load' step requires the 'transform' step.")

        df = None
        if 'extract' in steps:
            self.extract()
        if 'transform' in steps:
            df = self.transform()
        if 'load' in steps:
            self.load(df, **kwargs)
        return df

In [None]:
# ---- constants ----

# bixi open data page listing one s3 endpoint per year of rides data
BIXI_WEBSITE = 'https://bixi.com/en/open-data/'

# Root folder where raw zip files are stored (machine-specific path)
RAW_DATA_DIRECTORY = (
    '/mnt/c/Users/felip/Documents/knowledge/ml_projects/'
    'protifolio/bixi_ride_predictions/data'
)

# Lowercased raw column name -> standard column name, covering the different
# column names found across the yearly rides and stations files.
COLUMN_MAPPING = dict(
    # stations files: station identifier
    pk='code',
    # rides files identifying stations by code
    emplacement_pk_start='code_start_station',
    emplacement_pk_end='code_end_station',
    # rides files carrying station names, locations and timestamps directly
    startstationname='name_start_station',
    startstationarrondissement='arrondissement_start_station',
    startstationlatitude='latitude_start_station',
    startstationlongitude='longitude_start_station',
    endstationname='name_end_station',
    endstationarrondissement='arrondissement_end_station',
    endstationlatitude='latitude_end_station',
    endstationlongitude='longitude_end_station',
    starttimems='start_date',
    endtimems='end_date',
    # alternative station-code column names
    start_station_code='code_start_station',
    end_station_code='code_end_station',
)

# Columns kept in the transformed rides dataset, in output order
RIDE_DATA_COLUMNS = [
    'start_date', 'end_date',
    'name_start_station', 'name_end_station',
    'latitude_start_station', 'longitude_start_station',
    'latitude_end_station', 'longitude_end_station',
]

In [None]:
# Run the rides pipeline (default steps) for 2022 to 2024, writing
# historical_rides_data_2022_to_2024.csv, and bind run_pipeline's return value.
# NOTE(review): dfs_ is only a DataFrame if run_pipeline returns the transformed data — confirm.
dfs_ = RidesDataPipeline(
    BIXI_WEBSITE,
    RAW_DATA_DIRECTORY,
    (2022, 2024),
    COLUMN_MAPPING,
    RIDE_DATA_COLUMNS,
).run_pipeline(file_name='historical_rides_data_2022_to_2024')

In [None]:
# Display the column dtypes of dfs_, the value assigned from run_pipeline above.
dfs_.dtypes

# Historical weather data

In [None]:
def get_historical_hourly_weather_data(stationID: int, year: str, month: str) -> DataFrame:
    """
    Download one month of weather data from Environment Canada's bulk data service.

    Parameters
    ----------
    stationID: climate station identifier expected by the service.
    year: year of the data (an int such as 2019 also works, it is only formatted).
    month: month of the data, e.g. '03'.

    Returns
    -------
    DataFrame holding the csv returned by the service, unmodified.
    """
    # timeframe=1 is presumably the hourly granularity (matches the function name) — confirm.
    # https avoids a plain-http round trip to the same host.
    api_endpoint = (
        'https://climate.weather.gc.ca/climate_data/bulk_data_e.html?'
        f'format=csv&stationID={stationID}&Year={year}&Month={month}&timeframe=1'
    )
    return read_csv(api_endpoint)

# Years (2014 to 2023 inclusive) and zero-padded months ('01'..'12') to download
years = list(range(2014, 2024))
months = [f'{m:02d}' for m in range(1, 13)]

# 30165 is the Environment Canada climate station ID passed to the bulk data service.
# ignore_index gives the combined frame a unique 0..n-1 index instead of
# repeating each month's index.
weather_data = concat(
    [
        get_historical_hourly_weather_data(30165, year, month)
        for year in years
        for month in months
    ],
    ignore_index=True,
)

weather_data.to_csv(f'{RAW_DATA_DIRECTORY}/hist_weather_data.csv')