In [None]:
import gzip
import json
import os
import re
import time
from collections import defaultdict
from datetime import datetime, timedelta

import boto3
import pandas as pd

# Resolve every path relative to the directory the notebook kernel runs in.
# (A notebook has no __file__, so os.path.dirname(os.path.abspath("__file__"))
# also evaluates to the current working directory; os.getcwd() says so directly.)
notebook_directory = os.getcwd()
raw_output_directory = os.path.join(notebook_directory, 'temp', 'raw')

# Raw snapshots are downloaded under temp/raw; make sure it exists up front.
os.makedirs(raw_output_directory, exist_ok=True)

# Today's date in YYYY/MM/DD form, the same layout download_files uses for S3 key prefixes.
today_date = datetime.now().strftime('%Y/%m/%d')

In [None]:
# Route ids whose VehiclePositions are kept (compared against vehicle.trip.route_id).
# Planned: switch to the Green Line ('GL') in the future.
target_lines = ['Red', 'Orange']

# Stop name -> GTFS stop_ids to track. Stop names become the output file names and the
# ids are matched against each stop_time_update's 'stop_id'.
#
# Green Line stops, kept for the planned switch to GL:
# stop_dict = {
#     'Boston College': ['70106', '70107'],
#     'Cleveland Circle': ['70237', '70238'],
#     'Riverside': ['70160', '70161'],
#     'Union Square': ['70503', '70504'],
#     'Medford Tufts': ['70511', '70512'],
#     'Reservoir': ['70174', '70175']
# }
stop_dict = {
    'Ashmont': ['70093', '70094'],
    'Braintree': ['70105'],
    'Forest Hills': ['70001'],
    'Oak Grove': ['70036'],
}

In [None]:
# Bucket that the listing and download helpers below read snapshot files from.
bucket_name = 'mbta-gtfs-s3'

# Low-level boto3 S3 client shared by those helpers.
s3_client = boto3.client('s3')

In [None]:
def list_objects_with_prefix(prefix):
    """Return the object summaries in ``bucket_name`` whose keys start with ``prefix``.

    The listing may span several result pages, so every page produced by the
    client's ``list_objects_v2`` paginator is flattened into one list. Pages
    without a ``Contents`` entry contribute nothing.
    """
    page_iterator = s3_client.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=prefix)
    return [summary for page in page_iterator for summary in page.get('Contents', [])]

In [None]:
from concurrent.futures import ThreadPoolExecutor

def download_file(bucket_name, file_key, local_file_path):
    """Download one S3 object to ``local_file_path``, creating its parent directory if needed.

    Args:
        bucket_name: S3 bucket holding the object.
        file_key: key of the object to download.
        local_file_path: destination path; a bare file name is written to the current directory.
    """
    parent_directory = os.path.dirname(local_file_path)
    # A bare file name has no parent component, and os.makedirs('') raises FileNotFoundError.
    # exist_ok makes a separate existence check unnecessary (and race-free across worker threads).
    if parent_directory:
        os.makedirs(parent_directory, exist_ok=True)
    s3_client.download_file(Bucket=bucket_name, Key=file_key, Filename=local_file_path)
    print(f"Downloaded: {file_key} -> {local_file_path}")

def download_files(start_date_str, end_date_str=None):
    """Download TripUpdates / VehiclePositions snapshots for every day in a date range.

    Days are walked from ``start_date_str`` through ``end_date_str`` (both 'YYYY/MM/DD'),
    or through the current moment when no end date is given. For each day, the objects under the
    'YYYY/MM/DD/' prefix whose keys mention one of the two feed types are downloaded in
    parallel to temp/raw/YYYY/MM/DD/. Files already present locally are skipped.

    Raises:
        ValueError: if a date string does not match 'YYYY/MM/DD'.
        Any exception raised by a download is re-raised once that day's batch finishes.
    """
    day = datetime.strptime(start_date_str, '%Y/%m/%d')
    last_day = datetime.now() if not end_date_str else datetime.strptime(end_date_str, '%Y/%m/%d')

    wanted_feeds = ['realtime_TripUpdates_enhanced', 'realtime_VehiclePositions_enhanced']

    while not day > last_day:
        day_prefix = day.strftime('%Y/%m/%d/')
        day_objects = list_objects_with_prefix(day_prefix)
        local_dir = os.path.join('temp/raw', day_prefix)

        with ThreadPoolExecutor() as pool:
            futures = []
            for feed in wanted_feeds:
                for key in (summary['Key'] for summary in day_objects):
                    if feed not in key:
                        continue
                    destination = os.path.join(local_dir, os.path.basename(key))
                    if os.path.exists(destination):
                        continue  # already downloaded on an earlier run
                    futures.append(pool.submit(download_file, bucket_name, key, destination))

            # result() re-raises any exception a worker hit.
            for future in futures:
                future.result()

        day = day + timedelta(days=1)

