In [126]:
import os.path as path   
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# strptime/strftime pattern used for every trip timestamp in this file,
# e.g. '08:13:00,15-05-2022'.
dt_format = '%H:%M:%S,%d-%m-%Y'
# Field delimiter passed to the pandas CSV readers/writers below.
sep = ','

# Column names used when creating the three CSV files.
trips_cols = [
              'trip_id',
              'driver_id',
              'pickup_datetime',
              'dropoff_datetime',
              'passenger_count',
              'pickup_loc_id',
              'dropoff_loc_id',
              'trip_distance',
              'fare_amount'
             ]
drivers_cols = [
                'driver_id',
                'last_name',
                'given_name'
              ]
locs_cols = [
            'location_id',
            'loc_name'
           ]
# Column -> dtype mappings. Note that the code below also passes these
# dicts as the `columns` argument when appending rows, in which case only
# their keys (and key order) matter.
trips_dtypes = {
                'trip_id': int,
                'driver_id': int,
                'pickup_datetime': 'datetime64[ns]',
                'dropoff_datetime': 'datetime64[ns]',
                'passenger_count': int,
                'pickup_loc_id': int,
                'dropoff_loc_id': int,
                'trip_distance': float,
                'fare_amount': float
               }
drivers_dtypes = {
                  'driver_id': int,
                  'last_name': str,
                  'given_name': str
                }
locs_dtypes = {
              'location_id': int,
              'loc_name': str
              }

# Loraine: the definitions below exist because `trips_dtypes` (with its
# datetime64 entries) did not work when reading the CSV files, so the date
# columns and the remaining dtypes are listed separately for `read_data`.

# Sort rank of each English weekday name (Monday first); used to order the
# keys of the per-weekday statistics.
dow_order = {'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3, 
            'Friday': 4, 'Saturday': 5, 'Sunday': 6}

# Trip columns that hold timestamps and must be parsed as dates on read.
trips_dates = ['pickup_datetime', 'dropoff_datetime']
# dtypes for the non-date trip columns, passed to `pd.read_csv`.
trips_dtypes_read = {
                'trip_id': int,
                'driver_id': int,
                'passenger_count': int,
                'pickup_loc_id': int,
                'dropoff_loc_id': int,
                'trip_distance': float,
                'fare_amount': float
               }


def check_driver(driver):
    """
    Validate a driver name given as a single comma-separated string.

    Parameters
    ----------
    driver : str
        Name made of exactly two non-empty parts separated by one comma.
        Callers in this file store the first part as the last name and
        the second part as the given name.

    Returns
    -------
    str or None
        An error message describing the first problem found, or None when
        the name is acceptable.
    """
    # The backslash is spelled as an explicit `\\` escape: the previous
    # literal relied on the invalid escape `\]`, which newer Python versions
    # warn about. The set of rejected characters is unchanged.
    special_characters = '!"#$%&\'()*+-/:;<=>?@[\\]^_`{|}~'

    if not isinstance(driver, str) or driver.strip() == '':
        return 'Name should not be empty or must be in string format'

    name_parts = driver.split(',')
    # Both halves must carry text; otherwise a blank last/given name would
    # be written to the drivers file.
    if len(name_parts) != 2 or any(part.strip() == '' for part in name_parts):
        return 'Invalid name format should be (First Name, Last Name)'

    if any(c in special_characters for c in driver):
        return 'Name contains special characters'
    return None


def check_dt(dt, dt_col):
    """
    Validate a timestamp string against the module-level `dt_format`.

    Parameters
    ----------
    dt : str
        Timestamp text; surrounding whitespace is ignored.
    dt_col : str
        Label of the field, used as the prefix of the error message.

    Returns
    -------
    str or None
        Error message when `dt` is not a string or cannot be parsed,
        otherwise None.
    """
    if type(dt) is not str:
        return f'{dt_col} should be in string format'

    # errors='coerce' turns an unparsable value into NaT instead of raising.
    parsed = pd.to_datetime(dt.strip(), format=dt_format, errors='coerce')
    return f'{dt_col} should be in valid format' if pd.isnull(parsed) else None


def check_pass_cnt(cnt):
    """
    Validate a passenger count.

    Only values whose type is exactly `int` are accepted, so booleans and
    floats are rejected. Zero is allowed.

    Returns
    -------
    str or None
        Error message, or None when the count is valid.
    """
    if type(cnt) is int:
        if cnt >= 0:
            return None
        return 'Passenger count should be a positive value >= 0'
    return 'Passenger count should be integer value'


def check_loc(loc, loc_col):
    """
    Validate a location name.

    Parameters
    ----------
    loc : str
        Location name; must contain non-whitespace text.
    loc_col : str
        Label of the field, used as the prefix of the "empty" error message.

    Returns
    -------
    str or None
        Error message, or None when the name is acceptable.
    """
    # The backslash is spelled as an explicit `\\` escape: the previous
    # literal relied on the invalid escape `\]`, which newer Python versions
    # warn about. The rejected characters are unchanged; note that '#', '$',
    # '*', ',' and '.' are deliberately still allowed here.
    special_characters = '!"%&\'()+-/:;<=>?@[\\]^_`{|}~'

    if not isinstance(loc, str) or loc.strip() == '':
        return f'{loc_col} should not be empty or must be in string format'
    if any(c in special_characters for c in loc):
        return 'Location contains special characters'
    return None


