In [2]:
import requests
import pandas as pd
from google.transit import gtfs_realtime_pb2
import time
import logging
from typing import List, Dict, Optional
import threading
from dataclasses import dataclass
import http.client


In [3]:
@dataclass
class VehiclePosition:
    """A single vehicle observation taken from one real-time feed snapshot.

    Instances are built by ``RealtimeArchiver.fetch_realtime_data`` and
    flattened into DataFrame rows by ``RealtimeArchiver.save_positions``;
    the field names double as the archive's column names.
    """

    # Snapshot time; the archiver fills this from ``feed.header.timestamp``
    # and later compares it against ``time.time()``, so it is treated as
    # epoch seconds.
    timestamp: int
    # Identifiers are empty strings when the feed entity carries no
    # vehicle/trip descriptor.
    vehicle_id: str
    trip_id: str
    route_id: str
    latitude: float
    longitude: float
    # Optional motion data. Units are not established in this file --
    # presumably metres/second and degrees per GTFS-realtime; TODO confirm.
    speed: Optional[float] = None
    bearing: Optional[float] = None

class RealtimeArchiver:
    """Poll a GTFS-realtime VehiclePositions feed and archive it in memory.

    Every successful fetch is appended to ``archive_df`` (one row per
    vehicle per snapshot). Polling can be driven manually through
    ``fetch_realtime_data`` + ``save_positions`` or by a background daemon
    thread via ``start_archiving`` / ``stop_archiving``.

    Attributes:
        api_key: Key sent as the ``key`` query parameter on every request.
        archive_frequency: Seconds the background loop waits between polls.
        is_running: True while the background loop is meant to keep going.
        archive_thread: The background ``threading.Thread`` or ``None``.
        archive_df: Accumulated observations, columns as in ``_COLUMNS``.
    """

    # Column order shared by every DataFrame this class produces.
    _COLUMNS = [
        'timestamp', 'vehicle_id', 'trip_id', 'route_id',
        'latitude', 'longitude', 'speed', 'bearing'
    ]

    def __init__(self, api_key: str, archive_frequency: int = 30):
        self.api_key = api_key
        self.archive_frequency = archive_frequency
        self.logger = logging.getLogger(__name__)
        self.is_running = False
        self.archive_thread = None
        # Lets stop_archiving() interrupt the inter-poll wait immediately.
        self._stop_event = threading.Event()
        self.archive_df = pd.DataFrame(columns=self._COLUMNS)
        # Defined up front so readers never hit AttributeError before the
        # first fetch.
        self._latest_df = pd.DataFrame(columns=self._COLUMNS)

    @classmethod
    def _positions_to_frame(cls, positions) -> pd.DataFrame:
        """Flatten position objects into a DataFrame with ``_COLUMNS``."""
        rows = [{
            'timestamp': pos.timestamp,
            'vehicle_id': pos.vehicle_id,
            'trip_id': pos.trip_id,
            'route_id': pos.route_id,
            'latitude': pos.latitude,
            'longitude': pos.longitude,
            'speed': pos.speed,
            'bearing': pos.bearing
        } for pos in positions]
        frame = pd.DataFrame(rows, columns=cls._COLUMNS)
        # Force float so a batch where every speed/bearing is None becomes
        # NaN rather than an object column; this keeps dtypes identical
        # across batches when they are concatenated.
        return frame.astype({'speed': 'float64', 'bearing': 'float64'})

    @staticmethod
    def _parse_feed(feed) -> List["VehiclePosition"]:
        """Extract one VehiclePosition per feed entity that has a position."""
        snapshot_ts = feed.header.timestamp
        positions = []
        for entity in feed.entity:
            if not entity.HasField('vehicle'):
                continue
            vehicle = entity.vehicle
            if not vehicle.HasField('position'):
                continue
            pos = vehicle.position
            has_trip = vehicle.HasField('trip')
            positions.append(VehiclePosition(
                timestamp=snapshot_ts,
                vehicle_id=vehicle.vehicle.id if vehicle.HasField('vehicle') else '',
                trip_id=vehicle.trip.trip_id if has_trip else '',
                route_id=vehicle.trip.route_id if has_trip else '',
                latitude=pos.latitude,
                longitude=pos.longitude,
                # Protobuf attribute access never fails; an unset scalar
                # reads as 0.0. HasField is what distinguishes "not
                # reported" from a genuine zero.
                speed=pos.speed if pos.HasField('speed') else None,
                bearing=pos.bearing if pos.HasField('bearing') else None,
            ))
        return positions

    def fetch_realtime_data(self, max_retries=3, backoff_factor=2) -> List["VehiclePosition"]:
        """Download and parse the current vehicle positions.

        Args:
            max_retries: Total number of attempts before giving up.
            backoff_factor: Base of the exponential delay; the wait after
                the k-th failed attempt is ``backoff_factor ** k`` seconds.

        Returns:
            The parsed positions, or an empty list when every attempt
            failed. ``self._latest_df`` mirrors the result (empty on
            failure).
        """
        url = "https://otd.delhi.gov.in/api/realtime/VehiclePositions.pb"
        params = {'key': self.api_key}
        for attempt in range(1, max_retries + 1):
            try:
                response = requests.get(url, params=params, timeout=20)
                response.raise_for_status()
                feed = gtfs_realtime_pb2.FeedMessage()
                feed.ParseFromString(response.content)
                positions = self._parse_feed(feed)
                self._latest_df = self._positions_to_frame(positions)
                self.logger.info("Fetched %d vehicle positions", len(positions))
                return positions
            except (requests.exceptions.RequestException, http.client.IncompleteRead) as e:
                self.logger.error(
                    "Network error fetching real-time data (attempt %d): %s", attempt, e)
            except Exception as e:
                # Anything else is treated as a malformed payload; it is
                # retried because a truncated download can parse badly.
                self.logger.error(
                    "Error parsing real-time data (attempt %d): %s", attempt, e)
            self._latest_df = pd.DataFrame(columns=self._COLUMNS)
            if attempt < max_retries:
                time.sleep(backoff_factor ** attempt)
        self.logger.error("All attempts to fetch real-time data failed.")
        self._latest_df = pd.DataFrame(columns=self._COLUMNS)
        return []

    def get_vehicle_positions_df(self) -> pd.DataFrame:
        """Return a copy of the archive so callers cannot mutate it."""
        return self.archive_df.copy()

    def save_positions(self, positions: List["VehiclePosition"]) -> bool:
        """Append ``positions`` to ``archive_df``.

        Returns:
            True on success (including an empty ``positions``), False if
            building or appending the rows raised.
        """
        if not positions:
            return True
        try:
            new_df = self._positions_to_frame(positions)
            if self.archive_df.empty:
                # Concatenating onto an empty frame triggers pandas'
                # "empty or all-NA entries" FutureWarning and drags the
                # placeholder object dtypes along; just adopt the batch.
                self.archive_df = new_df
            else:
                self.archive_df = pd.concat([self.archive_df, new_df], ignore_index=True)
            self.logger.info("Appended %d positions to DataFrame", len(positions))
            return True
        except Exception as e:
            self.logger.error("Failed to save positions: %s", e)
            return False

    def calculate_headways(self, route_id: str, stop_id: str, time_window: int = 3600) -> List[float]:
        """Return gaps between consecutive archived timestamps per vehicle.

        NOTE(review): ``stop_id`` is accepted but not used, and the values
        are differences between successive observations of the *same*
        vehicle on ``route_id`` -- not the spacing between different
        vehicles at a stop. Treat the name with care until that is
        revisited.

        Args:
            route_id: Route to filter on.
            stop_id: Currently ignored.
            time_window: How many seconds back from now to consider.

        Returns:
            Timestamp differences grouped vehicle by vehicle (vehicles in
            order of first appearance); empty on no data or on error.
        """
        try:
            end_time = int(time.time())
            start_time = end_time - time_window
            df = self.archive_df
            mask = (
                (df['route_id'] == route_id) &
                (df['timestamp'] >= start_time) &
                (df['timestamp'] <= end_time)
            )
            sub_df = df[mask]
            if sub_df.empty:
                return []
            headways = []
            for vehicle in sub_df['vehicle_id'].unique():
                stamps = sub_df.loc[sub_df['vehicle_id'] == vehicle, 'timestamp'].sort_values()
                if len(stamps) > 1:
                    headways.extend(stamps.diff().dropna().tolist())
            return headways
        except Exception as e:
            self.logger.error("Failed to calculate headways: %s", e)
            return []

    def get_route_performance(self, route_id: str, hours: int = 24) -> Dict:
        """Summarise archived observations for ``route_id`` over ``hours``.

        Returns:
            A dict with ``position_count``, ``vehicle_count``,
            ``first_record`` and ``last_record``; empty when there is no
            matching data or an error occurred.
        """
        try:
            end_time = int(time.time())
            start_time = end_time - (hours * 3600)
            df = self.archive_df
            mask = (
                (df['route_id'] == route_id) &
                (df['timestamp'] >= start_time) &
                (df['timestamp'] <= end_time)
            )
            sub_df = df[mask]
            if sub_df.empty:
                return {}
            return {
                'position_count': len(sub_df),
                'vehicle_count': sub_df['vehicle_id'].nunique(),
                'first_record': int(sub_df['timestamp'].min()),
                'last_record': int(sub_df['timestamp'].max())
            }
        except Exception as e:
            self.logger.error("Failed to get route performance: %s", e)
            return {}

    def start_archiving(self):
        """Start the background polling thread (no-op if already active)."""
        if self.is_running:
            self.logger.warning("Archiving already running")
            return
        if self.archive_thread is not None and self.archive_thread.is_alive():
            # A previous worker may still be inside a slow request after
            # stop_archiving() timed out; starting another would double-poll.
            self.logger.warning("Previous archive thread is still shutting down")
            return
        self._stop_event.clear()
        self.is_running = True
        self.archive_thread = threading.Thread(target=self._archive_loop)
        self.archive_thread.daemon = True
        self.archive_thread.start()
        self.logger.info("Archiving started")

    def stop_archiving(self):
        """Signal the background thread to stop and wait up to 5 s for it."""
        self.is_running = False
        # Wake the worker if it is waiting between polls.
        self._stop_event.set()
        if self.archive_thread:
            self.archive_thread.join(timeout=5)
        self.logger.info("Archiving stopped")

    def _archive_loop(self):
        """Fetch-and-save loop run by the background thread."""
        while self.is_running and not self._stop_event.is_set():
            try:
                positions = self.fetch_realtime_data()
                if positions:
                    self.save_positions(positions)
            except Exception as e:
                self.logger.error("Error in archive loop: %s", e)
            # Event.wait instead of time.sleep so a stop request does not
            # have to sit out the full archive_frequency.
            self._stop_event.wait(self.archive_frequency)


