In [80]:
from google.cloud import storage
from datetime import datetime, timedelta
import pandas as pd
import pyarrow.parquet as pq
import io

# Name of the Google Cloud Storage bucket that the reader functions below list blobs from.
bucket_name = "bigdata-chmurki-nifi"
# Single storage client, created once here and reused by every reader function.
client = storage.Client()

def modify_name(value):
    '''
    Rewrite names that start with 'N' into a '99'-prefixed string.

    The leading 'N' is removed, the remainder is left-padded with zeros to at
    least two characters and '99' is put in front (e.g. 'N1' -> '9901').
    Any other value is handed back untouched, keeping its original type.
    '''
    text = str(value)
    if not text.startswith('N'):
        return value
    return '99' + text[1:].zfill(2)


def minutes_to_midnight(time_value):
    '''
    Return the distance, in whole minutes, from a time of day to the nearest midnight.

    time_value is a datetime.time. The minutes elapsed since the previous
    midnight and the minutes remaining until the next one are both computed,
    and the smaller of the two is returned.
    '''
    today = datetime.today()
    moment = datetime.combine(today, time_value)
    next_midnight = datetime.combine(today + timedelta(days=1), datetime.min.time())
    elapsed = moment.hour * 60 + moment.minute
    # timedelta.seconds excludes whole days, so an input of exactly 00:00:00 gives 0 here.
    remaining = (next_midnight - moment).seconds // 60
    return min(remaining, elapsed)

def _load_positions_blob(blob):
    '''
    Download one parquet blob and flatten it to one row per entry of its 'vehicles' column.

    The fields of each 'vehicles' entry become columns, placed in front of the
    original (exploded) columns of the file.
    '''
    table = pq.read_table(io.BytesIO(blob.download_as_bytes()))
    df = table.to_pandas()
    df = df.explode('vehicles').reset_index(drop=True)
    df_flat = pd.json_normalize(df['vehicles'])
    return pd.concat([df_flat, df], axis=1)


def read_positions(date = None):
    '''
    Function to read vehicles positions. If date is specified, it reads data only from given date.
    Else, it reads all the data found (e.g. for model first training).

    Parameters:
        date: value interpolated into the blob-name marker 'Act_position_{date}';
              None selects every blob whose name contains 'Act_position'.

    Returns:
        DataFrame of de-duplicated position rows with the unused columns removed
        and 'routeShortName' passed through modify_name.

    Raises:
        ValueError: if no blob under the 'warsaw' prefix matches the marker.
    '''
    marker = 'Act_position' if date is None else f'Act_position_{date}'
    positions = [
        _load_positions_blob(blob)
        for blob in client.list_blobs(bucket_name, prefix='warsaw')
        # A blob qualifies when the marker occurs exactly once in its name.
        if blob.name.count(marker) == 1
    ]
    if not positions:
        # Fail with a message that names the cause instead of pandas' generic
        # "No objects to concatenate".
        raise ValueError(f"No position files matching '{marker}' found in bucket '{bucket_name}'")

    positions_df = pd.concat(positions, ignore_index=True)

    cols_to_drop = ['vehicles',
            'headsign',
            'vehicleCode',
            'vehicleId',
            'direction',
            'gpsQuality',
            'tripId',
            'tripId.member0',
            'tripId.member1',
            'lastUpdate']
    # Not every file carries every column, so missing ones are skipped silently.
    positions_df = positions_df.drop(columns=cols_to_drop, errors='ignore')
    positions_df = positions_df.drop_duplicates().reset_index(drop=True)
    positions_df['routeShortName'] = positions_df['routeShortName'].apply(modify_name)
    return positions_df