In [None]:
def download_s3_folder(bucket_name, s3_folder, local_dir=None):
    """
    Download every object under an S3 key prefix ("folder") to the local file system.

    Args:
        bucket_name: the name of the s3 bucket
        s3_folder: key prefix in the bucket, without a leading slash (e.g. '2024/02/15/')
        local_dir: a relative or absolute local directory. When None, each object is
            written to a local path equal to its key.
    """
    # s3_client is a low-level boto3 client (boto3.client('s3')), which has no .Bucket()
    # method (that belongs to the resource API). So keys are listed with the client's
    # paginator and fetched with client.download_file.
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=s3_folder):
        for obj in page.get('Contents', []):
            key = obj['Key']
            target = key if local_dir is None \
                else os.path.join(local_dir, os.path.relpath(key, s3_folder))
            target_dir = os.path.dirname(target)
            # Skip makedirs for top-level keys (empty dirname); exist_ok tolerates existing dirs.
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)
            if key.endswith('/'):
                continue  # key ending in '/' is a folder placeholder; nothing to download
            s3_client.download_file(Bucket=bucket_name, Key=key, Filename=target)

# Example (keys have no leading slash, matching the prefixes download_files uses):
# download_s3_folder(bucket_name, '2024/02/15/', 'temp/raw/2024/02/15')

In [None]:
def _walk_raw_snapshots(raw_directory):
    """Yield ``(year, month, day, file_name, file_path)`` for each .gz file under raw_directory/<year>/<month>/<day>/."""
    for year in os.listdir(raw_directory):
        year_directory = os.path.join(raw_directory, year)
        if not os.path.isdir(year_directory):
            continue
        for month in os.listdir(year_directory):
            month_directory = os.path.join(year_directory, month)
            if not os.path.isdir(month_directory):
                continue
            for day in os.listdir(month_directory):
                day_directory = os.path.join(month_directory, day)
                if not os.path.isdir(day_directory):
                    continue
                for root, _dirs, files in os.walk(day_directory):
                    for file_name in files:
                        if file_name.endswith('.gz'):
                            # Join with ``root`` (the directory os.walk is visiting), not the day
                            # directory; otherwise files in nested subdirectories are not found.
                            yield year, month, day, file_name, os.path.join(root, file_name)


def _utc_to_eastern(timestamp):
    """Convert a naive UTC timestamp to naive America/New_York wall-clock time, honoring DST."""
    return pd.Timestamp(timestamp).tz_localize('UTC').tz_convert('America/New_York').tz_localize(None)


