### Data Loader

In [5]:
import requests
from google.transit import gtfs_realtime_pb2
from google.protobuf.json_format import MessageToDict

In [6]:
def get_gtfs_rt_data(*args, **kwargs) -> gtfs_realtime_pb2.FeedMessage:
    """
    Fetches GTFS-RT data from the given URL.
    Args:
        gtfs_rt_url (str): The URL to fetch the GTFS-RT data from.
        verbose (bool, optional): Whether to print verbose output. Defaults to True.
    Returns:
        gtfs_realtime_pb2.FeedMessage: The parsed GTFS-RT data.
    Raises:
        ValueError: If an error occurs while fetching the GTFS-RT data.
    """


    def fetch_data(url:str) -> requests.Response:
        """
        Fetches GTFS-RT data from the given URL.
        Args:
            url (str): The URL to fetch the data from.
        Returns:
            requests.Response: The response object containing the fetched data.
        Raises:
            requests.exceptions.RequestException: If an error occurs while fetching the data.
        """
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response
        
        except requests.exceptions.RequestException as e:
            return None


    # Args
    gtfs_rt_url:str="https://proxy.transport.data.gouv.fr/resource/sncf-tgv-gtfs-rt-trip-updates"

    # Get response
    response = fetch_data(gtfs_rt_url)

    # Parse GTFS RT Datas
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(response.content)
    
    # Convert FeedMessage to dictionary
    feed_dict = MessageToDict(feed)
    
    
    return feed_dict


In [17]:
feed_dict = get_gtfs_rt_data(gtfs_rt_url="https://proxy.transport.data.gouv.fr/resource/sncf-tgv-gtfs-rt-trip-updates")

### Transformer

In [18]:
def dispatch_feed(feed_dict:dict, *args, **kwargs) -> dict:
    """

    """
    # Remove Header key
    del feed_dict['header']
    # Remove entity key and keep the list value as dictionary
    feed_dict = feed_dict['entity']

    return feed_dict

In [19]:
feed_dict = dispatch_feed(feed_dict)
feed_dict

[{'id': 'OCESN5521F3342716:2024-08-14T00:27:56Z',
  'tripUpdate': {'trip': {'tripId': 'OCESN5521F3342716:2024-08-14T00:27:56Z',
    'startTime': '05:46:00',
    'startDate': '20240819'},
   'stopTimeUpdate': [{'departure': {'delay': 0, 'time': '1724039160'},
     'stopId': 'StopPoint:OCETGV INOUI-87192039'},
    {'arrival': {'delay': 0, 'time': '1724042400'},
     'departure': {'delay': 4200, 'time': '1724047020'},
     'stopId': 'StopPoint:OCETGV INOUI-87212027'},
    {'arrival': {'delay': 5400, 'time': '1724049780'},
     'departure': {'delay': 5400, 'time': '1724049960'},
     'stopId': 'StopPoint:OCETGV INOUI-87182014'},
    {'arrival': {'delay': 5400, 'time': '1724051880'},
     'departure': {'delay': 5400, 'time': '1724052120'},
     'stopId': 'StopPoint:OCETGV INOUI-87182063'},
    {'arrival': {'delay': 4800, 'time': '1724052840'},
     'departure': {'delay': 4800, 'time': '1724053020'},
     'stopId': 'StopPoint:OCETGV INOUI-87300822'},
    {'arrival': {'delay': 4800, 'time': '

### Data Exporter

In [None]:
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """

    """
    schema_name = 'your_schema_name'  # Specify the name of the schema to export data to
    table_name = 'your_table_name'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
