In [1]:
import os
# Set PYDEVD_WARN_SLOW_RESOLVE_TIMEOUT to '2' for this kernel process.
# NOTE(review): presumably this raises the pydevd debugger's "slow to resolve variable" warning
# threshold (in seconds) so inspecting the large dataframes below is less noisy - confirm.
os.environ['PYDEVD_WARN_SLOW_RESOLVE_TIMEOUT'] = '2'

In [2]:
# Import libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
# Set the option to prevent the FutureWarning
pd.set_option('future.no_silent_downcasting', True)
import datetime as dt
from functions import *
from dtype_dictionaries import *

### Import files with arrival and departure times

In [3]:
# --- Input locations -------------------------------------------------------
# GTFS post-rating schedule folders. The listing is sorted alphabetically because
# the "keep the latest month in memory" strategy used further down depends on a
# stable, chronological ordering of the entries.
gtfs_post_rating_files = sorted(os.listdir('gtfsSchedule'))

# Only these GTFS tables are needed later on; the rest of each folder is not imported.
txt_list = [
    'calendar.txt',
    'calendar_attributes.txt',
    'calendar_dates.txt',
    'feed_info.txt',
    'routes.txt',
    'stop_times.txt',
    'stops.txt',
    'trips.txt',
]

# Folder with the raw MBTA arrival/departure files
ArrDepFolder = 'MBTA_ArrivalDepartureTimes'
# Folder with the already parsed arrival/departure files, and its sorted content
parsed_ArrDepFolder = 'parsed_ArrivalDepartureFiles'
parsed_gtfs_rt = sorted(os.listdir(parsed_ArrDepFolder))

# --- Reading parameters ----------------------------------------------------
# Number of csv rows read per chunk
chunk_size = 12 ** 5
# True: parse the raw monthly files; False: read the pre-parsed per-feed table
import_separate_files = False
# Truthy once a first feed has been handled (used to skip the first element of compatibleFiles)
first_round = 0

