In [1]:
import pandas as pd

# Data loading and simple exploration

In [2]:
data = pd.read_csv('../data/03_primary/preprocessed_data.csv')
data.head()

Unnamed: 0,scaled_amount,scaled_time,V1,V2,V3,V4,V5,V6,V7,V8,...,V20,V21,V22,V23,V24,V25,V26,V27,V28,Class
0,-0.282401,-0.62872,1.237076,0.240536,0.644799,0.700348,-0.662809,-1.186289,0.051565,-0.212036,...,-0.120263,-0.238889,-0.705162,0.179966,0.699409,0.165166,0.075183,-0.030301,0.021784,0
1,1.186334,-0.083001,-7.13906,2.773082,-6.757845,4.446456,-5.464428,-1.713401,-6.485365,3.409395,...,-0.616949,1.30325,-0.016118,-0.87667,0.38223,-1.054624,-0.614606,-0.766848,0.409424,1
2,0.391253,0.870217,-1.01847,-0.800243,2.73563,-1.710196,-0.756206,-0.299152,-0.86865,0.263516,...,0.360591,0.499789,1.190104,-0.230243,0.016621,0.394464,0.046844,0.052241,0.082638,0
3,2.29344,0.289078,-1.000611,3.34685,-5.534491,6.835802,-0.299803,0.095951,-2.440419,1.286301,...,1.189814,0.439757,-0.694099,0.29966,-0.657601,0.101648,0.430457,0.824685,0.326952,1
4,-0.135401,0.969807,0.211042,1.352425,-2.257035,0.154344,0.530051,0.207244,-2.051166,-5.231705,...,1.100233,-2.511672,0.943468,-0.019769,0.586162,0.618007,0.621341,-0.098063,0.13397,0


The data is already preprocessed and it looks like there is no room for further feature engineering.<br>
<br>
That way, the only thing left to do is to push all the columns to our feature store.

# Hopsworks 

## Upload

In [3]:
from dotenv import load_dotenv
import os

In [4]:
load_dotenv()

True

In [5]:
import hopsworks
from great_expectations.core import ExpectationSuite

In [6]:
from keyword import iskeyword

In [7]:
iskeyword('class_')

False

In [8]:
def to_feature_store(
    df: pd.DataFrame,
    group_name: str,
    description: str,
    group_description: dict,
    feature_group_version: Union[int, None] = None,
    validation_expectation_suite: ExpectationSuite = None
):
    '''
    This function takes in a pandas DataFrame and a validation expectation suite,
      performs validation on the data using the suite, and then saves the data to a
      feature store in the feature store.

    Args:
        - data (pd.DataFrame): Dataframe with the data to be stored
        - group_name (str): Name of the feature group.
        - feature_group_version (int | None): Version of the feature group. If None, increment 1 from last version
        - description (str): Description for the feature group.
        - group_description (dict): Description of each feature of the feature group. 
        - validation_expectation_suite (ExpectationSuite): group of expectations to check data.
        
    Returns:
        - A dictionary with the feature view version, feature view name and training dataset feature version.
    '''
    ##### DELETE THIS BIT AFTER GX IS IMPLEMENTED #####
    if validation_expectation_suite:
        raise NotImplementedError
    ###################################################
    
    if not isinstance(df, pd.DataFrame) and not isinstance(df, pd.Series):
        raise TypeError(f'Expect pd.DataFrame, got {type(df)}')
    if isinstance(df, pd.Series):
        new_df = pd.DataFrame({'index': df.index, df.name: df})
        df = new_df
    
    # Create primary key to posteriorly joins
    if 'index' not in df.columns:
        df = df.reset_index()

    # Hopsworks only accepts lowercase column names, better to sanitize beforehand. Also, try to protect from Python's reserved words
    df.columns = list(map(lambda x: x.lower() if not iskeyword(x.lower()) else x.lower() + '_', df.columns))
    
    # Get credentials
    project_name = os.environ.get('FS_PROJECT_NAME')
    api_key = os.environ.get('FS_API_KEY')
    
    # Connect to feature store.
    project = hopsworks.login(
        api_key_value=api_key, project=project_name
    )
    feature_store = project.get_feature_store()

    # Create feature group.
    object_feature_group = feature_store.get_or_create_feature_group(
        name=group_name,
        version=feature_group_version,
        primary_key=['index'],
        description= description,
        online_enabled=False,
        expectation_suite=validation_expectation_suite
    )
    
    # Upload data.
    object_feature_group.insert(
        features=df,
        overwrite=False,
        write_options={
            "wait_for_job": True,
        },
    )

    # Add feature descriptions.
    for description in group_description:
        object_feature_group.update_feature_description(
            description["name"], description["description"]
        )

    # Update statistics.
    object_feature_group.statistics_config = {
        "enabled": True,
        "histograms": True,
        "correlations": True,
    }
    object_feature_group.update_statistics_config()
    object_feature_group.compute_statistics()

    return object_feature_group

