# Daily Feature Pipeline
* Retrieve the most recent daily data (yesterday's) for flights and Google Trends
* Add these new data to the Feature Store 

## OpenSky Recent Data 
* Use the OpenSky API to retrieve the most recent flight landing data and update our feature group

In [1]:
import pandas as pd 
import os
import datetime
import requests 
import hopsworks

In [2]:
project = hopsworks.login()
fs = project.get_feature_store() 
secrets = hopsworks.get_secrets_api()

2026-01-09 13:40:09,844 INFO: Initializing external client
2026-01-09 13:40:09,844 INFO: Base URL: https://c.app.hopsworks.ai:443
2026-01-09 13:40:11,451 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286325


In [5]:
# OpenSky OAuth2 client credentials, read from the Hopsworks secrets store.
# Surrounding whitespace (e.g. a trailing newline pasted into the secret) is
# stripped, because the auth server rejects such values with 401 Unauthorized.
CLIENT_ID = secrets.get_secret("OPENSKY_CLIENT_ID").value.strip()
CLIENT_SECRET = secrets.get_secret("OPENSKY_CLIENT_SECRET").value.strip()

# ICAO airport code used in the OpenSky arrivals query.
ICAO = "ESSA"

def get_access_token(timeout=30):
    """Request an OAuth2 access token from the OpenSky auth server.

    Uses the client-credentials grant with the module-level CLIENT_ID and
    CLIENT_SECRET.

    Args:
        timeout: Seconds to wait for the auth server before giving up.

    Returns:
        The value of the "access_token" field in the JSON response.

    Raises:
        requests.HTTPError: If the auth server answers with a 4xx/5xx status.
        ValueError: If the response contains no access token.
    """
    auth_url = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    # requests has no default timeout; without one a stalled connection
    # would block the pipeline run indefinitely.
    response = requests.post(auth_url, data=data, timeout=timeout)
    response.raise_for_status()

    token = response.json().get("access_token")
    if not token:
        # Fail here instead of handing None to the caller, which would be
        # sent as "Bearer None" on the next request.
        raise ValueError("OpenSky auth response did not contain an access token")
    return token

def fetch_yesterday_data(token, timeout=30):
    """Count yesterday's arrivals at the airport identified by ICAO.

    Queries the OpenSky `flights/arrival` endpoint for the window covering
    all of yesterday and returns the number of flights in the response.

    Args:
        token: OAuth2 bearer token for the OpenSky API.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        A tuple `(yesterday, count)` where `yesterday` is a `datetime.date`
        and `count` is the number of arrivals (0 when OpenSky reports that
        no flights were found).

    Raises:
        requests.HTTPError: For any error status other than 404, so that a
            failed request is never recorded as "0 landings".
    """
    # Calculate yesterday's window.
    # NOTE(review): these are naive datetimes, so .timestamp() interprets them
    # in the local timezone of the machine running the notebook — confirm
    # that is the intended day boundary.
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    start_ts = int(datetime.datetime.combine(yesterday, datetime.time.min).timestamp())
    end_ts = int(datetime.datetime.combine(yesterday, datetime.time.max).timestamp())

    response = requests.get(
        "https://opensky-network.org/api/flights/arrival",
        params={"airport": ICAO, "begin": start_ts, "end": end_ts},
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )

    if response.status_code == 404:
        # OpenSky answers 404 when no flights exist for the interval;
        # this is a legitimate zero, not a failure.
        return yesterday, 0

    # Auth, rate-limit and server errors must stop the pipeline instead of
    # being inserted into the feature store as a zero count.
    response.raise_for_status()

    flights = response.json() or []
    return yesterday, len(flights)

# Authenticate against OpenSky, then count yesterday's arrivals for ICAO.
token = get_access_token()
# flight_date and total_landings are read by the cell that builds new_flight_data.
flight_date, total_landings = fetch_yesterday_data(token)

print(f"Arlanda had {total_landings} landings on date: {flight_date}")

CLIENT_ID: 'astegaras-api-client'
CLIENT_SECRET length: 33


HTTPError: 401 Client Error: Unauthorized for url: https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token

## Uploading new data to the Feature Store 

**New Flight Data**

In [4]:
# Single-row frame holding yesterday's landing count, built from one record.
new_flight_data = pd.DataFrame(
    [{"date": flight_date, "total_landings": total_landings}]
)

In [5]:
# Look up version 1 of the Arlanda flight feature group
flight_data_fg = fs.get_feature_group(name="flight_data_arlanda", version=1)

In [6]:
# Append the new row to the feature group; wait=True blocks until the
# materialization job has finished.
flight_data_fg.insert(
    new_flight_data,
    wait=True,
)

Uploading Dataframe: 100.00% |█| Rows 1/1 | Elapsed Time: 00:00 | Remaining Time


Launching job: flight_data_arlanda_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1296539/jobs/named/flight_data_arlanda_1_offline_fg_materialization/executions
2026-01-02 15:32:41,900 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-02 15:32:45,135 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-02 15:34:22,240 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-02 15:34:22,444 INFO: Waiting for log aggregation to finish.
2026-01-02 15:34:31,192 INFO: Execution finished successfully.


(Job('flight_data_arlanda_1_offline_fg_materialization', 'SPARK'), None)

## Google Trends Recent Data
* Download the most recent Google search data for the search terms

In [7]:
# Google Trends
from pytrends.request import TrendReq

In [8]:
# Google Trends search terms that serve as predictors
KEYWORDS = ["vikings", "fika", "stockholm", "ikea", "abba"]

# Country code for Sweden (passed to Google Trends as `geo`)
COUNTRY = "SE"

# Yesterday's date: the day whose search data is fetched below
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)