def check_trip_d(val):
    """
    Validate a trip distance.

    The value must have type exactly `float` or `int` (booleans are
    rejected) and must not be negative.

    Returns
    -------
    str or None
        Error message, or None when the distance is valid.
    """
    if type(val) not in (float, int):
        return 'Trip Distance should be a numeric value'
    return 'Trip Distance should be a positive number' if val < 0 else None


def check_fare(val):
    """
    Validate a fare amount.

    The value must have type exactly `float` or `int` (booleans are
    rejected) and must not be negative.

    Returns
    -------
    str or None
        Error message, or None when the fare is valid.
    """
    is_number = type(val) is float or type(val) is int
    if not is_number:
        return 'Fare Amount should be a numeric value'
    if val >= 0:
        return None
    return 'Fare Amount should be a positive number'

def check_start_end(p_dt, d_dt):
    """
    Check that the pickup timestamp is not later than the dropoff timestamp.

    Both arguments are strings in the module-level `dt_format`. Equal
    timestamps are accepted.

    Returns
    -------
    str or None
        Error message when pickup is after dropoff, otherwise None.
    """
    pickup, dropoff = (
        pd.to_datetime(raw.strip(), format=dt_format, errors='coerce')
        for raw in (p_dt, d_dt)
    )
    if pickup > dropoff:
        return 'Pickup Date Time should be lower than Dropoff Date Time'
    return None
 

def add_trip_checks(param_list):
    """
    Run every trip validator in order and report the first failure.

    Parameters
    ----------
    param_list : list
        Trip fields in the order: driver, pickup datetime, dropoff datetime,
        passenger count, pickup location, dropoff location, trip distance,
        fare amount.

    Returns
    -------
    str or None
        The first error message produced by a validator, or None when all
        validators pass.
    """
    # Each entry: (validator, positions of its arguments in param_list,
    # extra label arguments). Arguments are looked up only when the
    # validator is about to run, so the pickup/dropoff ordering check is
    # reached only after both timestamps passed their own checks.
    plan = (
        (check_driver, (0,), ()),
        (check_dt, (1,), ('Pickup Date Time',)),
        (check_dt, (2,), ('Dropoff Date Time',)),
        (check_pass_cnt, (3,), ()),
        (check_loc, (4,), ('Pickup Location',)),
        (check_loc, (5,), ('Dropoff Location',)),
        (check_trip_d, (6,), ()),
        (check_fare, (7,), ()),
        (check_start_end, (1, 2), ()),
    )

    for validator, positions, labels in plan:
        values = [param_list[pos] for pos in positions]
        problem = validator(*values, *labels)
        if problem is not None:
            return problem
    return None
        
def create_file(dir, cols):
    """
    Write a CSV file at `dir` that contains only the header row `cols`.

    Returns None.
    """
    header_only = pd.DataFrame(columns=cols)
    header_only.to_csv(dir, index=False, header=True, sep=sep)


def check_driver_id(name, f_dir):
    """
    Look up a driver in the drivers CSV, comparing names case-insensitively.

    Parameters
    ----------
    name : sequence of str
        `name[0]` is the last name and `name[1]` the given name.
    f_dir : str
        Path of the drivers CSV file.

    Returns
    -------
    tuple
        `(driver_id, is_new)`. For a known driver this is the stored id and
        False. Otherwise it is the id of the last row plus one (1 for a file
        with no rows) and True.
    """
    drivers = pd.read_csv(f_dir, sep=sep)
    drivers.given_name = drivers.given_name.str.lower()
    drivers.last_name = drivers.last_name.str.lower()

    wanted_last = name[0].lower()
    wanted_given = name[1].lower()
    hits = drivers.query(
        'last_name == @wanted_last & given_name == @wanted_given'
    )

    if len(hits) != 0:
        return hits.driver_id.values[0], False
    if len(drivers) == 0:
        return 1, True
    # New ids continue from the last row of the file.
    return drivers.driver_id.values[-1] + 1, True
    
    
def check_loc_id(loc, f_dir):
    """
    Look up a location name (exact, case-sensitive match) in the
    locations CSV.

    Returns
    -------
    tuple
        `(location_id, is_new)`. For a known location this is the stored id
        and False. Otherwise it is the id of the last row plus one (1 for a
        file with no rows) and True.
    """
    places = pd.read_csv(f_dir, sep=sep)
    found = places.query('loc_name == @loc')

    if len(found) != 0:
        return found.location_id.values[0], False

    # New ids continue from the last row of the file.
    next_id = places.location_id.values[-1] + 1 if len(places) != 0 else 1
    return next_id, True
    
def check_trip_id(data, f_dir):
    """
    Decide whether a trip is already stored and, if not, pick its id.

    Parameters
    ----------
    data : list
        Trip fields in the order: driver id, pickup datetime, dropoff
        datetime, passenger count, pickup location id, dropoff location id,
        trip distance, fare amount.
    f_dir : str
        Path of the trips CSV file.

    Returns
    -------
    tuple
        `(0, True)` when a row matching all eight fields exists; otherwise
        `(new_trip_id, False)`, where the new id is the id of the last row
        plus one (1 for a file with no rows).
    """
    trips = pd.read_csv(f_dir, sep=sep)

    fields = ('driver_id', 'pickup_datetime', 'dropoff_datetime',
              'passenger_count', 'pickup_loc_id', 'dropoff_loc_id',
              'trip_distance', 'fare_amount')
    # `@data[i]` is resolved by DataFrame.query against this function's
    # local `data`.
    criteria = ' & '.join(f'{field} == @data[{pos}]'
                          for pos, field in enumerate(fields))
    duplicates = trips.query(criteria)

    if len(duplicates) != 0:
        return 0, True
    if len(trips) == 0:
        return 1, False
    return trips.trip_id.values[-1] + 1, False