# --- Accumulators filled while looping over the feeds ----------------------
# Chunks of the most recent monthly file
latest_month_df_list = []
# Every arrival/departure chunk read so far
adt_list = []
# Calendar days with their related services (converted to a dataframe afterwards)
calendar_service_map = []
# Dataframes holding all the gtfs rt data, to be concatenated
df_ArrDep_list = []
# One gtfs schedule per feed, to be concatenated
gtfs_schedules_list = []
# feed_start_date / feed_end_date of every processed feed
start_date_list = []
end_date_list = []
# Walk through the GTFS post-rating folders and collect, for each one, its schedule, its
# calendar/service map and the matching arrival/departure records (appended to adt_list as chunks).
# The [0:1] slice limits this run to the first folder of the sorted listing.
for postRatingRecap_file in gtfs_post_rating_files[0:1]:
    print(f'Processing {postRatingRecap_file}...')
    gtfs_post_rating_folder = os.path.join('gtfsSchedule', postRatingRecap_file)
    # Load the GTFS tables named in txt_list plus a `schedule` object (helper comes from the star imports)
    calendar, calendar_attributes, calendar_dates, feed_info, routes, stop_times, stops, trips, schedule = get_gtfs_post_rating_txt_files(gtfs_post_rating_folder, txt_list, gtfs_cols)
    # Add schedule to the list of gtfs_schedules
    gtfs_schedules_list.append(schedule)    
    # Save feed_start_date and feed_end_date (label 0 of feed_info, i.e. its first row if the index is the default one)
    start_date = feed_info['feed_start_date'][0]
    start_date_list.append(start_date)
    end_date = feed_info['feed_end_date'][0]
    end_date_list.append(end_date)
    # Build the per-day service listing of this feed from calendar.txt and calendar_dates.txt
    calendar_data = parse_calendar_file(calendar)
    calendar_dates_data = parse_calendar_dates_file(calendar_dates)
    calendar_schedule = generate_schedule(start_date, end_date, calendar_data, calendar_dates_data)
    calendar_service_map.append(calendar_schedule)
    
    # first_round only becomes truthy inside the import_separate_files branch below,
    # so this block runs from the second feed onwards of a raw-file import.
    if first_round:
        # Remove first element from compatibleFiles
        # NOTE(review): compatibleFiles is reassigned a few lines below whenever this branch can run,
        # so the slice appears to have no lasting effect - confirm the intent.
        compatibleFiles = compatibleFiles[1:]
        # NOTE(review): the chunks in latest_month_df_list were already appended to adt_list when they
        # were read, and adt_list is never cleared in this loop, so this looks like it duplicates them.
        adt_list = adt_list + (latest_month_df_list)
        latest_month_df_list = []
    if import_separate_files:
        # Return the list of compatible files
        compatibleFiles = get_compatible_files(ArrDepFolder, start_date, end_date)
        for filename in (compatibleFiles):   
            print(f'Importing {filename}...')  
            for chunk in pd.read_csv(filename, chunksize=chunk_size, dtype=adt_dtype_map, low_memory=False):
                # Carry out here any filtering, drop or cutting down operation
                chunk = reduce_df_size(chunk)
                adt_list.append(chunk)
                # Chunks of the last compatible file are additionally kept aside for the next feed
                if filename == compatibleFiles[-1]:
                    latest_month_df_list.append(chunk)
        first_round = 1
        print('Concatenating...')
        #adt_df = pd.concat(adt_list, axis=0)
        # Filter out all the records that lie outside the feed_start_date and feed_end_date range
        # NOTE(review): with the concat above commented out, adt_df is not assigned anywhere in this
        # loop before this call; on a fresh kernel this branch would presumably fail with a NameError.
        adt_df = adjust_adt_df_settings(adt_df, routes, start_date, end_date)
        # Save the dataframe to a csv file: filename is equal as 'feed_start_date_feed_end_date.csv'
        export_filename = start_date.strftime('%Y%m%d') + '_' + end_date.strftime('%Y%m%d') + '.csv'
        filepath = os.path.join('parsed_ArrivalDepartureFiles', export_filename)
        print(f'Exporting {filepath}...')
        # NOTE(review): the export itself is disabled, only the message above is printed
        #adt_df.to_csv(filepath, index=False)

    else:        
        # Get list index of postRatingRecap_file
        idx = gtfs_post_rating_files.index(postRatingRecap_file)
        # NOTE(review): assumes the sorted schedule folders and the sorted parsed files line up
        # one-to-one by position - verify when files are added to either folder.
        parsed_file = parsed_gtfs_rt[idx]
        print(f'Importing {parsed_file}...') 
        filepath = os.path.join(parsed_ArrDepFolder, parsed_file)
        for chunk in pd.read_csv(filepath, chunksize=chunk_size, dtype=adt_dtype_map, low_memory=False):
            # Parse the date/time columns chunk by chunk; 'scheduled' and 'actual' only carry a
            # time of day in the csv (format %H:%M:%S)
            chunk['service_date'] = pd.to_datetime(chunk['service_date'], format='%Y-%m-%d')
            chunk['scheduled'] = pd.to_datetime(chunk['scheduled'], format='%H:%M:%S')
            chunk['actual'] = pd.to_datetime(chunk['actual'], format='%H:%M:%S')            
            adt_list.append(chunk)

# Stack all the chunks collected in adt_list into a single frame with a fresh 0..n-1 index
print('Concatenating...')
adt_df = pd.concat(adt_list, axis=0, ignore_index=True)

# Only the progress message is emitted here: the whole-frame conversion below is disabled
# (the parsed-file import already converts these columns chunk by chunk while reading).
print('Converting to datetime...')
#adt_df['service_date'] = pd.to_datetime(adt_df['service_date'], format='%Y-%m-%d')
#adt_df['scheduled'] = pd.to_datetime(adt_df['scheduled'], format='%H:%M:%S')
#adt_df['actual'] = pd.to_datetime(adt_df['actual'], format='%H:%M:%S')

