<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 01: Feature Backfill for Stock Market Data</span>




## Imports

In [1]:
import datetime
import requests
import pandas as pd
import hopsworks
from pathlib import Path
from functions import util
import json
import re
import os
import warnings
warnings.filterwarnings("ignore")

In [2]:
from functions import util


### TO WIPE OUT ALL OF FEATURES AND MODELS, run the cell below

In [3]:
# If you haven't set the env variable 'HOPSWORKS_API_KEY', then uncomment the next line and enter your API key
# os.environ["HOPSWORKS_API_KEY"] = ""
# proj = hopsworks.login()
# util.purge_project(proj)

---

## Alphavantage API Key

In [8]:
api_key_file = '../data/alphavantage-api-key.txt'
util.check_file_path(api_key_file)

with open(api_key_file, 'r') as file:
    ALPHAVANTAGE_API_KEY = file.read().rstrip()

File successfully found at the path: ../data/alphavantage-api-key.txt


## Hopsworks API Key


In [40]:
with open('../data/hopsworks-api-key.txt', 'r') as file:
    os.environ["HOPSWORKS_API_KEY"] = file.read().rstrip()

project = hopsworks.login()

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.







Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1159326


In [10]:
secrets = util.secrets_api(project.name)
try:
    secrets.create_secret("ALPHAVANTAGE_API_KEY", ALPHAVANTAGE_API_KEY)
except hopsworks.RestAPIError:
    ALPHAVANTAGE_API_KEY = secrets.get_secret("ALPHAVANTAGE_API_KEY").value

Connected. Call `.close()` to terminate connection gracefully.






### Validate that ALPHAVANTAGE_API_KEY works

In [12]:
try:
    sp_SPOT_df = util.get_stock_price('SPOT', ALPHAVANTAGE_API_KEY)
except hopsworks.RestAPIError:
    print("It looks like the ALPHAVANTAGE_API_KEY doesn't work. Is the API key correct?")

sp_SPOT_df.head()

Unnamed: 0,price,date
0,484.899994,2024-12-16


## Read CSV file into DataFrames

In [29]:
csv_file_SPOT="../data/daily_SPOT.csv"
csv_file_GOOGL="../data/daily_GOOGL.csv"
csv_file_BTC="../data/currency_daily_BTC_USD.csv"

In [30]:
df_SPOT = pd.read_csv(csv_file_SPOT)
df_GOOGL = pd.read_csv(csv_file_GOOGL)
df_BTC = pd.read_csv(csv_file_BTC)

## Data cleaning

In [33]:
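# Keep only the timestamp and closing price.
# drop() returns a new DataFrame, so the result must be assigned back.
drop_cols = ['open', 'high', 'low', 'volume']
df_SPOT = df_SPOT.drop(columns=drop_cols)
df_GOOGL = df_GOOGL.drop(columns=drop_cols)
df_BTC = df_BTC.drop(columns=drop_cols)
df_BTC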


Unnamed: 0,timestamp,close
0,2024-12-17,106220.60
1,2024-12-16,106099.81
2,2024-12-15,104447.76
3,2024-12-14,101399.99
4,2024-12-13,101428.75
...,...,...
345,2024-01-07,43950.28
346,2024-01-06,43992.44
347,2024-01-05,44186.59
348,2024-01-04,44193.29


In [34]:
df_SPOT.dropna(inplace=True)
df_GOOGL.dropna(inplace=True)
df_BTC.dropna(inplace=True)
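
The feature groups in this notebook use `price` and `date` as column names, while the cleaned CSV data still carries `timestamp` and `close`. A minimal sketch of bridging that gap, assuming the CSVs have the `timestamp`/`close` layout shown in the output above. The helper name `to_price_frame` and the sample rows are hypothetical and not part of the original notebook:

```python
import pandas as pd

def to_price_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with columns renamed to date/price and date parsed as datetime."""
    out = df.rename(columns={"timestamp": "date", "close": "price"})
    out["date"] = pd.to_datetime(out["date"])
    # Sort oldest first so later time-based operations see rows in order
    return out.sort_values("date").reset_index(drop=True)

# Hypothetical sample mirroring two of the BTC rows shown above
sample = pd.DataFrame({
    "timestamp": ["2024-12-17", "2024-12-16"],
    "close": [106220.60, 106099.81],
})
clean = to_price_frame(sample)
print(clean)
```

The same helper could then be applied to `df_SPOT`, `df_GOOGL`, and `df_BTC` before they are inserted into their feature groups.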

## Define Data Validation Rules 


In [37]:
import great_expectations as ge

# Create an Expectation Suite
sp_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="sp_expectation_suite"
)

# Add an expectation that the minimum price is at least 0.01 (no zero or negative prices)
sp_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column": "price",
            "min_value": 0.01,  # Set minimum value to 0.01 to exclude 0 and negatives
        }
    )
)


{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "price", "min_value": 0.01}, "meta": {}}
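
Before inserting data, the same rule can be checked locally in plain pandas. This is only a sketch of what the expectation asserts (the column minimum must be at least 0.01), not how Great Expectations evaluates it internally. The function name `min_price_ok` and the sample values are hypothetical:

```python
import pandas as pd

def min_price_ok(df: pd.DataFrame, column: str = "price", min_value: float = 0.01) -> bool:
    """True if the smallest value in `column` is at least `min_value`."""
    return bool(df[column].min() >= min_value)

# Hypothetical frames: one valid, one containing a zero price
good = pd.DataFrame({"price": [484.90, 106220.60]})
bad = pd.DataFrame({"price": [484.90, 0.0]})

print(min_price_ok(good))
print(min_price_ok(bad))
```

A quick check like this can catch a bad CSV early, before a validation failure is reported during ingestion.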

## Connect to Hopsworks

In [46]:
project

Project('ID2223_Hosl_Ghosh', 'hosl@kth.se', 'Default project')

In [45]:
fs = project.get_feature_store() 

Connected. Call `.close()` to terminate connection gracefully.


TypeError: __init__() missing 1 required positional argument: 'hive_endpoint'

## Create the Feature Groups and insert the DataFrames in them

In [38]:
SPOT_fg = fs.get_or_create_feature_group(
    name='SPOT',
    description='Spotify Stock Prices',
    version=1,
    primary_key=['price'],
    event_time="date",
    expectation_suite=sp_expectation_suite
)

GOOGL_fg = fs.get_or_create_feature_group(
    name='GOOGL',
    description='Google Stock Prices',
    version=1,
    primary_key=['price'],
    event_time="date",
    expectation_suite=sp_expectation_suite
)

BTC_fg = fs.get_or_create_feature_group(
    name='BTC',
    description='Bitcoin Prices',
    version=1,
    primary_key=['price'],
    event_time="date",
    expectation_suite=sp_expectation_suite
)

NameError: name 'fs' is not defined

In [81]:
# Insert the cleaned DataFrames (not the feature group objects themselves)
SPOT_fg.insert(df_SPOT)
GOOGL_fg.insert(df_GOOGL)
BTC_fg.insert(df_BTC)

2024-11-18 22:41:10,269 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1159326/fs/1150029/fg/1338743


Uploading Dataframe: 0.00% |          | Rows 0/3924 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1159326/jobs/named/air_quality_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x26cbc0f1520>,
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 662538
         }
       },
       "result": {
         "observed_value": 5.0,
         "element_count": 3924,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-11-18T09:41:10.000269Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expectations": 1,
     "un

#### Enter a description for each feature in the Feature Group

In [84]:
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in aqicn.org)")
air_quality_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")

air_quality_lag_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_lag_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in aqicn.org)")
air_quality_lag_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_lag_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_lag_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")
air_quality_lag_fg.update_feature_description("pm25_prev", "mean pm25 value of last three days")

<hsfs.feature_group.FeatureGroup at 0x26cbe4e7250>

### <span style='color:#ff5f27'> 🌦 Weather Data</span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each weather measurement is uniquely identified by `city` and  `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules
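
Whether a chosen `primary_key` really identifies each row uniquely can be checked in pandas before creating the feature group. A minimal sketch, assuming a DataFrame with `city` and `date` columns as described above. The helper name `is_unique_key` and the sample rows are hypothetical:

```python
import pandas as pd

def is_unique_key(df: pd.DataFrame, key_columns: list[str]) -> bool:
    """True if no two rows share the same values in `key_columns`."""
    return not df.duplicated(subset=key_columns).any()

# Hypothetical sample: two cities, overlapping dates, one repeated temperature
weather_sample = pd.DataFrame({
    "city": ["Stockholm", "Stockholm", "Uppsala"],
    "date": ["2024-11-17", "2024-11-18", "2024-11-17"],
    "temperature_2m_mean": [3.1, 2.4, 2.4],
})

print(is_unique_key(weather_sample, ["city", "date"]))
print(is_unique_key(weather_sample, ["temperature_2m_mean"]))
```

A measured value such as a temperature or a price can repeat across rows, which is why identifying columns like `city` and `date` are the safer choice for a key.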

In [85]:
# Get or create feature group 
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city', 'date'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 

#### Insert the DataFrame into the Feature Group

In [86]:
# Insert data
weather_fg.insert(weather_df)

2024-11-18 22:41:37,438 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1159326/fs/1150029/fg/1337716


Uploading Dataframe: 0.00% |          | Rows 0/3973 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1159326/jobs/named/weather_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x26c953fb610>,
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "precipitation_sum",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 661515
         }
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 3973,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-11-18T09:41:37.000438Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_bet

#### Enter a description for each feature in the Feature Group

In [87]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m above ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant wind direction over the day")

<hsfs.feature_group.FeatureGroup at 0x26cbe0a73a0>

## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Daily Feature Pipeline 
 </span> 


## <span style="color:#ff5f27;">⏭️ **Exercises:** 
 </span> 

Extra Homework:

  * Try adding a new feature based on a rolling window of 3 days for 'pm25'
      * This is not easy: when forecasting more than 1 day into the future, you won't have the previous 3 days of pm25 measurements.
      * df.set_index("date").rolling(3).mean() is only the start....
  * Parameterize the notebook, so that you can provide the `country`/`street`/`city`/`url`/`csv_file` as parameters. 
      * Hint: this will also require parameterizing the secret name (`SENSOR_LOCATION_JSON`), e.g., by adding the street name as part of the secret name. Then you have to pass that secret name as a parameter when running the operational feature pipeline and batch inference pipelines.
      * After you have done this, collect the street/city/url/csv files for all the sensors in your city or region and make dashboards for all of the air quality sensors in your city/region. You could even then add a dashboard for your city/region, as done [here for Poland](https://github.com/erno98/ID2223).
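
As a starting point for the rolling-window exercise, here is a minimal sketch of a 3-day mean over the *previous* days only, for historical data. It does not solve the multi-day forecasting problem mentioned above. The column name `pm25_prev_3d_mean` and the sample values are hypothetical:

```python
import pandas as pd

# Hypothetical daily measurements
df = pd.DataFrame({
    "date": pd.date_range("2024-11-01", periods=5, freq="D"),
    "pm25": [10.0, 20.0, 30.0, 40.0, 50.0],
})
df = df.sort_values("date")

# shift(1) excludes the current day, so each row only sees earlier measurements;
# the first three rows have no full 3-day history and stay NaN
df["pm25_prev_3d_mean"] = df["pm25"].shift(1).rolling(3).mean()
print(df)
```

Shifting before rolling keeps the current day's value out of its own feature, which avoids leaking the target into the input.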

Improve this AI System
  * As of mid 2024, there is no API call available to download historical data from the AQIN website. You could improve this system by writing a PR to download the CSV file using Python Selenium and the URL for the sensor.


---