In [1]:
import geopandas as gpd
import numpy as np
import pandas as pd
import boto.s3



In [2]:
def collectAllData(inDirectory, outDirectory, popDirectory,
                   blockGroupsPath='/Users/cpoliziani/Downloads/sfbay-blockgroups-2010/'
                                   '641aa0d4-ce5b-4a81-9c30-8790c4ab8cfb202047-1-wkkklf.j5ouj.shp'):
    """Build the trips, activities and path-traversal tables for one run and write them as CSVs.

    Parameters
    ----------
    inDirectory : str
        Prefix handed to ``processEvents`` to locate the events file.
    outDirectory : str
        Destination prefix; ``/trips.csv.gz``, ``/pathTraversals.csv.gz`` and
        ``/activities.csv.gz`` are appended to it, so it must be a string
        (local path or URL) without a trailing slash.
    popDirectory : str
        Prefix handed to ``processPlans`` to locate the plans file.
    blockGroupsPath : str, optional
        Path of the block-group shapefile read with ``gpd.read_file``. The
        default is the original author's local path; pass your own location
        when running on another machine.

    Returns
    -------
    None
        The three tables are written to ``outDirectory`` as a side effect.
    """
    # processPlans also returns a person -> departures mapping; it is not used here.
    trips, activities, _personToTripDeparture = processPlans(popDirectory)

    # Only the path traversals are exported; the enter/leave vehicle events are not used here.
    PTs, _PEVs, _PLVs = processEvents(inDirectory)

    BGs = gpd.read_file(blockGroupsPath)

    # Replace each coordinate pair by the id of the block group that contains it.
    trips = addGeometryIdToDataFrame(trips, BGs, 'originX', 'originY', 'startBlockGroup')
    trips = addGeometryIdToDataFrame(trips, BGs, 'destinationX', 'destinationY', 'endBlockGroup')

    activities = addGeometryIdToDataFrame(activities, BGs, 'x', 'y', 'activityBlockGroup')

    PTs = addGeometryIdToDataFrame(PTs, BGs, 'startX', 'startY', 'startBlockGroup')
    PTs = addGeometryIdToDataFrame(PTs, BGs, 'endX', 'endY', 'endBlockGroup')
    PTs = PTs.rename_axis('PathTraversalID')

    trips.to_csv(outDirectory + '/trips.csv.gz', index=True)
    PTs.to_csv(outDirectory + '/pathTraversals.csv.gz', index=True)
    activities.to_csv(outDirectory + '/activities.csv.gz', index=True)
    
    


In [3]:
def _selectPersonVehicleEvents(chunk, eventType):
    """Return the ``eventType`` rows of ``chunk`` with integer person/time, or None if there are none.

    Rows whose ``person`` contains 'Agent' or whose ``vehicle`` contains 'body'
    are excluded. ``chunk['vehicle']`` must already hold strings.
    """
    isAgent = chunk['person'].apply(str).str.contains('Agent').fillna(False)
    isBody = chunk['vehicle'].str.contains('body').fillna(False)
    events = chunk.loc[(chunk['type'] == eventType) & ~isAgent & ~isBody, :].dropna(how='all', axis=1)
    # `~events.empty` would be -1 or -2 (both truthy); use a real boolean test
    # so that an empty selection never reaches the column casts below.
    if events.empty:
        return None
    events['person'] = events['person'].astype(int)
    events['time'] = events['time'].astype(int)
    return events