NameError: name 'Union' is not defined

### Test run

In [18]:
data.columns

Index(['scaled_amount', 'scaled_time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6',
       'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16',
       'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26',
       'V27', 'V28', 'Class'],
      dtype='object')

### Feature descriptions

In [82]:
feature_descriptions = [{'name': f'v{i}', 'description': 'Anonymized credit card data', 'validation_rules': 'TO DETERMINE'} for i in range(1, 29)] 
feature_descriptions += [
    {'name': 'scaled_amount', 'description': 'Scaled amount of transaction', 'validation_rules': 'TO DETERMINE'},
    {'name': 'scaled_time', 'description': 'Scaled amount of time, relative to first transaction observation', 'validation_rules': 'TO DETERMINE'},
    {'name': 'index', 'description': 'Index of the observations', 'validation_rules': 'Positive integer, unique'},
]

feature_descriptions

[{'name': 'v1',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v2',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v3',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v4',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v5',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v6',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v7',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v8',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v9',
  'description': 'Anonymized credit card data',
  'validation_rules': 'TO DETERMINE'},
 {'name': 'v10',
  'description': 'Anonymized credit card data',
  'valid

In [115]:
class_description = [
    {'name': 'index', 'description': 'Index of the observations', 'validation_rules': 'Positive integer, unique'},
    {'name': 'class_', 'description': 'Predicted class of the observation. 1 for fraud, 0 otherwise', 'validation_rules': '0 or 1'}
]

In [84]:
X = data.drop(columns=['Class'])
y = data['Class']

In [85]:
to_feature_store(
    df=X, group_name='features',
    feature_group_version=1, description='Test run of features',
    group_description=feature_descriptions
)

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/824468
Connected. Call `.close()` to terminate connection gracefully.


Uploading Dataframe: 0.00% |          | Rows 0/946 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/824468/jobs/named/features_1_offline_fg_materialization/executions


<hsfs.feature_group.FeatureGroup at 0x7b1253e48890>

In [116]:
to_feature_store(
    df=y, group_name='target',
    feature_group_version=5, description='Test run of target',
    group_description=class_description
)

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/824468
Connected. Call `.close()` to terminate connection gracefully.


Uploading Dataframe: 0.00% |          | Rows 0/946 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: target_5_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/824468/jobs/named/target_5_offline_fg_materialization/executions
Statistics Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/824468/jobs/named/target_5_compute_stats_22062024193250/executions


<hsfs.feature_group.FeatureGroup at 0x7b1253e067d0>

## Download

In [26]:
from typing import Union

def get_features(
    group_name: str,
    version: Union[int, None] = None
):
    '''
    This function takes in the group name of the desired features in Hopsworks and returns ad pd.DataFrame with them.

    Args:
        - group_name (str): Name of the feature group.
        - version (int | None): Version number of feature group. If None, latest is returned
        
    Returns:
        - A pd.DataFrame with the features.
    '''
    project_name = os.environ.get('FS_PROJECT_NAME')
    api_key = os.environ.get('FS_API_KEY')
    
    project = hopsworks.login(api_key_value=api_key, project=project_name)
    fs = project.get_feature_store()

    if version:
        features = fs.get_feature_group(name=group_name, version=version)
    else:
        # Get a list with all the versions and chooses the latest
        features = max(fs.get_feature_groups(name='target'), key=attrgetter('version'))
        
    df = features.read()

    return df

In [27]:
get_features(
    'target'
)

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/824468
Connected. Call `.close()` to terminate connection gracefully.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.82s) 


Unnamed: 0,index,class_
0,866,0
1,848,0
2,21,1
3,762,0
4,294,1
...,...,...
941,707,0
942,239,1
943,171,0
944,874,1


In [123]:
get_features(
    'target',
    5
)

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/824468
Connected. Call `.close()` to terminate connection gracefully.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.74s) 


Unnamed: 0,index,class_
0,866,0
1,848,0
2,21,1
3,762,0
4,294,1
...,...,...
941,707,0
942,239,1
943,171,0
944,874,1


In [9]:
# Get credentials
project_name = os.environ.get('FS_PROJECT_NAME')
api_key = os.environ.get('FS_API_KEY')

# Connect to feature store.
project = hopsworks.login(
    api_key_value=api_key, project=project_name
)
feature_store = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/824468
Connected. Call `.close()` to terminate connection gracefully.


In [11]:
feature_store.get_or_create_feature_group(name='features', version=5)

<hsfs.feature_group.FeatureGroup at 0x7527fad5ca90>