def insert_data(data, file_p, cols_lst):
    """
    Append the rows in `data` to the CSV file `file_p` without a header.

    `cols_lst` supplies the column labels for the rows; it is passed
    straight to the DataFrame constructor. Returns None.
    """
    rows = pd.DataFrame(data, columns=cols_lst)
    rows.to_csv(file_p, sep=sep, header=False, index=False, mode='a')

def del_trip(trip_id, f_dir):
    """
    Remove every row with the given `trip_id` from the trips CSV.

    The file is rewritten only when at least one row was removed.

    Returns
    -------
    bool
        True when a row was deleted, False when the id was not found.
    """
    trips = pd.read_csv(f_dir, sep=sep)
    remaining = trips.query('trip_id != @trip_id')

    was_found = len(remaining) != len(trips)
    if was_found:
        remaining.to_csv(f_dir, sep=sep, header=True, index=False)
    return was_found
     
    
# Loraine: Added a function for reading the csv files.
# Feel free to use this if needed
def read_data(dataset, fname):
    """
    Read one of the database CSV files into a typed DataFrame.

    Parameters
    ----------
    dataset : str
        One of 'trips', 'drivers' or 'locations'; selects the dtypes applied.
    fname : str
        Path of the CSV file to read.

    Returns
    -------
    pandas.DataFrame
        For 'trips', the columns listed in `trips_dates` are converted to
        datetimes using `dt_format`.

    Raises
    ------
    SakayDBError
        If `dataset` is not one of the three known names.
    ValueError
        Propagated from pandas when a trip timestamp does not match
        `dt_format`.
    """
    if dataset == 'trips':
        df = pd.read_csv(fname, sep=sep, dtype=trips_dtypes_read)
        # Convert after reading rather than through read_csv's
        # `date_parser` argument, which newer pandas releases deprecate.
        for col in trips_dates:
            df[col] = pd.to_datetime(df[col], format=dt_format)
        return df
    if dataset == 'drivers':
        # No date columns here, so no date handling is needed.
        return pd.read_csv(fname, sep=sep, dtype=drivers_dtypes)
    if dataset == 'locations':
        return pd.read_csv(fname, sep=sep, dtype=locs_dtypes)
    # Adjacent literals instead of a backslash continuation, which used to
    # embed the next line's indentation inside the message.
    raise SakayDBError("Requested dataset not found. Choose among 'trips', "
                       "'drivers', or 'locations'")
      
      
        
class SakayDBError(ValueError):
    """
    Error raised by the SakayDB helpers and class.

    The text given at construction is prefixed with 'Error encountered: '
    and kept on the `message` attribute as well as in the exception args.
    """

    def __init__(self, message):
        prefixed = ''.join(('Error encountered: ', str(message)))
        self.message = prefixed
        super().__init__(prefixed)



