In [1]:
import datetime
import requests
import pandas as pd
import hopsworks
import datetime
from pathlib import Path
from functions import util
import json
import re
import os
import warnings
import dotenv
warnings.filterwarnings("ignore")

In [2]:
csv_file = "../data/abuja-air-quality.csv"
util.check_file_path(csv_file)

File successfully found at the path: ../data/abuja-air-quality.csv


In [3]:
country = "nigeria"
city = "abuja"
street = "us-embassy"
aqicn_url = "https://api.waqi.info/feed/@13449"

latitude, longitude = util.get_city_coordinates(country)

today = datetime.date.today()

In [4]:
AQI_API_KEY = os.getenv("AQI_API_KEY")

if AQI_API_KEY == None:
    raise ValueError("AQI_API_KEY not found in environment variables")

In [5]:
with open('../hopsworks-api-key.txt', 'r') as file:
    os.environ["HOPSWORKS_API_KEY"] = file.read().rstrip()

project = hopsworks.login()


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1048724


In [6]:
secrets = util.secrets_api(project.name)

try:
    secrets.create_secret("AQI_API_KEY", AQI_API_KEY)
except hopsworks.RestAPIError:
    AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value

Connected. Call `.close()` to terminate connection gracefully.


In [7]:
try:
    aq_today_data = util.get_pm25(aqicn_url, country, city, street, today, AQI_API_KEY)
except hopsworks.RestAPIError:
    print("It looks like the AQI_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")

aq_today_data

Unnamed: 0,pm25,country,city,street,date,url
0,57.0,nigeria,abuja,us-embassy,2024-09-23,https://api.waqi.info/feed/@13449


In [8]:
data = pd.read_csv(csv_file, parse_dates=["date"], skipinitialspace=True)
data

Unnamed: 0,date,pm25
0,2024-08-01,71
1,2024-08-02,65
2,2024-08-03,38
3,2024-08-04,57
4,2024-08-05,70
...,...,...
1215,2021-03-27,205
1216,2021-03-28,318
1217,2021-03-29,307
1218,2021-03-30,376


In [9]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1220 entries, 0 to 1219
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    1220 non-null   datetime64[ns]
 1   pm25    1220 non-null   int64         
dtypes: datetime64[ns](1), int64(1)
memory usage: 19.2 KB


In [10]:
data["pm25"] = data["pm25"].astype(float)
data

Unnamed: 0,date,pm25
0,2024-08-01,71.0
1,2024-08-02,65.0
2,2024-08-03,38.0
3,2024-08-04,57.0
4,2024-08-05,70.0
...,...,...
1215,2021-03-27,205.0
1216,2021-03-28,318.0
1217,2021-03-29,307.0
1218,2021-03-30,376.0


In [11]:
data.dropna(inplace = True)
data

Unnamed: 0,date,pm25
0,2024-08-01,71.0
1,2024-08-02,65.0
2,2024-08-03,38.0
3,2024-08-04,57.0
4,2024-08-05,70.0
...,...,...
1215,2021-03-27,205.0
1216,2021-03-28,318.0
1217,2021-03-29,307.0
1218,2021-03-30,376.0


In [12]:
data["country"] = country
data["city"] = city
data["street"] = street
data["url"] = aqicn_url

data

Unnamed: 0,date,pm25,country,city,street,url
0,2024-08-01,71.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1,2024-08-02,65.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
2,2024-08-03,38.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
3,2024-08-04,57.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
4,2024-08-05,70.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
...,...,...,...,...,...,...
1215,2021-03-27,205.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1216,2021-03-28,318.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1217,2021-03-29,307.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1218,2021-03-30,376.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449


In [13]:
earliest_aq_date = pd.Series.min(data["date"])
earliest_aq_date = earliest_aq_date.strftime("%Y-%m-%d")
earliest_aq_date


weather_data = util.get_historical_weather(city, earliest_aq_date, str(today), latitude, longitude)

Coordinates 9.595782279968262°N 7.99651575088501°E
Elevation 737.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [14]:
weather_data.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1316 entries, 0 to 1315
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         1316 non-null   datetime64[ns]
 1   temperature_2m_mean          1316 non-null   float32       
 2   precipitation_sum            1316 non-null   float32       
 3   wind_speed_10m_max           1316 non-null   float32       
 4   wind_direction_10m_dominant  1316 non-null   float32       
 5   city                         1316 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 51.4+ KB