In [9]:
def fetch_google_trends(keywords, start_date, end_date):
    """
    Query Google Trends interest-over-time for `keywords`, restricted to
    the region in COUNTRY and the window `start_date`..`end_date`.

    Returns the DataFrame produced by pytrends' `interest_over_time()`.
    Google Trends itself picks the resolution (daily / weekly / monthly)
    from the length of the requested range.
    """
    # NOTE(review): tz is pytrends' timezone offset in minutes and 360 is the
    # US Central example from its README — confirm it is intended for SE data.
    client = TrendReq(hl="en-US", tz=360)
    client.build_payload(
        kw_list=keywords,
        timeframe=f"{start_date} {end_date}",
        geo=COUNTRY,
    )
    return client.interest_over_time()

In [10]:
# Fetch yesterday's Google Trends values for all keywords.
# NOTE(review): Google Trends scales values relative to the peak inside the
# requested timeframe, so a single-day window presumably always contains a
# 100 — confirm this is comparable with how the historical feature data
# were produced.
raw_data = fetch_google_trends(KEYWORDS, yesterday, yesterday)

# pytrends hands back an empty frame (without an "isPartial" column) when
# Google has no data; fail with a clear message instead of an opaque KeyError.
if raw_data.empty:
    raise ValueError(f"Google Trends returned no data for {yesterday}")

raw_data = raw_data.drop(columns=["isPartial"])

# Normalise to one row per calendar day and expose the index as a plain
# `date` column holding datetime.date objects.
yeserday_data = (
    raw_data
    .resample("D")
    .ffill()
    .reset_index()
    .assign(date=lambda df: pd.to_datetime(df["date"]).dt.date)
)
yeserday_data


Unnamed: 0,date,vikings,fika,stockholm,ikea,abba
0,2026-01-01,1,2,100,56,2


In [14]:
# Look up the daily Google Trends feature group.
# NOTE(review): the saved output of the insert cell names a
# `google_trends_daily_2_...` job while version 1 is requested here —
# confirm which version is intended.
google_data_fg = fs.get_feature_group(name="google_trends_daily", version=1)

In [15]:
# Append yesterday's trends row; wait=True blocks until the materialization
# job has finished (same form as the flight-data insert).
google_data_fg.insert(yeserday_data, wait=True)

Uploading Dataframe: 100.00% |█| Rows 1/1 | Elapsed Time: 00:00 | Remaining Time


Launching job: google_trends_daily_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1296539/jobs/named/google_trends_daily_2_offline_fg_materialization/executions
2026-01-02 15:38:18,156 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-02 15:38:21,391 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-02 15:39:52,993 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-02 15:39:53,201 INFO: Waiting for log aggregation to finish.
2026-01-02 15:40:01,959 INFO: Execution finished successfully.


(Job('google_trends_daily_2_offline_fg_materialization', 'SPARK'), None)