def add_15_minutes(positions_df):
    '''
    Function that pairs every position observation with the observation of the
    same trip made 15 minutes later.

    A trip is identified by ('vehicleService', 'scheduledTripStartTime'). Timestamps
    are rounded to the minute, so a pair exists only when the same trip has a row
    exactly 15 rounded minutes later; rows without such a partner are dropped.

    Parameters:
        positions_df: DataFrame with at least 'generated', 'vehicleService',
                      'scheduledTripStartTime', 'routeShortName' and 'speed'.
                      It is not modified.

    Returns:
        DataFrame with the current observation's columns plus the later observation's
        columns suffixed with '_15min' (except route name, speed, timestamp and
        weather bin, which are not repeated). 'time' and 'time_15min' hold the
        minutes to the nearest midnight.
    '''
    trip_keys = ['vehicleService', 'scheduledTripStartTime']

    df = positions_df.copy()
    df['generated'] = pd.to_datetime(df['generated'])
    df['generated_weather_bin'] = df['generated'].dt.round('10min')
    df = df.dropna()
    # Keep the exact time of day before the timestamp is rounded to the minute.
    df['time'] = df['generated'].dt.time
    df['generated'] = df['generated'].dt.round('1min')
    df = df.drop_duplicates(subset=['generated'] + trip_keys, keep='first')

    # Right-hand side of the self-join: the "future" observations. Columns that
    # must not be repeated are removed up front and the rest are renamed
    # explicitly, so the merge never produces two columns with the same label.
    future = df.drop(columns=['routeShortName', 'speed', 'generated_weather_bin'])
    future = future.rename(columns={col: f'{col}_15min' for col in future.columns
                                    if col not in trip_keys})

    # Left-hand side: each observation tagged with the timestamp it should be
    # matched against ('generated' of the future row is now 'generated_15min').
    current = df.assign(generated_15min=df['generated'] + pd.to_timedelta('15min'))
    merged_df = pd.merge(current, future, on=trip_keys + ['generated_15min'])
    merged_df = merged_df.drop(columns=['generated_15min'])

    merged_df['time'] = merged_df['time'].apply(minutes_to_midnight)
    merged_df['time_15min'] = merged_df['time_15min'].apply(minutes_to_midnight)
    return merged_df

def _load_actual_weather_blob(blob):
    '''
    Download one actual-weather parquet blob and expand its 'current' column
    into flat columns placed in front of the file's remaining columns.
    '''
    table = pq.read_table(io.BytesIO(blob.download_as_bytes()))
    df = table.to_pandas()
    df_flat = pd.json_normalize(df['current'])
    df_flat = pd.concat([df_flat, df], axis=1)
    return df_flat.drop(['current'], axis=1)


def _load_forecast_weather_blob(blob):
    '''
    Download one forecast parquet blob and expand the first entry of
    'hourly' -> 'data' into flat columns placed in front of the file's
    remaining columns.
    '''
    table = pq.read_table(io.BytesIO(blob.download_as_bytes()))
    df = table.to_pandas()
    df_flat = pd.json_normalize(df['hourly'])
    # Only the first element of the 'data' list is used; missing/empty lists become None.
    df_flat['data'] = df_flat['data'].apply(lambda x: x[0] if x is not None and len(x) > 0 else None)
    df_flat = pd.json_normalize(df_flat['data'])
    df_flat = pd.concat([df_flat, df], axis=1)
    return df_flat.drop(['hourly'], axis=1)


def read_weather(date=None):
    '''
    Function that reads actual weather readings and weather forecasts and joins them.

    Each actual reading is matched with the forecast whose 'date' equals the
    reading time rounded to the hour plus one hour (UTC).

    Parameters:
        date: value interpolated into the blob-name marker 'Act_weather_{date}';
              None selects every blob whose name contains 'Act_weather_'.
              NOTE(review): forecast files ('FC_weather_') are read regardless of
              date, exactly as before - confirm this is intended.

    Returns:
        DataFrame with '_actual' / '_forecast' suffixed weather columns and
        'generated_weather_bin' (reading time rounded to 10 minutes).

    Raises:
        ValueError: if no actual-weather or no forecast blob is found.
    '''
    actual_marker = 'Act_weather_' if date is None else f'Act_weather_{date}'
    forecast_marker = 'FC_weather_'

    actuals = []
    forecasts = []
    for blob in client.list_blobs(bucket_name, prefix='weather'):
        # A blob qualifies when the marker occurs exactly once in its name.
        if blob.name.count(actual_marker) == 1:
            actuals.append(_load_actual_weather_blob(blob))
        elif blob.name.count(forecast_marker) == 1:
            forecasts.append(_load_forecast_weather_blob(blob))

    # Fail with a message that names the cause instead of pandas' generic
    # "No objects to concatenate".
    if not actuals:
        raise ValueError(f"No weather files matching '{actual_marker}' found in bucket '{bucket_name}'")
    if not forecasts:
        raise ValueError(f"No weather files matching '{forecast_marker}' found in bucket '{bucket_name}'")

    actual_df = pd.concat(actuals, ignore_index=True).drop_duplicates().reset_index(drop=True)
    forecast_df = pd.concat(forecasts, ignore_index=True).drop_duplicates().reset_index(drop=True)

    actual_df['time'] = pd.to_datetime(actual_df['time'])
    actual_df['generated_weather_bin'] = actual_df['time'].dt.round('10min')
    # Join key towards the forecasts: reading time rounded to the hour, in UTC, shifted one hour ahead.
    actual_df['generated_forecast_bin'] = actual_df['time'].dt.round('60min').dt.tz_convert('UTC')+ pd.Timedelta(hours=1)

    actual_df = actual_df.drop(['time','icon','icon_num','wind.dir',
                                'lat','lon','elevation','timezone','units',
                                'hourly','daily'],axis=1)

    forecast_df['date'] = pd.to_datetime(forecast_df['date']).dt.tz_localize('UTC')
    forecast_df = forecast_df.rename(columns={'cloud_cover.total': 'cloud_cover'})
    forecast_df = forecast_df.drop(['weather','icon','wind.dir',
                                    'lat','lon','elevation','timezone','units','current','daily','time'],axis=1)

    weather_df = pd.merge(actual_df,forecast_df,left_on = 'generated_forecast_bin',right_on='date',
                          suffixes=('_actual', '_forecast'))
    weather_df = weather_df.drop(['generated_forecast_bin','date'],axis=1)
    weather_df = weather_df.drop_duplicates().reset_index(drop=True)

    return weather_df
    