def processEvents(directory):
    """Read the iteration-0 events file in chunks and split it by event type.

    Parameters
    ----------
    directory : str
        Key prefix inside the ``beam-outputs`` S3 bucket; the file read is
        ``s3://beam-outputs/<directory>ITERS/it.0/0.events.csv.gz``.

    Returns
    -------
    tuple
        ``(PTs, PEVs, PLVs)``: the PathTraversal rows with positive length,
        post-processed by ``fixPathTraversals``, followed by the
        PersonEntersVehicle and PersonLeavesVehicle rows (an empty DataFrame
        when no such rows were found).

    Raises
    ------
    ValueError
        Raised by ``pd.concat`` when no chunk yielded any PathTraversal row.
    """
    fullPath = directory + 'ITERS/it.0/0.events.csv.gz'
    ptColumns = ['driver', 'vehicle', 'mode', 'length', 'startX', 'startY', 'endX', 'endY', 'vehicleType',
                 'arrivalTime', 'departureTime', 'primaryFuel', 'primaryFuelType', 'secondaryFuel',
                 'secondaryFuelType', 'numPassengers', 'riders']
    PTs = []
    PEVs = []
    PLVs = []
    print('Reading ', fullPath)
    for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
        isPathTraversal = chunk['type'] == 'PathTraversal'
        # NOTE(review): as in the original, chunks without any PathTraversal row are
        # skipped entirely, including their enter/leave events - confirm this is intended.
        if not isPathTraversal.any():
            continue
        chunk['vehicle'] = chunk['vehicle'].astype(str)

        PT = chunk.loc[isPathTraversal & (chunk['length'] > 0)].dropna(how='all', axis=1)
        if not PT.empty:
            PT['departureTime'] = PT['departureTime'].astype(int)
            PT['arrivalTime'] = PT['arrivalTime'].astype(int)
            if 'riders' in PT.columns:
                PT['riders'] = PT['riders'].apply(ridersToList)
            else:
                # One fresh list per row; `[[]] * n` would share a single list object.
                PT['riders'] = [[] for _ in range(len(PT))]
            # dropna above removes columns that are entirely empty in this chunk;
            # reindex restores them as NaN instead of failing with a KeyError.
            PTs.append(PT.reindex(columns=ptColumns))

        PEV = _selectPersonVehicleEvents(chunk, "PersonEntersVehicle")
        if PEV is not None:
            PEVs.append(PEV)

        PLV = _selectPersonVehicleEvents(chunk, "PersonLeavesVehicle")
        if PLV is not None:
            PLVs.append(PLV)

    PTs = fixPathTraversals(pd.concat(PTs))
    # pd.concat rejects an empty list, so fall back to an empty frame.
    allPEVs = pd.concat(PEVs) if PEVs else pd.DataFrame()
    allPLVs = pd.concat(PLVs) if PLVs else pd.DataFrame()
    return PTs, allPEVs, allPLVs

In [4]:
def processPlans(directory):
    """Load the plans file and split it into leg (trip) rows and activity rows.

    Parameters
    ----------
    directory : str
        Key prefix inside the ``beam-outputs`` S3 bucket; the file read is
        ``s3://beam-outputs/<directory>plans.csv.gz``.

    Returns
    -------
    tuple
        ``(trips, activities, personToTripDeparture)`` where ``trips`` and
        ``activities`` are DataFrames and ``personToTripDeparture`` maps each
        ``person_id`` to a list of ``{"planID", "departureTime"}`` dicts, with
        ``departureTime`` being ``legDepartureTime * 3600.0``.
    """
    fullPath = directory + 'plans.csv.gz'
    print(fullPath)
    plans = addTimesToPlans(pd.read_csv("s3://beam-outputs/" + fullPath))
    elementKind = plans['ActivityElement'].str.lower()

    # Leg rows, restricted to the columns describing a trip.
    legColumns = ['person_id', 'legDepartureTime', 'PlanElementIndex',
                  'originX', 'originY', 'destinationX', 'destinationY']
    legRows = plans.loc[elementKind.str.contains('leg')].dropna(how='all', axis=1)
    legsSub = legRows[legColumns]

    personToTripDeparture = {}
    for _, leg in legsSub.iterrows():
        departures = personToTripDeparture.setdefault(leg.person_id, [])
        departures.append({"planID": leg.PlanElementIndex,
                           "departureTime": leg.legDepartureTime * 3600.0})

    # Activity rows, restricted to type, location and departure time.
    activityRows = plans.loc[elementKind.str.contains('activity')].dropna(how='all', axis=1)
    actsSub = activityRows[['person_id', 'ActivityType', 'x', 'y', 'departure_time']]

    return pd.concat([legsSub]), pd.concat([actsSub]), personToTripDeparture