# Turn each per-feed calendar/service listing into a dataframe, then stack them into calendar_df
calendar_frames = [
    pd.DataFrame(feed_days, columns=['date', 'day_of_week', 'service_ids'])
    for feed_days in calendar_service_map
]
calendar_df = pd.concat(calendar_frames, axis=0, ignore_index=True)

# Disabled: concatenated gtfs schedule and untouched backup of adt_df
#df_gtfs_schedule = pd.concat(gtfs_schedules_list, axis=0, ignore_index=True)
#adt_df_original = adt_df.copy()

Processing 01_gtfs_post-recap_Winter_2022...
Importing 20211219_20220312.csv...
Concatenating...
Converting to datetime...


In [4]:
# Disabled exploration helpers: print every distinct route_id / strip trailing underscores from route_id
#[print(f'route {route}') for route in adt_df.route_id.unique()]
#adt_df.route_id = adt_df.route_id.str.rstrip('_')
# Print the column dtypes and the memory usage of the concatenated arrival/departure frame
adt_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1849029 entries, 0 to 1849028
Data columns (total 13 columns):
 #   Column            Dtype         
---  ------            -----         
 0   service_date      datetime64[ns]
 1   route_id          category      
 2   direction_id      category      
 3   half_trip_id      string        
 4   stop_id           object        
 5   time_point_id     object        
 6   time_point_order  Int16         
 7   point_type        category      
 8   standard_type     category      
 9   scheduled         datetime64[ns]
 10  actual            datetime64[ns]
 11  block_id          float64       
 12  service_id        float64       
dtypes: Int16(1), category(4), datetime64[ns](3), float64(2), object(2), string(1)
memory usage: 127.0+ MB


### Combine arrival and departure times with scheduled information

In [11]:
# Match every observed trip start (Startpoint record) against the GTFS schedule of the same
# route, direction and scheduled time, then write the scheduled service_id(s) and block_id(s)
# onto the arrival/departure records of that half trip. One csv per route is exported at the end.
#
# NOTE(review): the boolean scans over the whole of adt_df inside the innermost loop are likely
# what makes this cell slow on the full dataset; a merge-based rewrite would be the next step.

# Groups (and their keys) for which the schedule has no matching (direction_id, scheduled) entry
unmatched_names = []
unmatched_groups = []

route_ids = adt_df['route_id'].unique()
# From the start and end date lists, extract the feed_start_date and feed_end_date as datetimes
feed_start_date = pd.to_datetime(start_date_list, format='%Y%m%d')
feed_end_date = pd.to_datetime(end_date_list, format='%Y%m%d')

# Variables we wish to group by
grouping_vars = ['direction_id', 'scheduled']

# service_id and block_id are float64 in the loaded frame but receive comma-separated strings
# below. Cast them explicitly so pandas does not have to upcast silently on assignment.
for target_col in ('service_id', 'block_id'):
    if adt_df[target_col].dtype != object:
        adt_df[target_col] = adt_df[target_col].astype(object)

# Make sure the export folder exists before the first route is written
os.makedirs('block_ids_df', exist_ok=True)