def finish_df(df):
    '''
    Prepare the merged frame for modelling.

    Removes the timestamp / trip-identifier columns and replaces the four
    categorical weather columns with one-hot indicator columns (prefixed with
    the source column name), appended after the remaining columns.
    The input frame is left untouched.
    '''
    categorical_cols = ['summary_actual', 'precipitation.type_actual', 'summary_forecast', 'precipitation.type_forecast']
    trimmed = df.drop(columns=['generated','generated_weather_bin','vehicleService', 'scheduledTripStartTime'])
    indicators = [pd.get_dummies(trimmed[name], prefix=name) for name in categorical_cols]
    return pd.concat([trimmed.drop(columns=categorical_cols), *indicators], axis=1)

def read_timetable(date = None, default_stops_date = '2023-12-17'):
    '''
    Function that reads timetable for all the trips scheduled for given day (or all days if date is None)
    and attaches the coordinates of every stop.

    Parameters:
        date: value interpolated into the blob-name markers 'Stops_{date}' and
              'Stop_times_position_{date}'. With a date only the first matching
              timetable file is read; with None every file whose name contains
              'Stops' is read.
        default_stops_date: date of the stop-coordinates file to use when date is None.

    Returns:
        DataFrame with 'trip_id', 'arrival_time', 'stop_id', 'stop_sequence',
        'stop_lat' and 'stop_lon', de-duplicated.

    Raises:
        ValueError: if no timetable file or no stop-coordinates file is found.
    '''
    # One listing serves both lookups below.
    blob_names = [blob.name for blob in client.list_blobs(bucket_name, prefix='warsaw')]

    # A blob qualifies when the marker occurs exactly once in its name.
    if date is not None:
        timetable_marker = f'Stops_{date}'
        timetable_names = [name for name in blob_names if name.count(timetable_marker) == 1][:1]
    else:
        timetable_marker = 'Stops'
        timetable_names = [name for name in blob_names if name.count(timetable_marker) == 1]
    if not timetable_names:
        raise ValueError(f"No timetable files matching '{timetable_marker}' found in bucket '{bucket_name}'")

    timetables = [pd.read_csv(f'gs://{bucket_name}/{name}') for name in timetable_names]
    timetable_df = pd.concat(timetables, ignore_index=True)
    timetable_df = timetable_df[['trip_id','arrival_time','stop_id','stop_sequence']]

    stops_date = date if date is not None else default_stops_date
    stops_marker = f'Stop_times_position_{stops_date}'
    stops_name = next((name for name in blob_names if name.count(stops_marker) == 1), None)
    if stops_name is None:
        # Previously this case surfaced as an unbound local variable error.
        raise ValueError(f"No stops file matching '{stops_marker}' found in bucket '{bucket_name}'")
    stops = pd.read_csv(f'gs://{bucket_name}/{stops_name}')
    stops = stops[['stop_id','stop_lat','stop_lon']]

    timetable_full = pd.merge(timetable_df,stops,on='stop_id')
    timetable_full = timetable_full.drop_duplicates().reset_index(drop=True)
    return timetable_full