def parse_trip_updates_for_stops(raw_directory='temp/raw', parsed_directory='temp/parsed_TU', stops=None):
    """Extract departure predictions for the tracked stops from raw TripUpdates snapshots.

    Each .gz file under raw_directory/<year>/<month>/<day>/ (searched recursively below
    the day level) is decoded as JSON. One row is recorded for every ``stop_time_update``
    that has a ``departure`` time and whose ``stop_id`` belongs to a tracked stop.

    Row columns:
        id: feed entity id.
        stop_id: the matched stop id.
        file_timestamp: snapshot time (UTC) parsed from the directory date and file name.
        current_time: file_timestamp converted to America/New_York wall-clock time.
        departure_time: predicted departure (UTC) from the epoch-seconds ``time`` field.
        departure_time_adj: departure_time converted to America/New_York wall-clock time.

    The local-time conversions honor daylight saving time; a fixed UTC-5 offset would be an
    hour off from March to November.

    Args:
        raw_directory: root of the year/month/day tree of downloaded snapshots.
        parsed_directory: directory for the per-stop CSVs (created if missing).
        stops: mapping of stop name -> list of stop_ids. Defaults to the global ``stop_dict``.

    Writes one ``<stop name>.csv`` per tracked stop, sorted by ``current_time``.
    """
    if stops is None:
        stops = stop_dict

    # Invert name -> ids into id -> names once, so each update needs a single dict lookup.
    # Each id keeps a list so an id listed under several names still yields one row per name.
    names_by_stop_id = defaultdict(list)
    for stop_name, ids in stops.items():
        for stop_id in ids:
            if stop_name not in names_by_stop_id[stop_id]:
                names_by_stop_id[stop_id].append(stop_name)

    data = {stop_name: [] for stop_name in stops}

    for year, month, day, file_name, file_path in _walk_raw_snapshots(raw_directory):
        with gzip.open(file_path, 'rb') as gz_file:
            json_data = json.load(gz_file)
        if 'entity' not in json_data:
            continue

        # Parsed lazily: the file name only has to be well-formed once a tracked stop matches.
        file_timestamp = current_time = None
        for entity in json_data['entity']:
            if 'trip_update' not in entity or 'stop_time_update' not in entity['trip_update']:
                continue
            for stop_update in entity['trip_update']['stop_time_update']:
                if 'departure' not in stop_update or 'stop_id' not in stop_update:
                    continue
                stop_id = stop_update['stop_id']
                matching_names = names_by_stop_id.get(stop_id)
                if not matching_names:
                    continue
                departure = stop_update['departure']
                if 'time' not in departure:
                    continue  # no absolute departure time to record

                if file_timestamp is None:
                    file_time = file_name.split('_')[0].split('T')[1].replace('Z', '').replace('/', ':')
                    file_timestamp = datetime.strptime(f"{year}-{month}-{day} {file_time}", '%Y-%m-%d %H:%M:%S')
                    current_time = _utc_to_eastern(file_timestamp)

                departure_time = pd.to_datetime(departure['time'], unit='s')
                departure_time_adj = _utc_to_eastern(departure_time)
                for stop_name in matching_names:
                    data[stop_name].append({'id': entity['id'],
                                            'stop_id': stop_id,
                                            'file_timestamp': file_timestamp,
                                            'current_time': current_time,
                                            'departure_time': departure_time,
                                            'departure_time_adj': departure_time_adj})

    # Convert lists to DataFrames, sort by current_time, and save as CSV
    os.makedirs(parsed_directory, exist_ok=True)
    columns = ['id', 'stop_id', 'file_timestamp', 'current_time', 'departure_time', 'departure_time_adj']
    for stop_name, rows in data.items():
        df = pd.DataFrame(rows, columns=columns).sort_values(by='current_time')
        csv_file_path = os.path.join(parsed_directory, f"{stop_name}.csv")
        df.to_csv(csv_file_path, index=False)
        print(f"Saved DataFrame for {stop_name}, sorted by current time, to {csv_file_path}")