In [4]:
if __name__ == "__main__":
    import os

    # Credentials must not live in source control; read the key from the
    # environment and fail with a clear message when it is missing.
    api_key = os.environ.get("OTD_API_KEY")
    if not api_key:
        raise SystemExit("Set the OTD_API_KEY environment variable before running.")

    fetch_interval = 120  # seconds between polls; single source of truth
    n = 10
    archiver = RealtimeArchiver(api_key=api_key, archive_frequency=fetch_interval)
    try:
        for i in range(n):
            print(f"Iteration {i+1}: Fetching real-time vehicle positions...")
            positions = archiver.fetch_realtime_data()
            if positions:
                print(f"Fetched {len(positions)} vehicle positions.")
                archiver.save_positions(positions)
                print("Positions appended to DataFrame.")
            else:
                print("No vehicle positions fetched or an error occurred.")
            if i < n - 1:
                print(f"Waiting {fetch_interval} seconds before next fetch...")
                time.sleep(fetch_interval)
        print("Completed all iterations.")
    finally:
        # Persist whatever was collected even if the run is interrupted
        # (e.g. Ctrl-C during one of the long sleeps).
        archiver.archive_df.to_csv("vehicle_positions_archive.csv", index=False)

Iteration 1: Fetching real-time vehicle positions...
Fetched 2034 vehicle positions.
Positions appended to DataFrame.
Waiting 120 seconds before next fetch...


  self.archive_df = pd.concat([self.archive_df, new_df], ignore_index=True)


Iteration 2: Fetching real-time vehicle positions...
Fetched 2033 vehicle positions.
Positions appended to DataFrame.
Waiting 120 seconds before next fetch...
Iteration 3: Fetching real-time vehicle positions...
Fetched 2045 vehicle positions.
Positions appended to DataFrame.
Waiting 120 seconds before next fetch...
Iteration 4: Fetching real-time vehicle positions...
Fetched 2044 vehicle positions.
Positions appended to DataFrame.
Waiting 120 seconds before next fetch...
Iteration 5: Fetching real-time vehicle positions...
Fetched 2051 vehicle positions.
Positions appended to DataFrame.
Waiting 120 seconds before next fetch...
Iteration 6: Fetching real-time vehicle positions...
Fetched 2055 vehicle positions.
Positions appended to DataFrame.
Waiting 120 seconds before next fetch...
Iteration 7: Fetching real-time vehicle positions...
Fetched 2053 vehicle positions.
Positions appended to DataFrame.
Waiting 120 seconds before next fetch...
Iteration 8: Fetching real-time vehicle positi