def find_future_coords(df, datetime_str, code, delay, time):
    '''
    Function that for given vehicleService and scheduledTripStartTime (composite identifier of trip),
    current delay and time returns position of expected stop in 15 minutes.

    Parameters:
        df: timetable with 'trip_id', 'arrival_time', 'stop_lat' and 'stop_lon'.
        datetime_str: scheduled trip start, anything pd.to_datetime accepts.
        code: vehicleService value; the part before the first '-' and the full
              value are both matched literally inside 'trip_id'.
        delay: current delay, converted with int() and subtracted as minutes
               (presumably the feed reports minutes - TODO confirm the unit).
        time: time of day to look up, formatted 'HH:MM:SS'.

    Returns:
        Tuple (stop_lat, stop_lon) of pandas Series. Each holds the single value
        of the first stop whose 'arrival_time' is not earlier than time - delay,
        or is empty when the trip or such a stop is not found.
    '''
    import re  # local import: only this function builds a regular expression

    time_format = "%H:%M:%S"
    trip_start = pd.to_datetime(datetime_str)
    start_stamp = trip_start.strftime('%Y%m%d%H%M')
    # The code is escaped so characters such as '.' or '+' are matched literally
    # instead of being interpreted as regex operators.
    pattern = f'{re.escape(code.split("-")[0])}{start_stamp}.*_{re.escape(code)}'

    # na=False: rows without a trip_id simply do not match (a NaN mask would raise).
    is_trip = df['trip_id'].str.contains(pattern, case=False, na=False)
    trip_stops = df[is_trip].sort_values(['arrival_time']).reset_index(drop=True)

    # 'arrival_time' is compared as a zero-padded 'HH:MM:SS' string.
    earliest = (datetime.strptime(time, time_format) - timedelta(minutes=int(delay))).strftime(time_format)
    next_stop = trip_stops[trip_stops['arrival_time'] >= earliest].head(1)

    return next_stop['stop_lat'], next_stop['stop_lon']

def add_future_coords(df, timetable):
    '''
    Function that appends columns with predicted coords to each row according to timetable.

    For every row the stop expected 15 minutes after 'generated' is looked up with
    find_future_coords, using the row's 'scheduledTripStartTime', 'vehicleService'
    and 'delay'.

    Parameters:
        df: DataFrame with 'generated' (timestamps), 'scheduledTripStartTime',
            'vehicleService' and 'delay'. It is modified in place.
        timetable: timetable frame handed to find_future_coords.

    Returns:
        The same df with 'lat_15min' and 'lon_15min' added. Each cell is a scalar
        coordinate, or NaN when no stop was found for the row.
    '''
    def calculate_future_time(generated_time):
        future_time = generated_time + pd.to_timedelta('15min')
        return future_time.strftime('%H:%M:%S')

    def first_value(result):
        # find_future_coords hands back a Series with at most one element;
        # unwrap it so the new columns hold plain numbers instead of Series objects.
        if isinstance(result, pd.Series):
            return result.iloc[0] if len(result) > 0 else float('nan')
        return result

    lats = []
    lons = []
    rows = zip(df['generated'], df['scheduledTripStartTime'], df['vehicleService'], df['delay'])
    for generated, trip_start, service, delay in rows:
        lat, lon = find_future_coords(timetable, trip_start, service, delay,
                                      calculate_future_time(generated))
        lats.append(first_value(lat))
        lons.append(first_value(lon))

    df['lat_15min'] = lats
    df['lon_15min'] = lons

    return df


## Example usage

In [81]:
# Load every stored vehicle-position file (no date filter).
df = read_positions()
# Pair each observation with the same trip's observation 15 minutes later.
df = add_15_minutes(df)
# Load actual weather readings joined with their forecasts.
w = read_weather()
# Attach weather to each observation through the shared 10-minute time bin.
df = pd.merge(df,w,on='generated_weather_bin')
# Drop identifier/timestamp columns and one-hot encode the categorical weather columns.
df = finish_df(df)

In [82]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Every column except the target 'delay_15min' is used as a feature.
X_train, X_test, y_train, y_test = train_test_split(
    df.drop(columns='delay_15min'),
    df["delay_15min"],
    test_size=0.33,
    random_state=42,
)

clf = RandomForestRegressor(n_estimators=100, max_depth=1, random_state=0)
clf.fit(X_train, y_train)
# Score of the fitted model on the held-out 33% of the rows.
clf.score(X_test, y_test)

0.5038159983319095

In [89]:
import pickle

# Persist the fitted model. The context manager closes the file even if
# pickling fails, instead of leaving the handle open.
with open("model.sav", 'wb') as model_file:
    pickle.dump(clf, model_file)