
# Feature Pipeline
This notebook consists of 5 parts:
1. parsing new data from Sensade API
2. Inserting the newest data into our feature store

In [1]:
import random
import pandas as pd
import hopsworks 

import uuid
from datetime import datetime
import pytz

BACKFILL=False # is set to false, the script will generate new data

  from .autonotebook import tqdm as notebook_tqdm


## Fake data generation

In [2]:
# Create a timezone object for GMT+2
timezone = pytz.timezone('Etc/GMT-2')
# Get the current time in GMT+2
now = datetime.now(timezone).strftime("%m/%d/%Y, %H:%M:%S")

In [4]:
# function to generate one fake observation for EL1
def generate_magnetic_field_update_el1(name, x_max, x_min, y_max, y_min, z_max, z_min):
    """
    Returns a single magnetic field update as a single row in a DataFrame
    """
    df = pd.DataFrame({"id": [str(uuid.uuid4())],
                       "x": [random.uniform(x_max, x_min)],
                       "y": [random.uniform(y_max, y_min)],
                       "z": [random.uniform(z_max, z_min)],
                      })

    return df

def get_random_magnetic_field_update_el1():
    """
    Returns a DataFrame containing one random magnetic field update
    """
    detection_df = generate_magnetic_field_update_el1("detection", 1156, -374, 401, -1275, 603, -1433)
    no_detection_df = generate_magnetic_field_update_el1("no_detection", 2041, -860, 905, -897, -133, -4087)

    # randomly pick one of these 2 and write it to the featurestore
    pick_random = random.uniform(0,2)
    if pick_random >= 1:
        mag_df_el1 = detection_df
    else:
        mag_df_el1 = no_detection_df

    # Add current date and time to 'time' column
    mag_df_el1['time'] = now
    mag_df_el1['time'] = pd.to_datetime(mag_df_el1['time'])

    # Add a ID for the parking space sensor name EL1
    mag_df_el1['psensor'] = 'EL1'

    return mag_df_el1

In [5]:
# function to generate one fake observation for EL2
def generate_magnetic_field_update_el2(name, x_max, x_min, y_max, y_min, z_max, z_min):
    """
    Returns a single magnetic field update as a single row in a DataFrame
    """
    df = pd.DataFrame({"id": [str(uuid.uuid4())],
                       "x": [random.uniform(x_max, x_min)],
                       "y": [random.uniform(y_max, y_min)],
                       "z": [random.uniform(z_max, z_min)],
                      })

    return df

def get_random_magnetic_field_update_el2():
    """
    Returns a DataFrame containing one random magnetic field update
    """
    detection_df = generate_magnetic_field_update_el2("detection", 1156, -374, 401, -1275, 603, -1433)
    no_detection_df = generate_magnetic_field_update_el2("no_detection", 2041, -860, 905, -897, -133, -4087)

    # randomly pick one of these 2 and write it to the featurestore
    pick_random = random.uniform(0,2)
    if pick_random >= 1:
        mag_df_el2 = detection_df
    else:
        mag_df_el2 = no_detection_df

    # Add current date and time to 'time' column
    mag_df_el2['time'] = now
    mag_df_el2['time'] = pd.to_datetime(mag_df_el2['time'])

    # Add a ID for the parking space sensor name EL2
    mag_df_el2['psensor'] = 'EL2'

    return mag_df_el2

In [6]:
# function to generate one fake observation for EL3
def generate_magnetic_field_update_el3(name, x_max, x_min, y_max, y_min, z_max, z_min):
    """
    Returns a single magnetic field update as a single row in a DataFrame
    """
    df = pd.DataFrame({"id": [str(uuid.uuid4())],
                       "x": [random.uniform(x_max, x_min)],
                       "y": [random.uniform(y_max, y_min)],
                       "z": [random.uniform(z_max, z_min)],
                      })

    return df

def get_random_magnetic_field_update_el3():
    """
    Returns a DataFrame containing one random magnetic field update
    """
    detection_df = generate_magnetic_field_update_el3("detection", 1156, -374, 401, -1275, 603, -1433)
    no_detection_df = generate_magnetic_field_update_el3("no_detection", 2041, -860, 905, -897, -133, -4087)

    # randomly pick one of these 2 and write it to the featurestore
    pick_random = random.uniform(0,2)
    if pick_random >= 1:
        mag_df_el3 = detection_df
    else:
        mag_df_el3 = no_detection_df

    # Add current date and time to 'time' column
    mag_df_el3['time'] = now
    mag_df_el3['time'] = pd.to_datetime(mag_df_el3['time'])

    # Add a ID for the parking space sensor name EL2
    mag_df_el3['psensor'] = 'EL3'

    return mag_df_el3

