In [1]:
import boto3
import pandas as pd
import io

In [2]:
# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    if s3_client is None:
        s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)


In [3]:
# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(
        filepath, 
        bucket, 
        s3=None, 
        s3_client=None, 
        verbose=False,
        **args
        ):
#     if not filepath.endswith('/'):
#         filepath = filepath + '/'  # Add '/' to the end
#         print(filepath)
    if s3_client is None:
        session = boto3.Session()
        s3_client = session.client('s3')
    if s3 is None:
        s3 = boto3.resource('s3')
    
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
            if item.key.endswith('.parquet')]
    print(s3_keys)
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys: 
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) 
        for key in s3_keys]
    return s3_keys, pd.concat(dfs, ignore_index=True)


In [4]:
# get config from the passed event
region = "us-east-1"
bucket_name = "bus-observatory-dev-concatenate-and-clean"
system_id = "mbta_all"
feed_config = {
        "publish": "True",
        "system_name": "Massachusetts Bay Transit Authority",
        "city_name": "Boston, MA, US",
        "feed_type": "gtfsrt",
        "url": "https://cdn.mbta.com/realtime/VehiclePositions.pb",
        "header": "False",
        "route_key": "vehicle.trip.route_id",
        "timestamp_key": "vehicle.timestamp",
        "tz": "America/New_York",
        "notes": "Sampled once per minute, inlcudes buses and trolleys. We parse all fields in this feed."
    }

prefix=f"feeds/{system_id}/INCOMING_"

# read and concat all files into a single dataframe
s3_keys, combined_df = pd_read_s3_multiple_parquets(prefix, bucket_name)

['feeds/mbta_all/INCOMING_mbta_all_2023-02-26_12_57_22.parquet', 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_12_58_22.parquet', 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_12_59_25.parquet', 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_18_00_23.parquet', 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_18_01_24.parquet']


In [5]:
combined_df

Unnamed: 0,id,vehicle.trip.trip_id,vehicle.trip.start_time,vehicle.trip.start_date,vehicle.trip.schedule_relationship,vehicle.trip.route_id,vehicle.trip.direction_id,vehicle.position.latitude,vehicle.position.longitude,vehicle.position.bearing,vehicle.current_stop_sequence,vehicle.current_status,vehicle.timestamp,vehicle.stop_id,vehicle.vehicle.id,vehicle.vehicle.label,vehicle.occupancy_status,vehicle.occupancy_percentage,vehicle.position.speed
0,y1777,54416554,12:40:00,20230226,0,66,0.0,42.342690,-71.122002,302.0,18.0,2,2023-02-26 17:57:14+00:00,1373,y1777,1777,1.0,20.0,
1,y1958,54257641,12:17:00,20230226,0,77,0.0,42.417366,-71.163559,293.0,26.0,2,2023-02-26 17:57:13+00:00,2286,y1958,1958,1.0,20.0,
2,y1306,54711787,12:50:00,20230226,0,743,0.0,42.347309,-71.043419,249.0,5.0,2,2023-02-26 17:57:12+00:00,7096,y1306,1306,,,
3,y3130,54933912,12:40:00,20230226,0,111,1.0,42.391663,-71.038536,284.0,21.0,2,2023-02-26 17:57:13+00:00,5607,y3130,3130,,,
4,y0864,54952298,12:45:00,20230226,0,117,1.0,42.404881,-71.016434,137.0,10.0,2,2023-02-26 17:57:12+00:00,5715,y0864,0864,1.0,20.0,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1185,y2014,54257867,13:11:00,20230226,0,83,1.0,42.384792,-71.112282,305.0,1.0,2,2023-02-26 18:01:15+00:00,2425,y2014,2014,,,
1186,O-54754922,55328064-HayOL,12:36:00,20230226,0,Orange,0.0,42.345440,-71.078651,220.0,130.0,2,2023-02-26 18:01:14+00:00,70012,O-54754922,1418,,,
1187,y1421,54952300,12:58:00,20230226,0,116,1.0,42.418697,-70.992554,264.0,2.0,2,2023-02-26 18:01:16+00:00,5700,y1421,1421,1.0,0.0,
1188,G-10038,54544120-HayGLHayGLHayGLHayGL,12:39:00,20230226,0,Green-B,1.0,42.350700,-71.127098,45.0,100.0,0,2023-02-26 18:01:19+00:00,70134,G-10038,3800,,,10.4


In [6]:
# delete all read files in file list
s3_client = boto3.client('s3')
response = s3_client.delete_objects(
    Bucket=bucket_name,
    Delete={
        'Objects': [{'Key': key} for key in s3_keys]
    }
)


In [7]:
s3_keys

['feeds/mbta_all/INCOMING_mbta_all_2023-02-26_12_57_22.parquet',
 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_12_58_22.parquet',
 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_12_59_25.parquet',
 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_18_00_23.parquet',
 'feeds/mbta_all/INCOMING_mbta_all_2023-02-26_18_01_24.parquet']