for route in route_ids:
    print(f'Processing route {route}...')
    # Startpoint records of the current route
    adt_route = adt_df.loc[(adt_df['route_id'] == route) & (adt_df['point_type'] == 'Startpoint')]

    feeds_to_process = zip(feed_start_date[0:1], feed_end_date[0:1], calendar_service_map[0:1], gtfs_schedules_list[0:1])
    for feed_start, feed_end, service_map, gtfs_schedule in feeds_to_process:
        print(f'Feed start date: {feed_start}, Feed end date: {feed_end}')
        # Keep only the records whose service_date lies within the feed validity range
        date_filter = (adt_route['service_date'] >= feed_start) & (adt_route['service_date'] <= feed_end)
        adt_grouped = adt_route[date_filter].groupby(grouping_vars, observed=True)

        # First-stop rows of the GTFS schedule for the current route
        schedule_route = gtfs_schedule[(gtfs_schedule['route_id'] == route) & (gtfs_schedule['stop_sequence'] == 1)]
        schedule_grouped = schedule_route.groupby(grouping_vars, observed=True)
        scheduled_keys = schedule_grouped.groups

        for name, group in adt_grouped:
            if name not in scheduled_keys:
                unmatched_names.append(name)
                unmatched_groups.append(group)
                continue

            # Scheduled trips sharing this direction and departure time
            schedule_group = schedule_grouped.get_group(name)
            schedule_services = set(schedule_group['service_id'])
            # Series indexed by service_id whose values are the lists of block_ids
            schedule_service_block_ids = schedule_group.groupby(['service_id'], observed=True)['block_id'].apply(list)

            # Calendar days on which this group has at least one record
            service_days = calendar_df.loc[calendar_df.date.isin(group.service_date)]
            for _, row in service_days.iterrows():
                calendar_idx = group.loc[group.service_date == row.date].index
                if calendar_idx.empty:
                    continue

                # Services that are both scheduled for this departure and active on this day.
                # Sorted because set iteration order is arbitrary and the string is stored.
                adt_service_ids = sorted(schedule_services.intersection(row['service_ids']))
                adt_service_ids_str = ', '.join(adt_service_ids)

                # Every record of the half trip(s) starting here, not only those of the first match
                half_trip_ids = adt_df.loc[calendar_idx, 'half_trip_id'].dropna().unique()
                half_trip_rows = adt_df['half_trip_id'].isin(half_trip_ids)

                # Assign service_id to the Startpoint rows and to the rest of the half trip
                adt_df.loc[calendar_idx, 'service_id'] = adt_service_ids_str
                adt_df.loc[half_trip_rows, 'service_id'] = adt_service_ids_str

                # Collect the block_ids of each matching service. Looking the joined string up
                # in the index would find nothing as soon as more than one service applies.
                block_list = [
                    block_id
                    for service_id in adt_service_ids
                    if service_id in schedule_service_block_ids.index
                    for block_id in schedule_service_block_ids.loc[service_id]
                ]
                if block_list:
                    block_ids_str = ', '.join(block_list)
                    adt_df.loc[calendar_idx, 'block_id'] = block_ids_str
                    adt_df.loc[half_trip_rows, 'block_id'] = block_ids_str

    # Save the route-specific records to a csv file (f-string also works for non-string route labels)
    export_filename = f'{route}.csv'
    filepath = os.path.join('block_ids_df', export_filename)
    adt_df.loc[adt_df.route_id == route].to_csv(filepath, index=False)

Processing route 1...
Feed start date: 2021-12-19 00:00:00, Feed end date: 2022-03-12 00:00:00


KeyboardInterrupt: 

In [None]:
# Build the working table for block handling: records that received a block_id, Midpoint rows excluded.
# The point label is compared on point_type (the column filtered for 'Startpoint' earlier on);
# comparing block_id with 'Midpoint' never removes anything.
has_block_id = adt_df['block_id'].notna()
not_midpoint = adt_df['point_type'] != 'Midpoint'
adt_df_block_ids = adt_df.loc[has_block_id & not_midpoint].copy()
# Show the rows where block_id contains more than one element. The cast to str keeps the
# mask purely boolean even if some block_id values are not strings.
adt_df_block_ids[adt_df_block_ids['block_id'].astype(str).str.contains(',', regex=False)]

In [None]:
print('Split Blocks')
# Pass the filtered table through split_multiple_block_id (helper from the star imports).
# NOTE(review): presumably this separates comma-joined block_id values - confirm in functions.py
adt_df_block_ids = split_multiple_block_id(adt_df_block_ids)
# Only a progress message: the endpoint assignment itself happens in the following cells
print('Assign Blocks to endpoints')

