# In [None]:
import functions_framework
import pandas as pd
import sqlalchemy
import requests
from pytz import timezone
import pysqlite3
import sys
sys.modules["sqlite3"] = pysqlite3
from datetime import datetime, timedelta
import keys

@functions_framework.http
def get_flights_data(request):
    """
    HTTP entry point: refresh the flights table for all stored airports.

    Steps:
        1. Build the database connection string.
        2. Load the airports table and take its 'icao_code' column.
        3. Fetch the scheduled arrivals for those airports.
        4. Append the fetched flights to the database.

    Args:
        request: Incoming HTTP request object. It is not inspected here.

    Returns:
        str: Fixed confirmation message once the data has been stored.
    """
    db_url = create_connection_string()
    icao_codes = fetch_airport_data(db_url)['icao_code']
    store_flights_data(fetch_flights_data(icao_codes), db_url)
    return "Data has been updated"

def create_connection_string(*, password=None):
    """
    Builds a SQLAlchemy-style connection string for a MySQL database.

    The password is percent-encoded before it is placed in the URL, so
    characters such as '@', '/', ':' or '#' cannot be mistaken for URL
    delimiters when the string is parsed.

    Args:
        password (str, optional): MySQL password to use. When omitted, the
            password is read from ``keys.WBS_MYSQL_DB``.

    Returns:
        str: A connection string of the form
             'mysql+pymysql://user:password@host:port/schema'
    """
    # Local import keeps this function self-contained.
    from urllib.parse import quote

    schema = "data_pipeline_example"
    host = "130.211.82.25"
    user = "root"
    port = 3306

    if password is None:
        password = keys.WBS_MYSQL_DB

    # safe="" also escapes '/', which quote() would otherwise leave untouched.
    encoded_password = quote(str(password), safe="")
    return f'mysql+pymysql://{user}:{encoded_password}@{host}:{port}/{schema}'


def fetch_airport_data(connection_string):
    """
    Loads the airports table from the database.

    Args:
        connection_string (str): Database connection string.

    Returns:
        pandas.DataFrame: Result of reading "cities_airports" with
                          pandas.read_sql. The caller in this file reads
                          an 'icao_code' column from it; the full column
                          set is defined by the database, not here.
    """
    table_name = "cities_airports"
    airports = pd.read_sql(table_name, con=connection_string)
    return airports


def fetch_flights_data(icaos):
    """
    Retrieves tomorrow's scheduled arrivals for each airport ICAO code
    from the AeroDataBox API.

    "Tomorrow" is computed in the Europe/Berlin timezone. Each airport is
    queried in two half-day windows (00:00-11:59 and 12:00-23:59).
    A non-200 response only skips that one window; the remaining windows
    and airports are still queried.

    Args:
        icaos (iterable of str): Airport ICAO codes (e.g. a list or a
            pandas Series).

    Environment:
        Reads the RapidAPI key from ``keys.AERODATABOX_KEY``.

    Returns:
        pandas.DataFrame: One row per arrival, with the columns:
            - arrival_airport_icao
            - departure_airport_icao
            - scheduled_arrival_time (datetime)
            - flight_number
            - timestamp_flight: retrieval time (datetime)
        The frame is empty, but still has these columns, when there are
        no airports or no flights were returned.

    Raises:
        requests.RequestException: If a request fails at the network level
            or exceeds the timeout.
    """
    # Materialise first: a pandas Series cannot be truth-tested directly.
    icao_codes = list(icaos)
    if not icao_codes:
        # Nothing to query, so skip the credentials and the API entirely.
        return _flights_to_frame([])

    api_key = keys.AERODATABOX_KEY

    berlin_timezone = timezone('Europe/Berlin')
    schedule_date = (datetime.now(berlin_timezone) + timedelta(days=1)).strftime('%Y-%m-%d')
    # Two half-day windows per airport.
    # NOTE(review): presumably the API limits the length of one query window - confirm.
    time_windows = (("T00:00", "T11:59"), ("T12:00", "T23:59"))

    querystring = {
        "withLeg": "false",
        "direction": "Arrival",
        "withCancelled": "false",
        "withCodeshared": "false",
        "withCargo": "false",
        "withPrivate": "false",
        "withLocation": "false",
    }
    headers = {
        "X-RapidAPI-Key": api_key,
        "x-rapidapi-host": "aerodatabox.p.rapidapi.com",
    }

    flight_items = []
    for icao in icao_codes:
        for window_start, window_end in time_windows:
            # Build the URL on one logical line: a triple-quoted f-string
            # would embed newlines and indentation into the request path.
            url = (
                "https://aerodatabox.p.rapidapi.com/flights/airports/icao/"
                f"{icao}/{schedule_date}{window_start}/{schedule_date}{window_end}"
            )

            # The timeout prevents a stalled connection from hanging forever.
            response = requests.get(url, headers=headers, params=querystring, timeout=30)
            if response.status_code != 200:
                # Skip only this window; the other half of the day may still succeed.
                continue

            retrieval_time = datetime.now(berlin_timezone).strftime("%Y-%m-%d %H:%M:%S")
            flight_items.extend(_parse_arrivals(response.json(), icao, retrieval_time))

    return _flights_to_frame(flight_items)


def _parse_arrivals(flight_data, icao, retrieval_time):
    """Flatten one API response payload into a list of flight record dicts."""
    records = []
    for item in flight_data.get("arrivals") or []:
        # Missing nested fields become None instead of raising KeyError.
        movement = item.get("movement") or {}
        records.append({
            "arrival_airport_icao": icao,
            "departure_airport_icao": (movement.get("airport") or {}).get("icao"),
            "scheduled_arrival_time": (movement.get("scheduledTime") or {}).get("local"),
            "flight_number": item.get("number"),
            "timestamp_flight": retrieval_time,
        })
    return records


def _flights_to_frame(flight_items):
    """Build the flights DataFrame with fixed columns and datetime dtypes."""
    columns = [
        "arrival_airport_icao",
        "departure_airport_icao",
        "scheduled_arrival_time",
        "flight_number",
        "timestamp_flight",
    ]
    # Explicit columns keep the schema stable even when there are no rows.
    flight_df = pd.DataFrame(flight_items, columns=columns)

    # Drop the last 6 characters of the local timestamp before parsing.
    # NOTE(review): presumably this is the UTC offset suffix such as "+01:00" - confirm.
    flight_df["scheduled_arrival_time"] = pd.to_datetime(
        flight_df["scheduled_arrival_time"].str[:-6]
    )
    flight_df["timestamp_flight"] = pd.to_datetime(flight_df["timestamp_flight"])
    return flight_df

def store_flights_data(flights_df, connection_string):
    """
    Stores the retrieved flights data into the database.

    Args:
        flights_df (pandas.DataFrame): DataFrame containing flights.
        connection_string (str): Database connection string. It is passed
            straight to ``DataFrame.to_sql`` as ``con``.

    Behavior:
        Appends the rows to the 'flights' table without writing the
        DataFrame index.
    """
    # Use the connection supplied by the caller instead of building a new
    # one, so the data is written to the database the caller selected.
    flights_df.to_sql(
        'flights',
        if_exists='append',
        con=connection_string,
        index=False,
    )


# requirements.txt - packages needed to deploy this function:
# functions-framework==3.*
# pandas
# requests
# pytz
# sqlalchemy
# pymysql
# pysqlite3-binary