class SakayDB:
    """
    CSV-backed store of trips, drivers and locations.

    The database lives in one directory and consists of three files,
    `trips.csv`, `drivers.csv` and `locations.csv`, which are created with
    header rows if they do not exist yet.

    Attributes
    ----------
    data_dir : str
        Directory passed to the constructor.
    """

    def __init__(self, f_dir):
        """
        Open (and if needed initialise) the database stored in `f_dir`.

        Raises
        ------
        SakayDBError
            If `f_dir` is not an existing directory.
        """
        # isdir rather than exists: a plain file is not a usable location.
        if not path.isdir(f_dir):
            raise SakayDBError(f'Directory {f_dir} does not exist')

        self.data_dir = f_dir
        self.__trips_dir = path.join(f_dir, 'trips.csv')
        self.__drivers_dir = path.join(f_dir, 'drivers.csv')
        self.__locs_dir = path.join(f_dir, 'locations.csv')

        for file_path, cols in ((self.__trips_dir, trips_cols),
                                (self.__drivers_dir, drivers_cols),
                                (self.__locs_dir, locs_cols)):
            if not path.exists(file_path):
                create_file(file_path, cols)

    def add_trip(self, driver, pickup_datetime, dropoff_datetime, passenger_count,
                 pickup_loc_name, dropoff_loc_name, trip_distance, fare_amount, is_trips=None):
        """
        Validate one trip and append it to the database.

        Unknown drivers and locations are added to their files as a side
        effect.

        Parameters
        ----------
        driver : str
            Driver name as 'Last name, Given name'.
        pickup_datetime, dropoff_datetime : str
            Timestamps in `dt_format`.
        passenger_count : int
        pickup_loc_name, dropoff_loc_name : str
        trip_distance, fare_amount : int or float
        is_trips : int, optional
            Index of the trip when called from `add_trips`. When given,
            problems are reported with a printed warning and None is
            returned instead of raising.

        Returns
        -------
        int or None
            The new trip id, or None when the trip was skipped.

        Raises
        ------
        SakayDBError
            On invalid input or a duplicate trip, when `is_trips` is None.
        """
        param_lst = [driver, pickup_datetime, dropoff_datetime, passenger_count,
                     pickup_loc_name, dropoff_loc_name, trip_distance, fare_amount]

        chk = add_trip_checks(param_lst)
        if chk is not None:
            if is_trips is None:
                raise SakayDBError(chk)
            print(f'Warning: trip index {is_trips} has invalid or incomplete information. Skipping...')
            return None

        split_name = [part.strip() for part in driver.split(',')]
        driver_id, is_new_driver = check_driver_id(split_name, self.__drivers_dir)

        # Store timestamps without surrounding whitespace so that the
        # duplicate check compares like with like.
        pickup_datetime = pickup_datetime.strip()
        dropoff_datetime = dropoff_datetime.strip()
        pickup_loc_name = pickup_loc_name.strip()
        dropoff_loc_name = dropoff_loc_name.strip()

        p_loc_id, is_new_p_loc = check_loc_id(pickup_loc_name, self.__locs_dir)
        if dropoff_loc_name == pickup_loc_name:
            # Same place on both ends: reuse the id and never insert the
            # location a second time.
            d_loc_id, is_new_d_loc = p_loc_id, False
        else:
            d_loc_id, is_new_d_loc = check_loc_id(dropoff_loc_name, self.__locs_dir)
            if is_new_p_loc and is_new_d_loc:
                # Both lookups proposed the same "next" id because nothing
                # has been written yet; the dropoff takes the one after.
                d_loc_id += 1

        trip_fields = [driver_id, pickup_datetime, dropoff_datetime,
                       passenger_count, p_loc_id, d_loc_id,
                       trip_distance, fare_amount]
        trip_id, is_dup = check_trip_id(trip_fields, self.__trips_dir)

        if is_dup:
            if is_trips is None:
                raise SakayDBError('Duplicate Trip Entry')
            print(f'Warning: trip index {is_trips} is already in the database. Skipping...')
            return None

        if is_new_driver:
            insert_data([[driver_id, split_name[0], split_name[1]]],
                        self.__drivers_dir, drivers_cols)
        if is_new_p_loc:
            insert_data([[p_loc_id, pickup_loc_name]],
                        self.__locs_dir, locs_cols)
        if is_new_d_loc:
            insert_data([[d_loc_id, dropoff_loc_name]],
                        self.__locs_dir, locs_cols)

        insert_data([[trip_id] + trip_fields], self.__trips_dir, trips_cols)

        # Hand back a plain int even when the id came from a numpy array.
        return int(trip_id)

    def add_trips(self, trip_list):
        """
        Add several trips, skipping (with a printed warning) the bad ones.

        Parameters
        ----------
        trip_list : list of dict
            Each dict uses the `add_trip` parameter names as keys. Missing
            keys are passed on as None and therefore fail validation.

        Returns
        -------
        list of int
            Ids of the trips that were actually added, in input order.

        Raises
        ------
        SakayDBError
            If `trip_list` is not a list.
        """
        if not isinstance(trip_list, list):
            raise SakayDBError('Trips should be a valid list')

        added_ids = []
        for i, val in enumerate(trip_list):
            if not isinstance(val, dict):
                # An entry that is not a mapping carries no usable fields.
                print(f'Warning: trip index {i} has invalid or incomplete information. Skipping...')
                continue
            trip_id = self.add_trip(val.get('driver'),
                                    val.get('pickup_datetime'),
                                    val.get('dropoff_datetime'),
                                    val.get('passenger_count'),
                                    val.get('pickup_loc_name'),
                                    val.get('dropoff_loc_name'),
                                    val.get('trip_distance'),
                                    val.get('fare_amount'),
                                    i)
            if trip_id is not None:
                added_ids.append(trip_id)
        return added_ids

    def delete_trip(self, trip_id):
        """
        Delete the trip with the given id.

        Raises
        ------
        SakayDBError
            If `trip_id` is not an int or no such trip exists.
        """
        if isinstance(trip_id, bool) or not isinstance(trip_id, int):
            raise SakayDBError('Trip ID should be integer value')

        if not del_trip(trip_id, self.__trips_dir):
            raise SakayDBError('Trip ID not found')

    @staticmethod
    def _avg_trips_per_weekday(df):
        """
        Return {weekday name: unique trips / unique pickup dates}.

        `df` needs `trip_id` and a datetime `pickup_datetime` column. Rows
        without a pickup time are ignored. Keys are ordered Monday to Sunday.
        """
        trips = df.dropna(subset=['pickup_datetime'])
        if trips.empty:
            return {}

        pickups = trips['pickup_datetime']
        # day_name() yields English names regardless of the process locale,
        # which is what the keys of `dow_order` expect.
        day_name = pickups.dt.day_name()
        n_trips = trips.groupby(day_name)['trip_id'].nunique()
        n_dates = pickups.dt.normalize().groupby(day_name).nunique()
        average = n_trips / n_dates

        ordered_days = [day for day in dow_order if day in average.index]
        return average.reindex(ordered_days).to_dict()

    def generate_statistics(self, stat, df=None):
        """
        Return a dictionary depending on the `stat` parameter passed to it

        Parameters
        ----------
        stat : str
            Statistics to be generated. One of 'trip', 'passenger',
            'driver' or 'all'.
        df : pandas dataframe
            Dataframe to be used for the 'trip' statistics. The trips file
            is used by default. Ignored for the other values of `stat`.

        Returns
        -------
        generate_statistics : dict
            'trip': average number of trips per weekday.
            'passenger': the 'trip' statistics per passenger count.
            'driver': the 'trip' statistics per 'Last name, Given name'.
            'all': a dict holding the three results under those keys.
            Empty dicts are returned when there is no data.

        Raises
        ------
        SakayDBError
            If `stat` is not one of the values above.
        """
        if stat == 'trip':
            if df is None:
                df = read_data('trips', self.__trips_dir)
            return self._avg_trips_per_weekday(df)

        if stat == 'passenger':
            df_trips = read_data('trips', self.__trips_dir)
            if len(df_trips) == 0:
                return {}
            return {count: self._avg_trips_per_weekday(group)
                    for count, group in df_trips.groupby('passenger_count')}

        if stat == 'driver':
            df_drivers = read_data('drivers', self.__drivers_dir)
            df_trips = read_data('trips', self.__trips_dir)
            if len(df_drivers) == 0 or len(df_trips) == 0:
                return {}
            # Left merge keeps drivers without trips; they end up with an
            # empty statistics dict.
            merged = df_drivers.merge(
                df_trips[['driver_id', 'trip_id', 'pickup_datetime']],
                how='left', on='driver_id')
            merged['driver_name'] = (merged['last_name']
                                     .str.cat(merged['given_name'], sep=', '))
            return {name: self._avg_trips_per_weekday(group)
                    for name, group in merged.groupby('driver_name')}

        if stat == 'all':
            return {'trip': self.generate_statistics('trip'),
                    'passenger': self.generate_statistics('passenger'),
                    'driver': self.generate_statistics('driver')}

        raise SakayDBError('Input parameter is unknown.')

    @staticmethod
    def _parse_datetime_bound(value):
        """Convert a `dt_format` string (or None) into a Timestamp (or None)."""
        if value is None:
            return None
        if isinstance(value, pd.Timestamp):
            return value
        if not isinstance(value, str):
            raise SakayDBError('Invalid values for range')
        try:
            return pd.to_datetime(value.strip(), format=dt_format)
        except ValueError as err:
            raise SakayDBError('Invalid values for range') from err

    @staticmethod
    def _numeric_bound(value):
        """Return `value` if it is a real number (or None), else raise."""
        if value is None:
            return None
        if isinstance(value, bool) or not isinstance(value, (int, float, np.number)):
            raise SakayDBError('Invalid values for range')
        return value

    def search_trips(self, **kwargs):
        """
        Return the trips matching every given column filter.

        Parameters
        ----------
        **kwargs
            Keys are trip column names. A scalar value requests an exact
            match; a 2-tuple `(low, high)` requests an inclusive range in
            which either end (but not both) may be None for "unbounded".
            Values for the datetime columns are strings in `dt_format`.

        Returns
        -------
        pandas.DataFrame or list
            Matching rows with the datetime columns formatted back to
            `dt_format` strings. After each filter the rows are sorted by
            that filter's column. An empty list is returned when the trips
            file has no rows.

        Raises
        ------
        SakayDBError
            If no keyword is given, a keyword is not a trip column, or a
            value/range is malformed.
        """
        if not kwargs:
            raise SakayDBError('Invalid keyword')

        df = read_data('trips', self.__trips_dir)
        if any(key not in df.columns for key in kwargs):
            raise SakayDBError('Invalid keyword')
        if len(df) == 0:
            return []

        for key, value in kwargs.items():
            column = df[key]
            if pd.api.types.is_datetime64_any_dtype(column):
                coerce = self._parse_datetime_bound
            else:
                coerce = self._numeric_bound

            if isinstance(value, tuple):
                if len(value) != 2:
                    raise SakayDBError('Invalid values for range')
                low, high = coerce(value[0]), coerce(value[1])
                if low is None and high is None:
                    raise SakayDBError('Invalid values for range')
                if low is not None and high is not None and low > high:
                    raise SakayDBError('Invalid values for range')
                keep = pd.Series(True, index=df.index)
                if low is not None:
                    keep &= column >= low
                if high is not None:
                    keep &= column <= high
            else:
                target = coerce(value)
                if target is None:
                    raise SakayDBError('Invalid values for range')
                keep = column == target

            # Stable sort so ties keep the order left by earlier filters.
            df = df.loc[keep].sort_values(by=key, kind='stable')

        # Work on a copy so formatting never writes into a filtered view.
        result = df.copy()
        result['pickup_datetime'] = result['pickup_datetime'].dt.strftime(dt_format)
        result['dropoff_datetime'] = result['dropoff_datetime'].dt.strftime(dt_format)
        return result

    def generate_odmatrix(self, date_range=None):
        """
        Build the origin-destination matrix of average daily trip counts.

        Parameters
        ----------
        date_range : tuple, optional
            `(start, end)` strings in `dt_format`, inclusive, applied to the
            pickup time. Either element may be None for "unbounded". When
            omitted, all trips are used.

        Returns
        -------
        pandas.DataFrame
            Rows are dropoff location names, columns are pickup location
            names. Each cell is the mean number of trips per day, taken over
            the days that had at least one such trip; 0 where there were
            none. When the trips file has no rows, the empty trips frame is
            returned.

        Raises
        ------
        SakayDBError
            If `date_range` is not a 2-tuple, cannot be parsed, or its start
            is after its end.
        """
        start = end = None
        if date_range is not None:
            if not isinstance(date_range, tuple) or len(date_range) != 2:
                raise SakayDBError('Invalid values for range')
            start = self._parse_datetime_bound(date_range[0])
            end = self._parse_datetime_bound(date_range[1])
            if start is not None and end is not None and start > end:
                raise SakayDBError('Invalid values for range')

        df_trips = read_data('trips', self.__trips_dir)
        if len(df_trips) == 0:
            return df_trips

        # Locations come from this database's directory, not from the
        # current working directory.
        df_loc = read_data('locations', self.__locs_dir)
        names = df_loc[['location_id', 'loc_name']]
        pickup_names = names.rename(columns={'location_id': 'pickup_loc_id',
                                             'loc_name': 'pickup_loc_name'})
        dropoff_names = names.rename(columns={'location_id': 'dropoff_loc_id',
                                              'loc_name': 'dropoff_loc_name'})
        df = (df_trips.merge(pickup_names, on='pickup_loc_id', how='left')
                      .merge(dropoff_names, on='dropoff_loc_id', how='left'))
        df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'],
                                               format=dt_format)

        if start is not None:
            df = df.loc[df['pickup_datetime'] >= start]
        if end is not None:
            df = df.loc[df['pickup_datetime'] <= end]
        if df.empty:
            return pd.DataFrame()

        df = df.assign(count=1)
        # Sum only the helper column; the other columns (ids, timestamps)
        # have no meaningful sum.
        daily = (df.groupby(['pickup_loc_name',
                             'dropoff_loc_name',
                             pd.Grouper(key='pickup_datetime', freq='D')])
                   ['count'].sum().reset_index())
        return daily.pivot_table(values='count',
                                 index='dropoff_loc_name',
                                 columns='pickup_loc_name',
                                 aggfunc='mean',
                                 fill_value=0)