In [None]:
def parse_files(raw_directory='temp/raw', parsed_TU_directory='temp/parsed_TU', parsed_VP_directory='temp/parsed_VP'):
    """Parse raw snapshots into per-stop TripUpdates CSVs and per-route VehiclePositions JSON.

    Walks raw_directory/<year>/<month>/<day>/ (recursively below the day level) for .gz files
    and decodes each as JSON:
      * file names containing 'TripUpdate': each stop_time_update with 'departure' and
        'stop_id' is handed to process_trip_update, which records rows for stops in stop_dict.
      * file names containing 'VehiclePosition': vehicles on routes in target_lines are
        grouped by route via process_vehicle_position_route_based.
    Files without an 'entity' key are ignored.

    Args:
        raw_directory: root of the year/month/day tree of downloaded snapshots.
        parsed_TU_directory: output directory for '<stop name>.csv' files (created if missing).
        parsed_VP_directory: output directory for '<route_id>.json' files (created if missing).
    """
    data_TU = {stop: [] for stop in stop_dict}
    data_VP = {}

    for year in os.listdir(raw_directory):
        year_directory = os.path.join(raw_directory, year)
        if not os.path.isdir(year_directory):
            continue
        for month in os.listdir(year_directory):
            month_directory = os.path.join(year_directory, month)
            if not os.path.isdir(month_directory):
                continue
            for day in os.listdir(month_directory):
                day_directory = os.path.join(month_directory, day)
                if not os.path.isdir(day_directory):
                    continue
                for root, _dirs, files in os.walk(day_directory):
                    for file_name in files:
                        if not file_name.endswith('.gz'):
                            continue
                        # Join with the directory os.walk is visiting; joining with day_directory
                        # points at a non-existent path for files in nested subdirectories.
                        file_path = os.path.join(root, file_name)
                        with gzip.open(file_path, 'rb') as gz_file:
                            json_data = json.load(gz_file)
                        if 'entity' not in json_data:
                            continue

                        # Determine the type of file and process accordingly
                        if 'TripUpdate' in file_name:
                            for entity in json_data['entity']:
                                if 'trip_update' not in entity or 'stop_time_update' not in entity['trip_update']:
                                    continue
                                for stop_update in entity['trip_update']['stop_time_update']:
                                    if 'departure' in stop_update and 'stop_id' in stop_update:
                                        process_trip_update(entity, stop_update, file_name, year, month, day, data_TU)
                        elif 'VehiclePosition' in file_name:
                            process_vehicle_position_route_based(json_data, data_VP, target_lines)

    # Save TripUpdate data to CSV
    os.makedirs(parsed_TU_directory, exist_ok=True)
    for stop_name, rows in data_TU.items():
        save_trip_updates_as_csv(rows, stop_name, parsed_TU_directory)

    # Save VehiclePosition data to JSON (keys of data_VP are route ids, not stop names)
    os.makedirs(parsed_VP_directory, exist_ok=True)
    for route_id, vehicles in data_VP.items():
        save_vehicle_positions_as_json(vehicles, route_id, parsed_VP_directory)

def process_trip_update(entity, stop_update, file_name, year, month, day, data, stops=None):
    """Append a departure row to ``data`` for each tracked stop whose ids include this update's stop_id.

    Args:
        entity: feed entity dict; its 'id' is recorded.
        stop_update: one stop_time_update dict with 'stop_id' and 'departure' keys.
        file_name: snapshot file name. The text after 'T' and before the first '_' (with 'Z'
            removed) is parsed as the snapshot's UTC time of day.
        year, month, day: date components (strings) of the snapshot's directory.
        data: mapping of stop name -> list of row dicts; matching rows are appended in place.
        stops: mapping of stop name -> list of stop_ids. Defaults to the global ``stop_dict``.

    Updates whose departure has no 'time' are skipped. current_time and departure_time_adj are
    America/New_York wall-clock times with DST applied (a fixed UTC-5 offset is an hour off
    during daylight saving time).
    """
    if stops is None:
        stops = stop_dict

    stop_id = stop_update['stop_id']
    matching_names = [stop_name for stop_name, ids in stops.items() if stop_id in ids]
    if not matching_names:
        return
    departure = stop_update['departure']
    if 'time' not in departure:
        return  # no absolute departure time to record

    departure_time = pd.to_datetime(departure['time'], unit='s')
    file_time = file_name.split('_')[0].split('T')[1].replace('Z', '').replace('/', ':')
    datetime_obj = datetime.strptime(f"{year}-{month}-{day} {file_time}", '%Y-%m-%d %H:%M:%S')
    # naive UTC -> naive America/New_York wall-clock time, DST-aware
    current_time = pd.Timestamp(datetime_obj).tz_localize('UTC').tz_convert('America/New_York').tz_localize(None)
    departure_time_adj = pd.Timestamp(departure_time).tz_localize('UTC').tz_convert('America/New_York').tz_localize(None)

    for stop_name in matching_names:
        data[stop_name].append({'id': entity['id'],
                                'stop_id': stop_id,
                                'file_timestamp': datetime_obj,
                                'current_time': current_time,
                                'departure_time': departure_time,
                                'departure_time_adj': departure_time_adj})

