### --- Step 1: Setup and Imports ---

In [11]:
import pandas as pd
import requests
import time
import datetime
import hopsworks
import exclude.key

# --- Hopsworks configuration ---
# Credentials are read from the `exclude.key` module rather than written inline.
# NOTE(review): presumably `exclude/` is kept out of version control — confirm.
HOPSWORKS_API_KEY = exclude.key.HOPSWORKS_API_KEY
# Name and version of the feature group that get_or_create_feature_group() looks up or creates.
FEATURE_GROUP_NAME = "bars_near_london_bridge"
FEATURE_GROUP_VERSION = 1

# --- BestTime API configuration ---
# The private key is the default `api_key` argument of the BestTime helper functions below.
BESTTIME_API_KEY_PRIVATE = exclude.key.BESTTIME_API_KEY_PRIVATE
# NOTE(review): the public key is not referenced by any code visible in this notebook.
BESTTIME_API_KEY_PUBLIC = exclude.key.BESTTIME_API_KEY_PUBLIC
# Interpolated into the venue search query as f'bars near {BAR_LOCATION}'.
BAR_LOCATION = 'London Bridge, London'
# Sent as the 'num' parameter of the venue search request.
NUMBER_OF_BARS = 50

# --- Connect to Hopsworks ---
# Runs at cell execution time; `fs` is the module-level feature store handle
# used by get_or_create_feature_group().
project = hopsworks.login(api_key_value=HOPSWORKS_API_KEY)
fs = project.get_feature_store()

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/25749
Connected. Call `.close()` to terminate connection gracefully.


### --- Step 2: Create or Get Feature Group ---

In [12]:
def get_or_create_feature_group():
    """Return the foot-traffic feature group, creating it when the lookup fails.

    Looks up ``FEATURE_GROUP_NAME`` / ``FEATURE_GROUP_VERSION`` in the
    module-level feature store ``fs``. If the lookup raises, or yields
    ``None``, a new feature group is created with primary key
    ``(venue_name, day, hour)``, event time ``last_updated`` and online
    serving enabled.

    Returns:
        The feature group object returned by the Hopsworks client.
    """
    try:
        fg = fs.get_feature_group(name=FEATURE_GROUP_NAME, version=FEATURE_GROUP_VERSION)
    except Exception:
        # `Exception` instead of a bare `except:` so that KeyboardInterrupt and
        # SystemExit are no longer swallowed while talking to the feature store.
        # NOTE(review): this still treats *any* lookup error (including auth or
        # network failures) as "not found" — narrow to the client's REST error
        # type once confirmed.
        fg = None

    # NOTE(review): some client versions may return None instead of raising when
    # the feature group does not exist — confirm; handled defensively here.
    if fg is None:
        fg = fs.create_feature_group(
            name=FEATURE_GROUP_NAME,
            version=FEATURE_GROUP_VERSION,
            description="Foot traffic data for bars near London Bridge",
            primary_key=['venue_name', 'day', 'hour'],
            event_time='last_updated',
            online_enabled=True
        )
    return fg


### --- Step 3: Perform Venue Search ---

In [13]:
def perform_venue_search(api_key=BESTTIME_API_KEY_PRIVATE, timeout=30):
    """Start a BestTime venue search for bars near ``BAR_LOCATION``.

    Sends a POST to the venue search endpoint asking for up to
    ``NUMBER_OF_BARS`` venues, and returns the URL at which the progress of the
    search job can be polled.

    Args:
        api_key: BestTime private API key sent as ``api_key_private``.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The value of ``['_links']['venue_search_progress']`` from the JSON
        response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the request exceeds ``timeout``.
        KeyError: If the response JSON carries no progress link.
    """
    print("Performing venue search")
    endpoint = 'https://besttime.app/api/v1/venues/search'
    params = {
        'api_key_private': api_key,
        'q': f'bars near {BAR_LOCATION}',
        'num': NUMBER_OF_BARS,
        'format': 'all'  # Retrieve full forecast data
    }
    response = requests.post(endpoint, params=params, timeout=timeout)
    # Fail with a clear HTTP error instead of a confusing KeyError / JSON error
    # further down when the API rejects the request.
    response.raise_for_status()
    search_data = response.json()

    job_progress_url = search_data['_links']['venue_search_progress']
    print(f"Venue search initiated. Progress URL: {job_progress_url}")
    return job_progress_url


