In [1]:
import pandas as pd

data = { 
    'credit_card_number': ['1111 2222 3333 4444', '1111 2222 3333 4444','1111 2222 3333 4444',
                           '1111 2222 3333 4444'],
    'trans_datetime': ['2022-01-01 08:44', '2022-01-02 19:44', '2022-01-02 20:44', '2022-01-02 20:55'],
    'amount': [142.34, 12.34, 66.29, 112.33],
    'location': ['Sao Paolo', 'Rio De Janeiro', 'Stockholm', 'Stockholm'],
    'fraud': [False, False, True, True] 
}

df = pd.DataFrame.from_dict(data)
df['trans_datetime']= pd.to_datetime(df['trans_datetime'])
df

Unnamed: 0,credit_card_number,trans_datetime,amount,location,fraud
0,1111 2222 3333 4444,2022-01-01 08:44:00,142.34,Sao Paolo,False
1,1111 2222 3333 4444,2022-01-02 19:44:00,12.34,Rio De Janeiro,False
2,1111 2222 3333 4444,2022-01-02 20:44:00,66.29,Stockholm,True
3,1111 2222 3333 4444,2022-01-02 20:55:00,112.33,Stockholm,True


In [2]:
import hopsworks
proj = hopsworks.login()
fs = proj.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/398
Connected. Call `.close()` to terminate connection gracefully.


### Create a Feature Group

Hopsworks has comprehensive documentation on Feature Groups. Follow these links to learn more.

* [Feature Group Concept](https://docs.hopsworks.ai/3.0/concepts/fs/feature_group/fg_overview/)
* [Feature Group Creation Guide](https://docs.hopsworks.ai/3.0/user_guides/fs/feature_group/create/)
* [Feature Group API Docs](https://docs.hopsworks.ai/feature-store-api/3.0/generated/api/feature_group_api/)

In [3]:
fg = fs.get_or_create_feature_group(
     name="credit_card_transactions",
     version=1,
     description="Credit Card Transaction data",
     primary_key=['credit_card_number'],
     event_time='trans_datetime'
) 

### Write your DataFrame to the Feature Group
When you write your DataFrame to the feature group, the DataFrame is first copied to Hopsworks.
Then a backfill ingestion job runs on Hopsworks to insert/append the DataFrame to the Feature Group.
The job is a Spark job, and the data is stored in an Apache Hudi table in Hopsworks.

It takes about 1 minute for the ingestion job to complete.
If you don't want to wait, you can run the ingestion job in the background with:


    fg.insert(df, write_options={"wait_for_job": False})

In [4]:
fg.insert(df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/398/fs/335/fg/1364


Uploading Dataframe: 0.00% |          | Rows 0/4 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/398/jobs/named/credit_card_transactions_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7feaa70a2df0>, None)

In [5]:
query = fg.select(["amount", "location", "fraud"])

In [6]:
fv = fs.create_feature_view(name="credit_card_transactions",
                            version=1,
                            description="Features from the credit_card_transactions FG",
                            labels=["fraud"],
                            query=query)

Feature view created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/398/fs/335/fv/credit_card_transactions/version/1


In [8]:
X_train, X_test, y_train, y_test = fv.train_test_split(0.5)
X_train

RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/398/featurestores/335/featureview/credit_card_transactions/version/1/trainingdatasets). Server response: 
HTTP code: 500, HTTP reason: Internal Server Error, body: b'{"type":"restApiJsonResponse","errorCode":120000,"errorMsg":"A generic error occurred."}', error code: 120000, error msg: A generic error occurred., user msg: 

In [None]:
X_train

In [None]:
X_test

In [None]:
y_train

In [None]:
y_test

### Aggregations

Compute the total amount spent on the credit card by first grouping together all rows with the same `credit_card_number` and then summing their amounts.

The code first creates a new DataFrame with only the `credit_card_number` and `amount` columns. The logic of the group-by can then be described as:

    for-each (`credit_card_number`) do \Sigma amount
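The group-by logic can also be spelled out in plain Python, without Pandas. This is only an illustrative sketch: the helper name `total_spent_per_card` and the `transactions` list are made up for this example and reuse the four rows from the DataFrame at the top of the notebook.

```python
from collections import defaultdict

# The four sample rows, reduced to (credit_card_number, amount) pairs.
transactions = [
    ("1111 2222 3333 4444", 142.34),
    ("1111 2222 3333 4444", 12.34),
    ("1111 2222 3333 4444", 66.29),
    ("1111 2222 3333 4444", 112.33),
]


def total_spent_per_card(rows):
    """Group rows by card number and sum the amounts within each group."""
    totals = defaultdict(float)
    for card_number, amount in rows:
        # for-each credit_card_number: accumulate the sum of amount
        totals[card_number] += amount
    return dict(totals)


totals = total_spent_per_card(transactions)
print(totals)
```

The result has one entry per distinct card number, which is the same shape that `groupby(...).sum()` produces with the card number as the index.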

In [None]:
df2 = df[["credit_card_number", "amount"]].groupby("credit_card_number").sum()
df2.rename(columns={"amount": "total_spent"}, inplace=True)
df2.info()

In [None]:
df2

We might also want to know the point in time at which that total was valid, so we add a column with the datetime of the last (most recent) credit card transaction.

In [None]:
df2["as_of_datetime"] = df.groupby("credit_card_number")["trans_datetime"].max()
df2

The `groupby` operation sets `credit_card_number` as the index of our DataFrame.
We want `credit_card_number` as a column, as Pandas indexes are not written to the Feature Group.
We can move the index to a column using `reset_index`.

In [None]:
df2.reset_index(inplace=True)
df2
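The sum, the most recent timestamp, and the index reset can also be written as one chained expression using Pandas named aggregation. The sketch below is self-contained and rebuilds the sample rows locally; the names `tx` and `spending` are chosen for this example and are not used elsewhere in the notebook.

```python
import pandas as pd

# Local copy of the sample transactions (same values as the first cell).
tx = pd.DataFrame({
    "credit_card_number": ["1111 2222 3333 4444"] * 4,
    "trans_datetime": pd.to_datetime(
        ["2022-01-01 08:44", "2022-01-02 19:44", "2022-01-02 20:44", "2022-01-02 20:55"]
    ),
    "amount": [142.34, 12.34, 66.29, 112.33],
})

spending = (
    tx.groupby("credit_card_number")
      .agg(
          total_spent=("amount", "sum"),             # sum of amounts per card
          as_of_datetime=("trans_datetime", "max"),  # most recent transaction per card
      )
      .reset_index()  # turn the group key back into a regular column
)
print(spending)
```

Named aggregation sets the output column names directly, so no separate `rename` step is needed.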

We create a feature group to store the contents of `df2` with our aggregated credit card spending information.

In [None]:
fg2 = fs.get_or_create_feature_group(
     name="credit_card_spending",
     version=1,
     description="Credit Card Spending",
     primary_key=['credit_card_number'],
     event_time='as_of_datetime'
) 

In [None]:
fg2.insert(df2, write_options={"wait_for_job": False})

Let's add some more data to our original feature group.

In [None]:
more_data = { 
    'credit_card_number': ['9999 8888 7777 6666', '9999 8888 7777 6666','9999 8888 7777 6666',
                           '9999 8888 7777 6666'],
    'trans_datetime': ['2022-01-02 04:11', '2022-01-03 07:24', '2022-01-05 10:33', '2022-01-05 11:50'],
    'amount': [55.67, 84, 77.95, 183],
    'location': ['San Francisco', 'San Francisco', 'Dublin', 'Dublin'],
    'fraud': [False, False, False, False] 
}

df3 = pd.DataFrame.from_dict(more_data)
df3['trans_datetime']= pd.to_datetime(df3['trans_datetime'])

fg = fs.get_feature_group(name="credit_card_transactions", version=1)

fg.insert(df3, write_options={"wait_for_job": False})

Now let's derive some simple row-level features from the transactions: flags that mark whether each transaction amount is above or below 100.

In [None]:
df4 = df.copy()  # copy, so the new columns are not added to df as well

In [None]:
df4['is_big'] = df['amount'].apply(lambda amount: amount > 100)
df4

In [None]:
def is_small(row):
    return row['amount'] < 100

df4['is_small'] = df.apply(is_small, axis=1)

df4
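For simple comparisons like these, a vectorized expression on the column is a common alternative to `apply`. The following sketch assumes a small local DataFrame named `tx` (a name used only here) with the same amounts as the sample data.

```python
import pandas as pd

tx = pd.DataFrame({"amount": [142.34, 12.34, 66.29, 112.33]})

# Work on a copy so that tx itself keeps its original columns.
flags = tx.copy()

# Comparing a column with a scalar yields a boolean Series, one value per row.
flags["is_big"] = flags["amount"] > 100
flags["is_small"] = flags["amount"] < 100

print(flags)
```

Note that with these two thresholds an amount of exactly 100 is flagged as neither big nor small.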

## Time Series: Window Aggregations

Compute rolling aggregates (maximum and mean) of the amount spent over a one-day window (make the length of the window '1D').
We will need to set the `event_time` column (`trans_datetime`) as the index, sorted in time order, in order to use Pandas built-in window aggregations.

In [None]:
df5 = fg.read()
df5

In [None]:
df5 = df5.set_index('trans_datetime')

In [None]:
df5 = df5.sort_index()

In [None]:
df5['rolling_max_1d'] = df5.rolling('1D').amount.max()
df5

In [None]:
df5['rolling_mean_1d'] = df5.rolling('1D').amount.mean()
df5
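The rolling windows above look back one day over all rows, regardless of which card a transaction belongs to. If per-card windows are wanted instead, one possible approach is to group by card before rolling. The sketch below uses a made-up two-card sample (`tx`, cards "A" and "B") rather than the feature group data, so it runs without a Hopsworks connection.

```python
import pandas as pd

# Hypothetical sample: two cards with interleaved transactions.
tx = pd.DataFrame({
    "credit_card_number": ["A", "B", "A", "B"],
    "trans_datetime": pd.to_datetime(
        ["2022-01-02 19:44", "2022-01-02 20:00", "2022-01-02 20:44", "2022-01-03 21:00"]
    ),
    "amount": [12.34, 500.00, 66.29, 20.00],
}).set_index("trans_datetime").sort_index()

# Global window: each row sees every transaction in the preceding day.
tx["rolling_max_1d"] = tx["amount"].rolling("1D").max()

# Per-card window: each row only sees transactions of the same card.
tx["rolling_max_1d_per_card"] = (
    tx.groupby("credit_card_number")["amount"]
      .transform(lambda s: s.rolling("1D").max())
)

print(tx)
```

In the third row, the global maximum picks up card B's 500.00 transaction, while the per-card maximum for card A stays at 66.29.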

In [None]:
df5.reset_index(inplace=True)

In [None]:
fg_agg = fs.get_or_create_feature_group(
     name="credit_card_rolling_windows",
     version=1,
     description="Daily Credit Card Spending",
     primary_key=['credit_card_number'],
     event_time='trans_datetime'
) 

In [None]:
fg_agg.insert(df5)

### Create a Feature View using features from multiple Feature Groups

We want to create a model that uses features from multiple feature groups. 
We will select features from the different feature groups and join them together to create a query object. 
We can read the data in the query object as a DataFrame to inspect it before we create the feature view. 
We will use the feature view to read the training data for the model.

In [None]:
query = fg.select_all().join(fg_agg.select(['rolling_max_1d', 'rolling_mean_1d']))

training_data = query.read()
training_data.head()

In [None]:
fv = fs.create_feature_view(name="credit_card_fraud_rolling",
                            description="Features for a model to predict credit card fraud, including rolling windows",
                            version=1,
                            labels=["fraud"],  # treat fraud as the label, not as an input feature
                            query=query)

In [None]:
# train_test_split returns features first, then labels: X_train, X_test, y_train, y_test
X_train, X_test, y_train, y_test = fv.train_test_split(0.25)
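To make the shape of the four returned DataFrames concrete, here is a local Pandas sketch of a random split with a test size of 0.25. It is not how Hopsworks implements the split: `split_features_labels` is a hypothetical helper written for this example, and the `frame` data is a small stand-in for the joined features.

```python
import pandas as pd


def split_features_labels(frame, label, test_size, seed=42):
    """Hypothetical stand-in: random row split, then separate the label column."""
    test = frame.sample(frac=test_size, random_state=seed)  # random test rows
    train = frame.drop(test.index)                          # remaining rows
    X_train, y_train = train.drop(columns=[label]), train[[label]]
    X_test, y_test = test.drop(columns=[label]), test[[label]]
    # Features first, then labels.
    return X_train, X_test, y_train, y_test


frame = pd.DataFrame({
    "amount": [142.34, 12.34, 66.29, 112.33, 55.67, 84.0, 77.95, 183.0],
    "fraud": [False, False, True, True, False, False, False, False],
})

X_train, X_test, y_train, y_test = split_features_labels(frame, "fraud", 0.25)
print(len(X_train), len(X_test))
```

With eight rows and a test size of 0.25, two rows go to the test set and six to the training set, and the label column appears only in `y_train` and `y_test`.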

In [None]:
X_train

In [None]:
df5

In [None]:
fg = fs.get_feature_group(name="credit_card_transactions", version=1)
read_df = fg.read()

In [None]:
read_df

In [None]:
df3