def process_vehicle_position_route_based(json_data, data, target_lines):
    """Group the ``vehicle`` payloads of a VehiclePositions snapshot by route.

    Args:
        json_data: decoded snapshot dict with an 'entity' list.
        data: mapping of route_id -> list of vehicle dicts; appended to in place.
        target_lines: route ids to keep; vehicles on any other route are ignored.

    Entities with no 'vehicle', no (or a null) 'trip', or a trip without 'route_id' are
    skipped instead of raising, so one incomplete entity does not abort the whole parse.
    """
    for entity in json_data['entity']:
        vehicle = entity.get('vehicle')
        if not vehicle:
            continue
        route_id = (vehicle.get('trip') or {}).get('route_id')
        if route_id in target_lines:
            data.setdefault(route_id, []).append(vehicle)

def save_trip_updates_as_csv(rows, stop_name, directory):
    """Write trip-update ``rows`` to ``<directory>/<stop_name>.csv``, ordered by 'current_time'.

    When ``rows`` is empty nothing is written; a message is printed instead.
    """
    if not rows:
        print(f"No data to save for {stop_name}.")
        return

    csv_file_path = os.path.join(directory, f"{stop_name}.csv")
    ordered = pd.DataFrame(rows).sort_values(by='current_time')
    ordered.to_csv(csv_file_path, index=False)
    print(f"Saved DataFrame for {stop_name} to {csv_file_path}")


def save_vehicle_positions_as_json(stop_data, stop_name, directory):
    """Dump ``stop_data`` as indented JSON to ``<directory>/<stop_name>.json``.

    When ``stop_data`` is empty nothing is written; a message is printed instead.
    """
    if not stop_data:
        print(f"No data to save for {stop_name}.")
        return

    json_file_path = os.path.join(directory, f"{stop_name}.json")
    with open(json_file_path, 'w') as out_file:
        json.dump(stop_data, out_file, indent=4)
    print(f"Saved JSON data for {stop_name} to {json_file_path}")

In [None]:
from haversine import haversine, Unit

# Directory that main() writes the per-stop event CSVs into.
output_directory = 'temp/parsed_stops'
# exist_ok replaces a separate os.path.exists() check (check-then-create is racy and redundant).
os.makedirs(output_directory, exist_ok=True)

def prefilter_locations(locations_csv, stop_dict):
    """Load ``locations_csv`` and keep the rows whose ``loc_name`` mentions any tracked stop id.

    Args:
        locations_csv: path or file-like object accepted by ``pd.read_csv``; must contain a
            ``loc_name`` column.
        stop_dict: mapping of stop name -> list of stop id strings. The ids (not the names)
            are searched for as literal substrings of ``loc_name``.

    Returns:
        The filtered DataFrame, with the original index preserved. Rows with a missing
        ``loc_name`` are dropped. With no stop ids the pattern is empty, so every row that
        has a ``loc_name`` is kept.
    """
    locations_df = pd.read_csv(locations_csv)
    unique_stops = [stop for stops in stop_dict.values() for stop in stops]
    # Escape each id so it is matched literally rather than interpreted as regex syntax.
    pattern = '|'.join(re.escape(stop) for stop in unique_stops)
    # na=False: a blank loc_name yields NaN from .str.contains, which cannot be used as a boolean mask.
    return locations_df[locations_df['loc_name'].str.contains(pattern, regex=True, na=False)]

def calculate_distance(lat1, lon1, lat2, lon2):
    """Haversine (great-circle) distance in meters between (lat1, lon1) and (lat2, lon2)."""
    origin = (lat1, lon1)
    destination = (lat2, lon2)
    return haversine(origin, destination, unit=Unit.METERS)