### --- Step 4: Retrieve Venue Search Results ---

In [14]:
def retrieve_venue_search_results(job_progress_url, poll_interval=5, max_wait=600, timeout=30):
    """Poll a venue search job until it finishes and return its venues.

    Args:
        job_progress_url: Progress URL returned when the search was started.
        poll_interval: Seconds to sleep between polls.
        max_wait: Upper bound, in seconds, on the total time spent polling.
            Pass ``None`` to wait indefinitely (the previous behavior).
        timeout: Seconds to wait for each individual HTTP request.

    Returns:
        The ``'venues'`` list from the final progress response.

    Raises:
        requests.HTTPError: If a poll answers with a 4xx/5xx status.
        TimeoutError: If the job has not finished within ``max_wait`` seconds.
        KeyError: If the response lacks ``'job_finished'`` or ``'venues'``.
    """
    print("Retrieving venue search results...")
    # monotonic() is unaffected by wall-clock adjustments while we wait.
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while True:
        response = requests.get(job_progress_url, timeout=timeout)
        response.raise_for_status()
        progress_data = response.json()
        if progress_data['job_finished']:
            break
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Venue search did not finish within {max_wait} seconds: {job_progress_url}"
            )
        time.sleep(poll_interval)
    venues = progress_data['venues']
    print(f"Found {len(venues)} venues.")
    return venues


### --- Step 5: Extract Historical Data ---

In [15]:
def extract_historical_data(venues):
    """Flatten per-venue forecast data into one row per venue, day and hour.

    Venues whose ``'forecast'`` flag is falsy or absent are skipped. For the
    remaining venues, every entry of
    ``venue['venue_foot_traffic_forecast']['analysis'][*]['hour_analysis']``
    becomes one row.

    Args:
        venues: Iterable of venue dicts from the venue search results.
            ``None`` or an empty iterable yields an empty result.

    Returns:
        A list of ``[venue_name, venue_address, day_text, hour, intensity_txt]``
        lists, in input order.

    Raises:
        KeyError: If a venue flagged as having a forecast is missing one of the
            fields read above.
    """
    print("Extracting historical data...")
    historical_data = []
    for venue in venues or []:
        # .get(): a venue without the flag is treated as "no forecast" rather
        # than aborting the whole extraction with a KeyError.
        if not venue.get('forecast'):
            continue
        forecast_data = venue['venue_foot_traffic_forecast']
        for day_data in forecast_data['analysis']:
            historical_data.extend(
                [
                    venue['venue_name'],
                    venue['venue_address'],
                    day_data['day_info']['day_text'],
                    hour_data['hour'],
                    hour_data['intensity_txt'],
                ]
                for hour_data in day_data['hour_analysis']
            )
    return historical_data


### --- Step 6: Backfill Historical Data ---