# Asserts

In [127]:
import pickle
import shutil
import os
import pandas as pd
from tempfile import TemporaryDirectory
from numpy.testing import (assert_equal, assert_almost_equal,
                           assert_raises, assert_allclose)
# Constructing on the current directory must succeed and remember the path.
sakay_db = SakayDB('.')
assert_equal(sakay_db.data_dir, '.')

In [128]:
# add_trip on a fresh database: the first trip gets id 1, adding the same
# trip again raises, and the trips/drivers files hold exactly one row each.
trip_columns = ['trip_id', 'driver_id', 'pickup_datetime', 'dropoff_datetime',
                'passenger_count', 'pickup_loc_id', 'dropoff_loc_id',
                'trip_distance', 'fare_amount']
driver_columns = ['driver_id', 'given_name', 'last_name']
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    # Replace the freshly created (empty) locations file with the fixture.
    shutil.copy('locations.csv', os.path.join(temp_dir, 'locations.csv'))
    assert_equal(
        sakay_db.add_trip('Dailisan, Damian', '08:13:00,15-05-2022',
                          '08:46:00,15-05-2022', 2,
                          'UP Campus', 'Legazpi Village', 17.6, 412),
        1
    )
    # Identical trip -> duplicate error.
    assert_raises(
        SakayDBError,
        lambda: sakay_db.add_trip('Dailisan, Damian', '08:13:00,15-05-2022',
                                  '08:46:00,15-05-2022', 2, 'UP Campus',
                                  'Legazpi Village', 17.6, 412)
    )
    df_trips = pd.read_csv(os.path.join(temp_dir, 'trips.csv'))
    assert_equal(
        set(df_trips.columns.tolist()),
        set(trip_columns)
    )
    assert_equal(
        df_trips.to_numpy().tolist(),
        [[1, 1, '08:13:00,15-05-2022',
          '08:46:00,15-05-2022',
          2, 1, 2, 17.6, 412]]
    )
    assert_equal(
        df_trips.index.tolist(),
        [0]
    )
    df_drivers = pd.read_csv(os.path.join(temp_dir, 'drivers.csv'))
    assert_equal(
        set(df_drivers.columns.tolist()),
        set(driver_columns)
    )
    assert_equal(
        df_drivers[driver_columns].to_numpy().tolist(),
        [[1, 'Damian', 'Dailisan']]
    )
    assert_equal(
        df_drivers.index.tolist(),
        [0]
    )