In [15]:
import great_expectations as ge
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":500.0,
            "strict_min":True
        }
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 500.0, "strict_min": true}, "meta": {}}

In [16]:
import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name = "weather_expectation_suite"
)

def expect_greater_than_zero(col):  
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
    
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")

In [17]:
fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.


In [18]:
dict_obj = {
    "country": country,
    "city"  : city,
    "street": street,
    "aqicn_url": aqicn_url,
    "latitude" : latitude,
    "longitude": longitude, 
}

str_dict = json.dumps(dict_obj)

try:
    secrets.create_secret("SENSOR_LOCATION_JSON", str_dict)
except hopsworks.RestAPIError:
    print("SENSOR_LOCATION_JSON already exists. To update, delete the secret in the UI (https://c.app.hopsworks.ai/account/secrets) and re-run this cell.")
    existing_key = secrets.get_secret("SENSOR_LOCATION_JSON").value
    

SENSOR_LOCATION_JSON already exists. To update, delete the secret in the UI (https://c.app.hopsworks.ai/account/secrets) and re-run this cell.


In [19]:
air_quality_fg = fs.get_or_create_feature_group(
    name = "air_quality",
    description= "Air quality characteristics of each day",
    version = 1, 
    primary_key= ['city', 'street', 'date'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

In [20]:
air_quality_fg.insert(data)

2024-09-23 21:34:50,233 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1048724/fs/1040451/fg/1204590


Uploading Dataframe: 0.00% |          | Rows 0/1220 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1048724/jobs/named/air_quality_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x1d4de4dfd70>,
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 621569
         }
       },
       "result": {
         "observed_value": 4.0,
         "element_count": 1220,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-09-23T08:34:50.000233Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expectations": 1,
     "un

In [21]:
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in acqcn.org)")
air_quality_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")

<hsfs.feature_group.FeatureGroup at 0x1d4de267ad0>

In [22]:
data

Unnamed: 0,date,pm25,country,city,street,url
0,2024-08-01,71.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1,2024-08-02,65.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
2,2024-08-03,38.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
3,2024-08-04,57.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
4,2024-08-05,70.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
...,...,...,...,...,...,...
1215,2021-03-27,205.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1216,2021-03-28,318.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1217,2021-03-29,307.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449
1218,2021-03-30,376.0,nigeria,abuja,us-embassy,https://api.waqi.info/feed/@13449


In [23]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1220 entries, 0 to 1219
Data columns (total 6 columns):
 #   Column   Non-Null Count  Dtype         
---  ------   --------------  -----         
 0   date     1220 non-null   datetime64[ns]
 1   pm25     1220 non-null   float64       
 2   country  1220 non-null   object        
 3   city     1220 non-null   object        
 4   street   1220 non-null   object        
 5   url      1220 non-null   object        
dtypes: datetime64[ns](1), float64(1), object(4)
memory usage: 57.3+ KB


In [24]:
air_quality_fg

<hsfs.feature_group.FeatureGroup at 0x1d4de267ad0>

In [25]:
weather_data

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city
0,2021-02-14,25.401079,0.000000,11.183201,75.999123,abuja
1,2021-02-15,25.907333,0.000000,15.281989,40.532776,abuja
2,2021-02-16,26.217749,0.000000,20.268990,46.492268,abuja
3,2021-02-17,24.348997,0.000000,24.464113,46.088535,abuja
4,2021-02-18,21.411499,0.000000,21.129883,38.741505,abuja
...,...,...,...,...,...,...
1311,2024-09-17,22.855249,11.799999,11.367109,308.803558,abuja
1312,2024-09-18,22.476080,23.700005,10.483357,160.772873,abuja
1313,2024-09-19,22.598999,0.000000,7.289445,199.048691,abuja
1314,2024-09-20,22.611502,4.200000,11.013882,311.355408,abuja


In [26]:
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city', 'date'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 

In [27]:
weather_fg.insert(weather_data)

2024-09-23 21:35:05,101 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1048724/fs/1040451/fg/1206605


Uploading Dataframe: 0.00% |          | Rows 0/1316 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1048724/jobs/named/weather_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x1d4de4dfb90>,
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 622594
         }
       },
       "result": {
         "observed_value": 5.447788238525391,
         "element_count": 1316,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-09-23T08:35:05.000101Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_colum

In [28]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m abouve ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant Wind direction over the dayd")

<hsfs.feature_group.FeatureGroup at 0x1d4de4dfe60>