In [7]:
if BACKFILL == True:
    raise ValueError("No new data to gennerated. BACKFILL is set to False.")
else:
    mag_df_el1 = get_random_magnetic_field_update_el1()
    mag_df_el2 = get_random_magnetic_field_update_el2()
    mag_df_el3 = get_random_magnetic_field_update_el3()
    
print(mag_df_el1.head(), mag_df_el2.head(), mag_df_el3.head())

                                     id           x           y          z  \
0  10e81eab-aef4-4a8e-b000-de9f82acb44e  177.676998  359.081974  43.040308   

                 time psensor  
0 2024-04-30 11:06:05     EL1                                        id            x         y           z  \
0  7c9762f0-f879-423e-a867-3556fb37933d  1406.731235 -5.650229 -237.923224   

                 time psensor  
0 2024-04-30 11:06:05     EL2                                        id          x           y           z  \
0  b7a11920-a3e4-4ee8-a44e-9c84f1f2ed91  161.67681 -512.321921  154.182894   

                 time psensor  
0 2024-04-30 11:06:05     EL3  


In [8]:
mag_df_el1.head()

Unnamed: 0,id,x,y,z,time,psensor
0,10e81eab-aef4-4a8e-b000-de9f82acb44e,177.676998,359.081974,43.040308,2024-04-30 11:06:05,EL1


In [9]:
mag_df_el1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 6 columns):
 #   Column   Non-Null Count  Dtype         
---  ------   --------------  -----         
 0   id       1 non-null      object        
 1   x        1 non-null      float64       
 2   y        1 non-null      float64       
 3   z        1 non-null      float64       
 4   time     1 non-null      datetime64[ns]
 5   psensor  1 non-null      object        
dtypes: datetime64[ns](1), float64(3), object(2)
memory usage: 176.0+ bytes


## Upload new data in a feature group for each parking space

In [10]:
# connect to hopsworks
project = hopsworks.login()
fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.



Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/549014
Connected. Call `.close()` to terminate connection gracefully.


In [11]:

new_parking_detection_features_el1 = fs.get_or_create_feature_group(name="new_parking_detection_features_el1",
                                  version=1,
                                  primary_key=["id"],
                                  event_time='time',
                                  description="New data for EL1",
                                  online_enabled=True
                                 )
new_parking_detection_features_el1.insert(mag_df_el1)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/549014/fs/544837/fg/767323


Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:05 | Remaining Time: 00:00


Launching job: new_parking_detection_features_el1_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/549014/jobs/named/new_parking_detection_features_el1_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7f5d2c212da0>, None)

In [12]:
new_parking_detection_features_el2 = fs.get_or_create_feature_group(name="new_parking_detection_features_el2",
                                  version=1,
                                  primary_key=["id"],
                                  event_time='time',
                                  description="New data for EL2",
                                  online_enabled=True
                                 )
new_parking_detection_features_el2.insert(mag_df_el2)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/549014/fs/544837/fg/766329


Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:05 | Remaining Time: 00:00


Launching job: new_parking_detection_features_el2_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/549014/jobs/named/new_parking_detection_features_el2_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7f5d2cf2cc10>, None)

In [13]:
new_parking_detection_features_el3 = fs.get_or_create_feature_group(name="new_parking_detection_features_el3",
                                  version=1,
                                  primary_key=["id"],
                                  event_time='time',
                                  description="New data for EL3",
                                  online_enabled=True
                                 )
new_parking_detection_features_el3.insert(mag_df_el3)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/549014/fs/544837/fg/767324


Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:05 | Remaining Time: 00:00


Launching job: new_parking_detection_features_el3_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/549014/jobs/named/new_parking_detection_features_el3_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7f5d2c1d2e60>, None)

## Sensade API

### Sensor Data Access

 

GET request to `data.sensade.com`

Authentication: `Basic Auth` (user: miknie20@student.aau.dk, password: IJSN0cE9xYCWPCZNH4KbXD2MC)

{

    "dev_eui": "0080E115003BEA91",

    "from": "2024-03-01",

    "to": "2024-03-08"

}

### Sensors

Two sensors installed at Novi:

`0080E115003BEA91` (Hw2.0 Fw2.0) Installed towards building

`0080E115003E3597` (Hw2.0 Fw2.0) Installed towards bike lane

In [2]:
import requests
import json
import io
import base64
from datetime import datetime, timedelta

In [3]:
# Create a timezone object for GMT+2
timezone = pytz.timezone('Europe/Bucharest')

now = datetime.now(pytz.utc)  # Get current time in UTC
today = now.astimezone(timezone)  # Convert current time to the desired timezone
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)