In [129]:
# Trip ids increase by one per successful add_trip call; re-adding an
# earlier trip raises.
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    shutil.copy('locations.csv', os.path.join(temp_dir, 'locations.csv'))
    assert_equal(
        sakay_db.add_trip('Dailisan, Damian', '08:13:00,15-05-2022',
                          '08:46:00,15-05-2022', 2,
                          'UP Campus', 'Legazpi Village', 17.6, 412),
        1
    )
    assert_equal(
        sakay_db.add_trip('Dorosan, Michael', '14:13:00,31-12-2022',
                          '14:46:00,31-12-2022', 1,
                          'Fairview', 'Highway Hills', 15.1, 371),
        2
    )
    assert_equal(
        sakay_db.add_trip('Alis, Christian', '09:13:00,16-08-2022',
                          '09:46:00,16-08-2022', 3,
                          'Loyola Heights', 'Legazpi Village', 8.9, 235),
        3
    )
    assert_equal(
        sakay_db.add_trip('Dailisan, Damian', '15:13:00,09-09-2022',
                          '15:46:00,09-09-2022', 2,
                          'Pasong Putik', 'San Antonio', 31.2, 716),
        4
    )
    # Same as the third trip above.
    assert_raises(
        SakayDBError,
        lambda: sakay_db.add_trip('Alis, Christian', '09:13:00,16-08-2022',
                                  '09:46:00,16-08-2022', 3,
                                  'Loyola Heights', 'Legazpi Village', 8.9, 235)
    )

