# CSV to Parquet

In [1]:
# Print the kernel's working directory; the relative paths used in the cells
# below are resolved against it.
!pwd

/home/pierre/Realtime-MLOps/demonstration/apps/feature_store


In [2]:
import pandas as pd

# Load the raw minute-level BTC/USD candles exported as CSV.
raw_df = pd.read_csv('../../../demonstration/apps/basic-model/BTC-2021min.csv')

# `unix` is cast to second-resolution datetimes and reused for both timestamp
# columns. The textual `date` column is dropped below, so it is not converted.
event_ts = raw_df["unix"].astype('datetime64[s]')

# Build a new frame instead of mutating in place, so re-running this cell
# always starts from the freshly loaded data.
df = raw_df.drop(columns=["unix", "date", "Volume BTC", "Volume USD"])
df["timestamp"] = event_ts
df["timestamp_created"] = event_ts
df

Unnamed: 0,symbol,open,high,low,close,timestamp,timestamp_created
0,BTC/USD,43046.58,43046.58,43046.58,43046.58,2022-03-01 03:43:00,2022-03-01 03:43:00
1,BTC/USD,43018.23,43046.59,43018.23,43046.58,2022-03-01 03:41:00,2022-03-01 03:41:00
2,BTC/USD,43022.24,43022.24,43016.03,43016.03,2022-03-01 03:40:00,2022-03-01 03:40:00
3,BTC/USD,43035.16,43035.16,42999.44,42999.44,2022-03-01 03:39:00,2022-03-01 03:39:00
4,BTC/USD,43077.82,43077.82,43049.46,43049.46,2022-03-01 03:38:00,2022-03-01 03:38:00
...,...,...,...,...,...,...,...
610777,BTC/USD,29021.86,29023.38,28982.33,28999.50,2021-01-01 00:05:00,2021-01-01 00:05:00
610778,BTC/USD,29048.13,29057.73,29035.61,29045.19,2021-01-01 00:04:00,2021-01-01 00:04:00
610779,BTC/USD,29037.68,29069.39,29019.00,29048.13,2021-01-01 00:03:00,2021-01-01 00:03:00
610780,BTC/USD,29069.80,29073.02,29028.14,29035.89,2021-01-01 00:02:00,2021-01-01 00:02:00


In [20]:
import os

from sqlalchemy import create_engine

# Persist the prepared frame as a Parquet file inside the feature repository.
df.to_parquet('feature_repo/data/BTC-2021min.parquet')

# Load the same rows into the `crypto_source` table in PostgreSQL.
# The connection URL embeds credentials, so it can be supplied through the
# FEAST_OFFLINE_STORE_URL environment variable; the in-cluster demo URL is
# only the fallback when the variable is not set.
offline_store_url = os.environ.get(
    "FEAST_OFFLINE_STORE_URL",
    'postgresql://feast:feast@offline-store-postgresql.feast.svc.cluster.local:5432/feast',
)
engine = create_engine(offline_store_url)

# NOTE: no `if_exists` is passed, so pandas' documented default ('fail')
# applies and re-running this cell raises if the table already exists.
df.to_sql('crypto_source', engine)

782

# Read SQLite

In [4]:
import sqlite3

# Open the SQLite file under feature_repo/data (path relative to the working
# directory).
online_store_path = 'feature_repo/data/online_store.db'
con = sqlite3.connect(online_store_path)


In [5]:
# List the tables present in the SQLite database.
# 'table' is a single-quoted SQL string literal: SQLite accepts the
# double-quoted form only as a legacy fallback that can be disabled at build
# time, in which case the quoted word is treated as an identifier.
cursor = con.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table';")
cursor.fetchall()

[('feature_store_crypto_stats',)]

In [6]:
# Dump every row of the feature table as a raw tuple, one per line.
cursor = con.cursor()
all_rows_query = 'Select * from feature_store_crypto_stats;'
for record in cursor.execute(all_rows_query):
    print(record)