In [None]:
# For every half trip, copy block_id and service_id from its first record onto its second record.
# Half trips made of a single record have nothing to copy to and are printed instead.
trips_grouped = adt_df.groupby(['half_trip_id'], observed=True)
shared_cols = ['block_id', 'service_id']
for name, group in trips_grouped:
    row_labels = group.index
    if len(row_labels) <= 1:
        print(name)
        print(group)
        continue
    adt_df.loc[row_labels[1], shared_cols] = group.loc[row_labels[0], shared_cols].values

In [None]:
# NOTE(review): this cell repeats the logic of the cell right above it; one of the two can probably go.
# Per half trip: the second record inherits block_id and service_id from the first one,
# lone records are reported on stdout.
trips_grouped = adt_df.groupby(['half_trip_id'], observed=True)
for name, group in trips_grouped:
    if group.shape[0] < 2:
        print(name)
        print(group)
    else:
        first_label, second_label = group.index[0], group.index[1]
        adt_df.loc[second_label, ['block_id', 'service_id']] = group.loc[first_label, ['block_id', 'service_id']].values

In [None]:
# For every half trip, copy block_id and service_id from its first record onto its second record.
trips_grouped = adt_df.groupby(['half_trip_id'], observed=True)
for name, group in trips_grouped:
    if len(group.index) < 2:
        # A half trip with a single record has no second row: group.index[1] would raise
        # IndexError. Report it instead, as the other versions of this cell do.
        print(name)
        print(group)
        continue
    adt_df.loc[group.index[1], ['block_id', 'service_id']] = group.loc[group.index[0], ['block_id', 'service_id']].values

### Compute layover

In [None]:
# Compute, per block and service day, the gap in minutes between consecutive records:
# 'theoretical_layover' from departure_time, 'actual_layover' from the observed 'actual' time.
# NOTE(review): adt_route10 and its departure_time column are not created anywhere in the
# visible cells - confirm where they come from before running on a fresh kernel.
layover_df = adt_route10.copy()
layover_df = layover_df.sort_values(by=['block_id', 'service_date', 'half_trip_id', 'departure_time'])
layover_df = layover_df.reset_index(drop=True)

# Group by 'block_id' and 'service_date', leaving out the rows without a block_id
grouped = layover_df.loc[layover_df.block_id.notna()].groupby(['block_id', 'service_date'])

# Difference with the previous row of the same group, converted from timedelta to minutes
layover_df['theoretical_layover'] = grouped['departure_time'].diff().dt.total_seconds() / 60
layover_df['actual_layover'] = grouped['actual'].diff().dt.total_seconds() / 60

# The first row of each group has no predecessor: its layover is set to 0
layover_df.loc[grouped.head(1).index, ['theoretical_layover', 'actual_layover']] = 0
# Only rows with time_point_order == 1 keep a layover value
layover_df.loc[layover_df.time_point_order != 1, ['theoretical_layover', 'actual_layover']] = np.nan
layover_df = layover_df.drop(columns=['time_point_order', 'point_type', 'standard_type'])

# Summary per block and service day. The aggregation runs on the full group (no column
# sub-selection) because 'stop_id' is requested next to the two layover columns; selecting
# only the layover columns first makes pandas raise a KeyError for 'stop_id'.
(layover_df.groupby(['block_id', 'service_date'])
            .agg({'theoretical_layover': ['mean', 'max', 'count'],
                  'actual_layover': ['mean', 'max', 'count'],
                  'stop_id': 'first'})
)

In [None]:
# Scratch cell: group the route table by half trip.
# NOTE(review): adt_route10 is not created in the visible cells - confirm where it comes from.
test = adt_route10.groupby(['half_trip_id'])
# NOTE(review): this stores the bound get_group method itself, it is not called with a group key;
# the cell looks unfinished.
startpoint_df = test.get_group