In [5]:
def addGeometryIdToDataFrame(df, gdf, xcol, ycol, idColumn="geometry", df_geom='epsg:4326'):
    """Replace a coordinate pair in ``df`` by the ``blkgrpid`` of the polygon containing it.

    Parameters
    ----------
    df : pandas.DataFrame
        Table holding the point coordinates.
    gdf : geopandas.GeoDataFrame
        Polygons to join against; must have a ``blkgrpid`` column.
    xcol, ycol : str
        Names of the coordinate columns in ``df``; both are dropped from the result.
    idColumn : str, optional
        Name given to the joined ``blkgrpid`` column in the result.
    df_geom : str, optional
        CRS of the coordinates in ``df``.

    Returns
    -------
    pandas.DataFrame
        ``df`` without ``xcol``/``ycol`` and with ``idColumn`` added (NaN for
        points that fall in no polygon), keeping one row per index value.
    """
    # Pass the CRS to the constructor; the {'init': ...} dict form is deprecated.
    points = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[xcol], df[ycol]), crs=df_geom)
    # Both layers are projected to the same CRS before the spatial join.
    joined = gpd.sjoin(points.to_crs('epsg:26910'), gdf.to_crs('epsg:26910'))
    points = points.merge(joined['blkgrpid'], left_index=True, right_index=True, how="left")
    points = points.rename(columns={'blkgrpid': idColumn})
    result = pd.DataFrame(points.drop(columns='geometry'))
    result = result.drop(columns=[xcol, ycol])
    # A point matching several polygons appears once per match; keep the first.
    return result.loc[~result.index.duplicated(keep='first'), :]

In [6]:
def addTimesToPlans(plans):
    """Annotate each leg row with values taken from the rows directly before and after it.

    Adds five columns to ``plans`` (modified in place), NaN everywhere except on
    rows whose ``ActivityElement`` equals "leg" (case-insensitive):

    * ``legDepartureTime`` - ``departure_time`` of the preceding row
    * ``originX`` / ``originY`` - ``x`` / ``y`` of the preceding row
    * ``destinationX`` / ``destinationY`` - ``x`` / ``y`` of the following row

    Parameters
    ----------
    plans : pandas.DataFrame
        Needs columns ``ActivityElement``, ``departure_time``, ``x`` and ``y``.
        Assumes every leg row has a row before and after it - TODO confirm for
        inputs that start or end with a leg.

    Returns
    -------
    pandas.DataFrame
        The same ``plans`` object, with the new columns.
    """
    legInds = np.where(plans['ActivityElement'].str.lower() == "leg")[0]
    # (new column, source column, row offset relative to the leg row)
    derivedColumns = (
        ('legDepartureTime', 'departure_time', -1),
        ('originX', 'x', -1),
        ('originY', 'y', -1),
        ('destinationX', 'x', 1),
        ('destinationY', 'y', 1),
    )
    for newCol, sourceCol, offset in derivedColumns:
        plans.loc[:, newCol] = np.nan
        # Assign a plain array so the copy is purely positional and cannot be
        # re-aligned on index labels (the source rows carry the neighbours' labels).
        neighbourValues = plans[sourceCol].iloc[legInds + offset].to_numpy()
        plans.iloc[legInds, plans.columns.get_loc(newCol)] = neighbourValues
    return plans

In [7]:
def ridersToList(val):
    """Split a colon-separated riders value into a list of strings.

    The value is converted with ``str`` first; anything whose string form is
    ``'nan'`` yields an empty list.
    """
    text = str(val)
    return [] if text == 'nan' else text.split(':')