In [130]:
# Duplicate detection must ignore surrounding whitespace in the driver and
# location names.
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    shutil.copy('locations.csv', os.path.join(temp_dir, 'locations.csv'))
    assert_equal(
        sakay_db.add_trip('Dailisan, Damian', '08:13:00,15-05-2022',
                          '08:46:00,15-05-2022', 2,
                          'UP Campus', 'Legazpi Village', 17.6, 412),
        1
    )
    assert_equal(
        sakay_db.add_trip('Dailisan, Damian', '14:13:00,31-12-2022',
                          '14:46:00,31-12-2022', 1,
                          'Fairview', 'Highway Hills', 15.1, 371),
        2
    )
    assert_equal(
        sakay_db.add_trip('Dailisan, Damian', '09:13:00,16-08-2022',
                          '09:46:00,16-08-2022', 3,
                          'Fairview', 'Highway Hills', 17.6, 412),
        3
    )
    # Third trip again, with padded names -> still a duplicate.
    assert_raises(
        SakayDBError,
        lambda: sakay_db.add_trip(' Dailisan, Damian ', '09:13:00,16-08-2022',
                                  '09:46:00,16-08-2022', 3,
                                  ' Fairview ', ' Highway Hills', 17.6, 412)
    )

In [131]:
%%capture out
# add_trips with five entries; stdout is captured by the cell magic above
# and checked in the next cell.
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    shutil.copy('locations.csv', os.path.join(temp_dir, 'locations.csv'))
    sakay_db.add_trips([
        {'driver': 'Dailisan, Damian',
         'pickup_datetime': '08:13:00,15-05-2022',
         'dropoff_datetime': '08:46:00,15-05-2022',
         'passenger_count': 2,
         'pickup_loc_name': 'UP Campus',
         'dropoff_loc_name': 'Legazpi Village',
         'trip_distance': 17.6,
         'fare_amount': 412},
        {'driver': 'Dorosan, Michael',
         'pickup_datetime': '14:13:00,31-12-2022',
         'dropoff_datetime': '14:46:00,31-12-2022',
         'passenger_count': 1,
         'pickup_loc_name': 'Fairview',
         'dropoff_loc_name': 'Highway Hills',
         'trip_distance': 15.1,
         'fare_amount': 371},
        # Index 2: no 'passenger_count' key -> incomplete.
        {'driver': 'Alis, Christian',
         'pickup_datetime': '09:13:00,16-08-2022',
         'dropoff_datetime': '09:46:00,16-08-2022',
         'pickup_loc_name': 'Loyola Heights',
         'dropoff_loc_name': 'Legazpi Village',
         'trip_distance': 8.9,
         'fare_amount': 235},
        {'driver': 'Dailisan, Damian',
         'pickup_datetime': '15:13:00,09-09-2022',
         'dropoff_datetime': '15:46:00,09-09-2022',
         'passenger_count': 2,
         'pickup_loc_name': 'Pasong Putik',
         'dropoff_loc_name': 'San Antonio',
         'trip_distance': 31.2,
         'fare_amount': 716},
        # Index 4: same as index 1 -> duplicate.
        {'driver': 'Dorosan, Michael',
         'pickup_datetime': '14:13:00,31-12-2022',
         'dropoff_datetime': '14:46:00,31-12-2022',
         'passenger_count': 1,
         'pickup_loc_name': 'Fairview',
         'dropoff_loc_name': 'Highway Hills',
         'trip_distance': 15.1,
         'fare_amount': 371}
    ])

In [132]:
# The captured output must contain exactly the two skip warnings, in order.
assert_equal(
    out.stdout,
    'Warning: trip index 2 has invalid or incomplete information. '
    'Skipping...\n'
    'Warning: trip index 4 is already in the database. Skipping...\n'
)

In [133]:
# generate_statistics: unknown stat raises, an empty database yields empty
# dicts, and the 'trip' statistics are checked against the test2 fixtures.
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    # The stat name is case-sensitive.
    assert_raises(SakayDBError, lambda: sakay_db.generate_statistics('Trips'))
    assert_equal(sakay_db.generate_statistics('trip'), {})
    assert_equal(sakay_db.generate_statistics('passenger'), {})
    assert_equal(sakay_db.generate_statistics('driver'), {})
    assert_equal(
        sakay_db.generate_statistics('all'),
        {'trip': {}, 'passenger': {}, 'driver': {}}
    )
    shutil.copy('trips_test2.csv', os.path.join(temp_dir, 'trips.csv'))
    shutil.copy('drivers_test2.csv',
                os.path.join(temp_dir, 'drivers.csv'))
    shutil.copy('locations.csv', os.path.join(temp_dir, 'locations.csv'))
    stats_trip = sakay_db.generate_statistics('trip')
    assert_equal(len(stats_trip), 7)
    assert_almost_equal(stats_trip['Friday'], 41.794117647058826)
    assert_almost_equal(stats_trip['Sunday'], 41.714285714285715)