In [16]:
def backfill_historical_data(api_key=BESTTIME_API_KEY_PRIVATE):
    """Run a venue search and insert the resulting forecast rows into the feature group.

    Steps: start the venue search, poll until it finishes, flatten the venues
    into rows, stamp every row with the current local time as ``last_updated``
    and insert the frame into the feature group.

    Args:
        api_key: BestTime private API key used for the venue search.

    Returns:
        None. Progress is reported via ``print``.
    """
    print("Starting backfill")

    # Perform venue search and retrieve results
    job_progress_url = perform_venue_search(api_key)
    venues = retrieve_venue_search_results(job_progress_url)

    # Extract historical data from the venue search results
    historical_data = extract_historical_data(venues)
    if not historical_data:
        # An empty frame has no usable column dtypes, so do not push it to the
        # feature store (it could be used to infer the schema on first insert).
        print("No forecast data returned; nothing to backfill.")
        return

    # Build the frame directly with the column names the feature store expects.
    df_historical = pd.DataFrame(
        historical_data,
        columns=['venue_name', 'venue_address', 'day', 'hour', 'busyness'],
    )

    # Event-time column declared for the feature group (naive local timestamp).
    df_historical['last_updated'] = datetime.datetime.now()

    # Get or create the feature group
    fg = get_or_create_feature_group()

    # Insert historical data into the feature group.
    # NOTE(review): confirm that "wait" is a recognised write option for the
    # installed Hopsworks client; it is passed through unchanged.
    fg.insert(df_historical, write_options={"wait": True})
    print("Historical data backfilled successfully.")


### --- Step 7: Query Real-Time Data ---

In [17]:
def query_realtime_data(venue_id, api_key=BESTTIME_API_KEY_PRIVATE, timeout=30):
    """POST an analysis request for one venue and return the decoded JSON body.

    Args:
        venue_id: Venue identifier sent as the ``venue_id`` parameter.
        api_key: BestTime private API key sent as ``api_key_private``.
        timeout: Seconds to wait for the HTTP request, so a stalled connection
            cannot block forever.

    Returns:
        The parsed JSON response, returned as-is (no status inspection).

    Raises:
        requests.Timeout: If the request exceeds ``timeout``.
    """
    endpoint = 'https://besttime.app/api/v1/venues/analysis'
    params = {
        'api_key_private': api_key,
        'venue_id': venue_id,
        'format': 'all'
    }
    response = requests.post(endpoint, params=params, timeout=timeout)
    return response.json()


### --- Step 8: Update Real-Time Data ---

In [18]:
def update_realtime_data(api_key=BESTTIME_API_KEY_PRIVATE):
    """Refresh the feature group with the rows from a fresh venue search.

    Performs the same pipeline as the backfill: start a venue search, poll
    until it finishes, flatten the venues into rows, stamp them with the
    current local time as ``last_updated`` and insert them into the feature
    group.

    NOTE(review): despite the name, the rows come from the venue search
    forecast data via ``extract_historical_data``; ``query_realtime_data`` is
    not called here.

    Args:
        api_key: BestTime private API key used for the venue search.

    Returns:
        None. Progress is reported via ``print``.
    """
    # Perform venue search and retrieve results
    job_progress_url = perform_venue_search(api_key)
    venues = retrieve_venue_search_results(job_progress_url)

    # Flatten the venue search results into rows
    rows = extract_historical_data(venues)
    if not rows:
        # An empty frame has no usable column dtypes, so do not push it to the
        # feature store.
        print("No forecast data returned; nothing to update.")
        return

    # Build the frame directly with the column names the feature store expects.
    df_update = pd.DataFrame(
        rows,
        columns=['venue_name', 'venue_address', 'day', 'hour', 'busyness'],
    )

    # Event-time column declared for the feature group (naive local timestamp).
    df_update['last_updated'] = datetime.datetime.now()

    # Insert the refreshed rows into the feature group.
    # NOTE(review): confirm that "wait" is a recognised write option for the
    # installed Hopsworks client; it is passed through unchanged.
    fg = get_or_create_feature_group()
    fg.insert(df_update, write_options={"wait": True})
    print("Real-time data updated successfully.")


### --- Step 9: Main Function ---

In [None]:
def main(backfill=False):
    """Entry point: optionally backfill once, then run the periodic update.

    Args:
        backfill: When True, call ``backfill_historical_data()`` before the
            update. Intended for the first run only. Defaults to False, which
            matches the previous behavior where the backfill call was
            commented out.
    """
    if backfill:
        backfill_historical_data()

    # Call update_realtime_data() periodically to update with real-time data
    update_realtime_data()

if __name__ == "__main__":
    main()