In [40]:
# Format 'today', 'tomorrow', and 'yesterday' as "YYYY-MM-DD"
formatted_today = today.strftime('%Y-%m-%d')
formatted_tomorrow = tomorrow.strftime('%Y-%m-%d')
formatted_yesterday = yesterday.strftime('%Y-%m-%d')
url = "https://data.sensade.com"
dev_eui_building = "0080E115003BEA91"
dev_eui_bikelane = "0080E115003E3597"
username = "miknie20@student.aau.dk"
pwd = "IJSN0cE9xYCWPCZNH4KbXD2MC"
basic_auth = base64.b64encode(f"{username}:{pwd}".encode())
headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Basic {basic_auth.decode("utf-8")}'
}

In [41]:
# API call for the building parking spot
def API_call(dev_eui, from_date, to_date):
    payload = json.dumps({
    "dev_eui": dev_eui,
    "from": from_date,
    "to": to_date
})

    response = requests.request("GET", url, headers=headers, data=payload)

    if response.status_code != 200:
        exit(13)

    csv_data = response.text
    df = pd.read_csv(io.StringIO(csv_data))
    return df

In [42]:
df_building_from_api = API_call(dev_eui_building, formatted_yesterday, formatted_tomorrow)

In [44]:
df_bikelane_from_api = API_call(dev_eui_bikelane, formatted_yesterday, formatted_tomorrow)

In [47]:
df_building_newest = df_building_from_api.tail(1)
df_bikelane_newest = df_bikelane_from_api.tail(1)

## Backfill the Sensade API feature group

In [None]:
BACKFILL=False

In [48]:
# Create a unique identifier for each row in the datasets
def create_id(df, dataset_name):
    # Assign the sensor prefix based on the dataset name
    if dataset_name == 'df_building_newest':
        df['psensor'] = "BUILDING"
    elif dataset_name == 'df_bikelane_newest':
        df['psensor'] = "BIKELANE"
    else:
        raise ValueError("Unknown dataset name provided")

    # Create a new column 'id' with a unique identifier for each row
    df['id'] = [str(uuid.uuid4()) for _ in df.index]

    return df

In [52]:
# Applying the function to the datasets
df_bikelane_newest = df_bikelane_newest.copy()
df_bikelane = create_id(df_bikelane_newest, 'df_bikelane_newest')
df_building_newest = df_building_newest.copy()
df_building = create_id(df_building_newest, 'df_building_newest')

In [53]:
#converting the time column to datetime
df_bikelane['time'] = pd.to_datetime(df_bikelane['time'])
df_building['time'] = pd.to_datetime(df_building['time'])

In [56]:
#Renaming the radar columns to start with radar
df_bikelane = df_bikelane.rename(columns={'0_radar': 'radar_0', '1_radar': 'radar_1', '2_radar': 'radar_2', '3_radar': 'radar_3', '4_radar': 'radar_4', '5_radar': 'radar_5', '6_radar': 'radar_6', '7_radar': 'radar_7'})
df_building = df_building.rename(columns={'0_radar': 'radar_0', '1_radar': 'radar_1', '2_radar': 'radar_2', '3_radar': 'radar_3', '4_radar': 'radar_4', '5_radar': 'radar_5', '6_radar': 'radar_6', '7_radar': 'radar_7'})


In [57]:
# Converting the columns to float
df_bikelane[['x','y','z', 'radar_0', 'radar_1', 'radar_2', 'radar_3', 'radar_4', 'radar_5', 'radar_6', 'radar_7', 'f_cnt', 'dr', 'rssi']] = df_bikelane[['x','y','z', 'radar_0', 'radar_1', 'radar_2', 'radar_3', 'radar_4', 'radar_5', 'radar_6', 'radar_7', 'f_cnt', 'dr', 'rssi']].astype(float)
df_building[['x','y','z', 'radar_0', 'radar_1', 'radar_2', 'radar_3', 'radar_4', 'radar_5', 'radar_6', 'radar_7', 'f_cnt', 'dr', 'rssi']] = df_building[['x','y','z', 'radar_0', 'radar_1', 'radar_2', 'radar_3', 'radar_4', 'radar_5', 'radar_6', 'radar_7', 'f_cnt', 'dr', 'rssi']].astype(float)


In [None]:
# Connceting to the Hopsworks project

project = hopsworks.login()

fs = project.get_feature_store()

In [None]:
bikelane_fg = fs.get_or_create_feature_group(name="bike_lane",
                                  version=1,
                                  primary_key=["id"],
                                  event_time='time',
                                  description="Bike lane data",
                                 )
bikelane_fg.insert(df_bikelane)

In [None]:
building_fg = fs.get_or_create_feature_group(name="building",
                                    version=1,
                                    primary_key=["id"],
                                    event_time='time',
                                    description="Building data",
                                     )
building_fg.insert(df_building)

## **Next up:** 3: Feature view creation
Go to the 3_featureview_creation.ipynb notebook