In [134]:
# 'passenger', 'driver' and 'all' statistics on the test2 fixtures.
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    shutil.copy('trips_test2.csv', os.path.join(temp_dir, 'trips.csv'))
    shutil.copy('drivers_test2.csv',
                os.path.join(temp_dir, 'drivers.csv'))
    shutil.copy('locations.csv', os.path.join(temp_dir, 'locations.csv'))
    stats_passenger = sakay_db.generate_statistics('passenger')
    assert_equal(len(stats_passenger), 4)
    assert_equal(len(stats_passenger[0]), 7)
    assert_almost_equal(
        stats_passenger[0]['Monday'],
        9.371428571428572
    )
    stats_driver = sakay_db.generate_statistics('driver')

    assert_equal(len(stats_driver), 200)
    assert_almost_equal(
        stats_driver['Dome, Benyamin']['Saturday'],
        1
    )
    # 'all' must bundle the same three results.
    stats_all = sakay_db.generate_statistics('all')
    assert_equal(set(stats_all.keys()), {'trip', 'passenger', 'driver'})
    assert_equal(len(stats_all['trip']), 7)
    assert_almost_equal(stats_all['trip']['Tuesday'], 39.74285714285714)
    assert_equal(len(stats_all['passenger']), 4)
    assert_almost_equal(stats_all['passenger'][0]['Monday'],
                        9.371428571428572)
    assert_almost_equal(
        stats_all['driver']['Dome, Benyamin']['Saturday'],
        1
    )
    assert_equal(len(stats_all['driver']), 200)

In [135]:
# search_trips: no keyword raises, an empty database returns [], and exact,
# range, combined and datetime-range filters are checked on the fixtures.
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    assert_raises(SakayDBError, lambda: sakay_db.search_trips())
    assert_equal(sakay_db.search_trips(driver_id=1), [])
    shutil.copy('trips_test.csv', os.path.join(temp_dir, 'trips.csv'))
    shutil.copy('drivers_test.csv', os.path.join(temp_dir, 'drivers.csv'))
    assert_raises(SakayDBError, lambda: sakay_db.search_trips())

    # Exact match on an integer column.
    assert_equal(sakay_db.search_trips(driver_id=1).to_numpy().tolist(),
                 [[1, 1, '08:13:00,15-05-2022', '08:46:00,15-05-2022', 2, 1, 2, 17.6, 412],
                  [4, 1, '15:13:00,09-09-2022', '15:46:00,09-09-2022', 2, 6, 7, 31.2, 716]]
                 )
    # Range with an open lower end.
    assert_equal(sakay_db.search_trips(fare_amount=(None, 300)).to_numpy().tolist(),
                 [[3, 3, '09:13:00,16-08-2022', '09:46:00,16-08-2022', 3, 5, 2, 8.9, 235]]
                 )
    # Several keywords are combined (all must match).
    assert_equal(sakay_db.search_trips(driver_id=1, fare_amount=(None, 300)).to_numpy().tolist(),
                 [])
    assert_equal(sakay_db.search_trips(driver_id=1, fare_amount=(200, 500)).to_numpy().tolist(),
                 [[1, 1, '08:13:00,15-05-2022', '08:46:00,15-05-2022', 2, 1, 2, 17.6, 412]])

    # Datetime range given as strings; note the non-zero-padded day.
    assert_equal(sakay_db.search_trips(
                 pickup_datetime=('00:00:00,1-05-2022', '23:59:59,31-08-2022')
                 ).to_numpy().tolist(),
                 [[1, 1, '08:13:00,15-05-2022', '08:46:00,15-05-2022', 2, 1, 2, 17.6, 412],
                  [3, 3, '09:13:00,16-08-2022', '09:46:00,16-08-2022', 3, 5, 2, 8.9, 235]]
                 )

In [136]:
# generate_odmatrix: empty database gives an empty frame; on the test2
# fixtures the matrix is 48x48, indexed by dropoff name with pickup names
# as columns.
with TemporaryDirectory() as temp_dir:
    sakay_db = SakayDB(temp_dir)
    assert_equal(sakay_db.generate_odmatrix().to_numpy().tolist(), [])
    shutil.copy('trips_test2.csv', os.path.join(temp_dir, 'trips.csv'))
    shutil.copy('drivers_test2.csv',
                os.path.join(temp_dir, 'drivers.csv'))
    shutil.copy('locations.csv', os.path.join(temp_dir, 'locations.csv'))
    od_df = sakay_db.generate_odmatrix()
    assert_equal(od_df.shape, (48, 48))
    assert_equal(od_df.loc['Macpherson', 'UP Campus'], 1.25)
    assert_equal(od_df.iloc[-2, -2], 0)
    # Restricting to August 2022 is expected to keep the 48x48 shape.
    assert_equal(sakay_db
                   .generate_odmatrix(date_range=('00:00:00,1-08-2022', '23:59:59,31-08-2022'))
                   .shape, (48, 48))