def process_vehicle_positions(filename, filtered_locations_df, dataframes, threshold_m=3):
    """Record vehicles seen near a tracked stop location in one parsed VehiclePositions file.

    Args:
        filename: path to a JSON file holding a list of vehicle dicts (e.g. the per-route
            files that parse_files writes to temp/parsed_VP).
        filtered_locations_df: locations with 'loc_name', 'latitude' and 'longitude' columns.
        dataframes: mapping of stop name -> DataFrame of events; updated in place.
        threshold_m: maximum vehicle-to-location distance, in meters (calculate_distance
            returns meters), that counts as a match. Defaults to 3.

    For every (vehicle, location) pair within ``threshold_m``, one event row is added to the
    first stop in ``dataframes`` whose name appears in that location's ``loc_name``.
    Vehicles without a 'position' are skipped.
    """
    with open(filename, 'r') as file:
        vehicles_data = json.load(file)

    # Pull the location columns out once; iterrows() for every vehicle rebuilds a Series per row.
    locations = list(zip(filtered_locations_df['latitude'],
                         filtered_locations_df['longitude'],
                         filtered_locations_df['loc_name']))
    # Gather events as plain dicts and build each DataFrame once at the end;
    # concatenating one row at a time is quadratic in the number of events.
    new_events = {stop: [] for stop in dataframes}

    for vehicle in vehicles_data:
        vehicle_pos = vehicle.get('position')
        if not vehicle_pos:
            continue  # no position -> cannot be matched to a location
        vehicle_lat = vehicle_pos['latitude']
        vehicle_lon = vehicle_pos['longitude']
        for loc_lat, loc_lon, loc_name in locations:
            if calculate_distance(vehicle_lat, vehicle_lon, loc_lat, loc_lon) > threshold_m:
                continue
            # NOTE(review): prefilter_locations selects locations by stop *id*, while this
            # matches by stop *name* (the keys of dataframes), so loc_name presumably contains
            # both. Confirm against locations.csv.
            for stop in dataframes:
                if stop in loc_name:
                    new_events[stop].append({
                        'timestamp': pd.to_datetime(vehicle['timestamp'], unit='s').tz_localize('UTC').tz_convert('America/New_York'),
                        'current_status': vehicle.get('current_status'),
                        'stop_id': vehicle.get('stop_id'),
                        'location': loc_name,
                        'trip_id': (vehicle.get('trip') or {}).get('trip_id'),
                        'bearing': vehicle_pos.get('bearing'),
                    })
                    break

    for stop, events in new_events.items():
        if not events:
            continue
        batch = pd.DataFrame(events)
        existing = dataframes[stop]
        # main() seeds each stop with a bare pd.DataFrame(); use the batch directly in that
        # case rather than concatenating with an empty frame.
        if existing.empty and existing.columns.empty:
            dataframes[stop] = batch
        else:
            dataframes[stop] = pd.concat([existing, batch], ignore_index=True)

def save_dataframes_to_csv(dataframes, directory):
    """Write each non-empty DataFrame in ``dataframes`` to ``<directory>/<stop>_events.csv``.

    Stops whose DataFrame is empty produce no file.
    """
    populated = ((stop, frame) for stop, frame in dataframes.items() if not frame.empty)
    for stop, frame in populated:
        frame.to_csv(f'{directory}/{stop}_events.csv', index=False)

def main(parsed_VP_directory, locations_csv, stop_dict):
    """Match parsed vehicle positions against stop locations and save per-stop event CSVs.

    Args:
        parsed_VP_directory: directory scanned (non-recursively) for '.json' vehicle files.
        locations_csv: locations file passed to prefilter_locations.
        stop_dict: mapping of stop name -> list of stop ids.

    Results are written to the module-level ``output_directory``.
    """
    candidate_locations = prefilter_locations(locations_csv, stop_dict)
    events_by_stop = {stop: pd.DataFrame() for stop in stop_dict}

    json_files = [name for name in os.listdir(parsed_VP_directory) if name.endswith('.json')]
    for name in json_files:
        process_vehicle_positions(os.path.join(parsed_VP_directory, name), candidate_locations, events_by_stop)

    save_dataframes_to_csv(events_by_stop, output_directory)

# Example usage. Reuse the stop_dict configured earlier in the notebook. Do not redefine
# it here: an empty redefinition overwrites that global (so later re-runs of the parsing
# cells see no stops) and leaves main() with nothing to match.
parsed_VP_directory = 'temp/parsed_VP'
locations_csv = 'locations.csv'

main(parsed_VP_directory, locations_csv, stop_dict)