(b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x00\x00\x00BTC/USD', 'open', b'5=f\x95F', '2022-09-20 13:03:02', '2022-09-20 13:03:02')
(b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x00\x00\x00BTC/USD', 'high', b'5\xc8t\x95F', '2022-09-20 13:03:02', '2022-09-20 13:03:02')
(b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x00\x00\x00BTC/USD', 'low', b'5=f\x95F', '2022-09-20 13:03:02', '2022-09-20 13:03:02')
(b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x00\x00\x00BTC/USD', 'close', b'5\xc8t\x95F', '2022-09-20 13:03:02', '2022-09-20 13:03:02')


In [7]:
# Same table as above, loaded into a DataFrame for a tabular view.
pd.read_sql_query(sql="select * from feature_store_crypto_stats", con=con)

Unnamed: 0,entity_key,feature_name,value,event_ts,created_ts
0,b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x...,open,b'5=f\x95F',2022-09-20 13:03:02,2022-09-20 13:03:02
1,b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x...,high,b'5\xc8t\x95F',2022-09-20 13:03:02,2022-09-20 13:03:02
2,b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x...,low,b'5=f\x95F',2022-09-20 13:03:02,2022-09-20 13:03:02
3,b'\x02\x00\x00\x00symbol\x02\x00\x00\x00\x07\x...,close,b'5\xc8t\x95F',2022-09-20 13:03:02,2022-09-20 13:03:02


In [8]:
# Release the SQLite connection now that the inspection above is finished.
con.close()

# Read Features

In [21]:
from datetime import datetime, timedelta

from feast import FeatureStore

# Open the feature repository located in ./feature_repo (relative to the
# working directory).
FEATURE_REPO_PATH = "feature_repo"
store = FeatureStore(repo_path=FEATURE_REPO_PATH)


In [22]:
# Show the name of the first entity returned by the feature store.
registered_entities = store.list_entities()
registered_entities[0].name

'crypto'

In [23]:
# Single-row entity frame: the BTC/USD symbol with an event timestamp set
# 365 days before the moment this cell runs.
one_year_ago = datetime.now() - timedelta(days=365)
entity_df = pd.DataFrame(
    {"symbol": ["BTC/USD"], "event_timestamp": one_year_ago}
)

In [24]:
# Feature references, one per price field of the `crypto_stats` view.
features = [
    f"crypto_stats:{price_field}"
    for price_field in ("open", "high", "low", "close")
]

# Look up those features in the online store for the BTC/USD entity row and
# convert the response to a DataFrame.
btc_entity_row = {"symbol": "BTC/USD"}
online_response = store.get_online_features(
    features=features,
    entity_rows=[btc_entity_row],
)
online_features = online_response.to_df()

In [25]:
# Display the DataFrame built from the online feature lookup.
online_features

Unnamed: 0,symbol,open,close,high,low
0,BTC/USD,21123.119141,21130.390625,21130.390625,21123.119141


In [26]:
# One hand-written BTC/USD candle; both timestamps are epoch seconds stored
# as floats (they are cast to datetimes in a later cell).
new_candle = {
    'symbol': 'BTC/USD',
    'low': 21123.12,
    'high': 21130.39,
    'open': 21123.12,
    'close': 21130.39,
    'timestamp': 1663679399.863017,
    'timestamp_created': 1663679999.157469,
}
df_new = pd.DataFrame([new_candle])
    

In [27]:
# Cast both epoch-second columns to second-resolution datetimes.
for ts_column in ("timestamp", "timestamp_created"):
    df_new[ts_column] = df_new[ts_column].astype('datetime64[s]')

In [28]:
# Display the new row after the timestamp columns were cast to datetimes.
df_new

Unnamed: 0,symbol,low,high,open,close,timestamp,timestamp_created
0,BTC/USD,21123.12,21130.39,21123.12,21130.39,2022-09-20 13:09:59,2022-09-20 13:19:59


In [228]:
!feast materialize  $(date -u +"%Y-%m-%dT%H:%M:%S")

Usage: feast materialize [OPTIONS] START_TS END_TS
Try 'feast materialize --help' for help.

Error: Missing argument 'END_TS'.


In [None]:
from feast.data_source import PushMode

# Write the new row directly to the online and offline stores of the
# `crypto_stats` feature view.
feature_view = "crypto_stats"
store.write_to_online_store(feature_view, df_new, allow_registry_cache=False)
store.write_to_offline_store(feature_view, df_new, allow_registry_cache=True)

# Then send the same row through the push source, targeting both stores.
# NOTE(review): together with the two direct writes above this presumably
# stores the row more than once — confirm that is intended.
store.push("crypto_push_source", df_new, to=PushMode.ONLINE_AND_OFFLINE)

In [236]:
# Re-read the Parquet source to inspect its rows. The path is relative to the
# notebook's working directory (the feature_store folder), matching the other
# `feature_repo/...` paths used in this notebook, so the cell also works
# after a kernel restart.
pd.read_parquet('feature_repo/data/BTC-2021min.parquet')

Unnamed: 0,symbol,open,high,low,close,timestamp,timestamp_created
0,BTC/USD,19123.12,19130.39,19123.12,19130.39,2022-09-20 13:03:02,2022-09-20 13:03:02
1,BTC/USD,43046.58,43046.58,43046.58,43046.58,2022-03-01 03:43:00,2022-03-01 03:43:00
2,BTC/USD,43018.23,43046.59,43018.23,43046.58,2022-03-01 03:41:00,2022-03-01 03:41:00
3,BTC/USD,43022.24,43022.24,43016.03,43016.03,2022-03-01 03:40:00,2022-03-01 03:40:00
4,BTC/USD,43035.16,43035.16,42999.44,42999.44,2022-03-01 03:39:00,2022-03-01 03:39:00
...,...,...,...,...,...,...,...
610778,BTC/USD,29021.86,29023.38,28982.33,28999.50,2021-01-01 00:05:00,2021-01-01 00:05:00
610779,BTC/USD,29048.13,29057.73,29035.61,29045.19,2021-01-01 00:04:00,2021-01-01 00:04:00
610780,BTC/USD,29037.68,29069.39,29019.00,29048.13,2021-01-01 00:03:00,2021-01-01 00:03:00
610781,BTC/USD,29069.80,29073.02,29028.14,29035.89,2021-01-01 00:02:00,2021-01-01 00:02:00