In [8]:
def fixPathTraversals(PTs):
    """Add derived columns to the path-traversal table (modified in place).

    Adds ``duration``, ``mode_extended`` (``mode`` plus ``_RideHail`` / ``_CAV``
    suffixes), ``isRH``, ``isCAV``, ``occupancy``, ``vehicleMiles``,
    ``passengerMiles``, ``totalEnergyInJoules`` and ``gallonsGasoline``, then
    drops ``numPassengers`` and ``length``.

    Parameters
    ----------
    PTs : pandas.DataFrame
        Needs columns ``arrivalTime``, ``departureTime``, ``mode``, ``vehicle``,
        ``vehicleType``, ``numPassengers``, ``length``, ``primaryFuel``,
        ``secondaryFuel``, ``primaryFuelType`` and ``secondaryFuelType``.

    Returns
    -------
    pandas.DataFrame
        The same ``PTs`` object.
    """
    # Presumably `length` is in meters and fuel amounts are in joules, judging by
    # the names of the derived columns - TODO confirm against the events schema.
    metersPerMile = 1609.34
    gallonsGasolinePerJoule = 8.3141841e-9

    PTs['duration'] = PTs['arrivalTime'] - PTs['departureTime']
    PTs['mode_extended'] = PTs['mode']
    # na=False keeps the masks strictly boolean; a NaN in the mask would make
    # the .loc selections below fail.
    PTs['isRH'] = PTs['vehicle'].str.contains('rideHail', na=False)
    PTs['isCAV'] = PTs['vehicleType'].str.contains('L5', na=False)
    PTs.loc[PTs['isRH'], 'mode_extended'] += '_RideHail'
    PTs.loc[PTs['isCAV'], 'mode_extended'] += '_CAV'

    PTs['occupancy'] = PTs['numPassengers']
    # Only rows whose extended mode is exactly 'car' get one extra occupant;
    # suffixed modes such as 'car_RideHail' do not.
    PTs.loc[PTs['mode_extended'] == 'car', 'occupancy'] += 1
    PTs.loc[PTs['mode_extended'] == 'walk', 'occupancy'] = 1
    PTs.loc[PTs['mode_extended'] == 'bike', 'occupancy'] = 1

    PTs['vehicleMiles'] = PTs['length'] / metersPerMile
    PTs['passengerMiles'] = (PTs['length'] * PTs['occupancy']) / metersPerMile
    PTs['totalEnergyInJoules'] = PTs['primaryFuel'] + PTs['secondaryFuel']

    # Start from a float column: adding float values to an int column relies on
    # an implicit upcast that pandas has deprecated.
    PTs['gallonsGasoline'] = 0.0
    for fuelTypeCol, fuelCol in (('primaryFuelType', 'primaryFuel'),
                                 ('secondaryFuelType', 'secondaryFuel')):
        usesGasoline = PTs[fuelTypeCol] == 'gasoline'
        PTs.loc[usesGasoline, 'gallonsGasoline'] += PTs.loc[usesGasoline, fuelCol] * gallonsGasolinePerJoule

    PTs.drop(columns=['numPassengers', 'length'], inplace=True)
    return PTs


In [9]:
if __name__ == '__main__':
    # Create the final outputs for the BEAM run folders whose name contains "iteration-5".
    runName = 'sfbay-cp_pilates_test'
    # Output folder names, paired in listing order with the matching run folders below.
    outputnames = ['year-2018-final', 'year-2019-final', 'year-2020-final']

    conn = boto.s3.connect_to_region('us-east-2')
    bucket = conn.get_bucket('beam-outputs')
    folders = bucket.list("pilates-outputs/" + runName + "/beam/", "/")
    allBeamOutputs = pd.DataFrame([folder.name for folder in folders], columns=['name'])
    allBeamOutputs = allBeamOutputs[allBeamOutputs['name'].str.contains('iteration-5')]
    print('allBeamOutputs', allBeamOutputs)
    for beamoutput, outputname in zip(allBeamOutputs['name'], outputnames):
        print('analyzing', beamoutput)
        # Input prefixes are keys relative to the bucket; processEvents and
        # processPlans prepend "s3://beam-outputs/" themselves.
        inDirectory = beamoutput
        popDirectory = inDirectory.replace("/beam/", "/activitysim/")
        # collectAllData appends file names to this value and hands it to to_csv,
        # so it must be a full URL string, not a bucket listing object.
        outDirectory = "s3://beam-outputs/pilates-outputs/" + runName + "/beam/" + outputname

        collectAllData(inDirectory, outDirectory, popDirectory)

        print('done')

allBeamOutputs                                                  name
10  pilates-outputs/sfbay-cp_pilates_test/beam/yea...
16  pilates-outputs/sfbay-cp_pilates_test/beam/yea...
22  pilates-outputs/sfbay-cp_pilates_test/beam/yea...
analyzing pilates-outputs/sfbay-cp_pilates_test/beam/year-2018-iteration-5/
pilates-outputs/sfbay-cp_pilates_test/activitysim/year-2018-iteration-5/plans.csv.gz
Reading  pilates-outputs/sfbay-cp_pilates_test/beam/year-2018-iteration-5/ITERS/it.0/0.events.csv.gz


  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):
  for chunk in pd.read_csv("s3://beam-outputs/" + fullPath, chunksize=1500000):


FSTimeoutError: 