In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import logging

for handler in logging.root.handlers:
    logging.root.removeHandler(handler)
logging.basicConfig(level=logging.INFO)

In [3]:
# Set project root
from pathlib import Path

PROJECT_ROOT = Path("..").resolve()
PROJECT_ROOT_LS = [p.name for p in PROJECT_ROOT.iterdir()]
assert "featurologists" in PROJECT_ROOT_LS, f"Not a project root? {PROJECT_ROOT}, pwd: {Path().resolve()}"

In [4]:
import os

K8S_FEAST_NS = os.environ.get('K8S_FEAST_NS', 'feast-dev')
print(f'K8S_FEAST_NS={K8S_FEAST_NS}')

K8S_FEAST_NS=feast-dev


In [8]:
from featurologists.utils import kubectl_port_forward

In [9]:
port_forward_processes = []

In [10]:
feast_core_process, feast_core_port = kubectl_port_forward(K8S_FEAST_NS, 'service/feast-release-feast-core', 6565)

port_forward_processes.append(feast_core_process)

pid: 341319
Port-forward process for 'service/feast-release-feast-core' seems to be working: check 'localhost:34971'


In [11]:
import os

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

In [12]:
client = Client(core_url=f"localhost:{feast_core_port}") #, serving_url='localhost:8092')
client.list_projects()

  and should_run_async(code)


['default']

In [26]:
DATA_DIR = PROJECT_ROOT/'data/output'
DATA_FEAST_PATH = PROJECT_ROOT / 'data-feast'

In [16]:
! head -n2 {PROJECT_ROOT}/data/output/cancellation_prediction/offline_preprocessed.csv
# Quantity,UnitPrice,Country,InvoiceDate_year,InvoiceDate_month,InvoiceDate_day,InvoiceDate_hour,InvoiceDate_minute,InvoiceDate_second,IsCancelled

Quantity,UnitPrice,Country,InvoiceDate_year,InvoiceDate_month,InvoiceDate_day,InvoiceDate_hour,InvoiceDate_minute,InvoiceDate_second,IsCancelled
6,2.55,34,2010,12,1,8,26,0,0


In [49]:
source_uri = f'file://{DATA_FEAST_PATH}/offline_preprocessed_csv'

In [51]:
invoice_id = Entity(name="invoice_id", description="Invoice ID", value_type=ValueType.INT64)

customer_clusters = FeatureTable(
    name = "invoices",
    entities = ["invoice_id"],
    features = [
        Feature("Quantity", ValueType.INT64),
        Feature("UnitPrice", ValueType.FLOAT),
        Feature("Country", ValueType.INT64),
        Feature("InvoiceDate_year", ValueType.INT64),
        Feature("InvoiceDate_month", ValueType.INT64),
        Feature("InvoiceDate_day", ValueType.INT64),
        Feature("InvoiceDate_hour", ValueType.INT64),
        Feature("InvoiceDate_minute", ValueType.INT64),
        Feature("InvoiceDate_second", ValueType.INT64),
        Feature("IsCancelled", ValueType.INT64),        
    ],
    batch_source=FileSource(
        event_timestamp_column="InvoiceDate",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        #path= str(DATA_FEAST_PATH / 'offline_preprocessed_csv'),
        file_url=source_uri,
        date_partition_column="date"
    )
)

  'Argument "file_url" is being deprecated. Please use the "path" argument.'


In [52]:
client.apply(invoice_id)
client.apply(customer_clusters)

In [53]:
print(client.get_feature_table("invoices").to_yaml())


spec:
  name: invoices
  entities:
  - invoice_id
  features:
  - name: Quantity
    valueType: INT64
  - name: InvoiceDate_day
    valueType: INT64
  - name: UnitPrice
    valueType: FLOAT
  - name: Country
    valueType: INT64
  - name: InvoiceDate_month
    valueType: INT64
  - name: InvoiceDate_year
    valueType: INT64
  - name: InvoiceDate_minute
    valueType: INT64
  - name: IsCancelled
    valueType: INT64
  - name: InvoiceDate_hour
    valueType: INT64
  - name: InvoiceDate_second
    valueType: INT64
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: InvoiceDate
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: file:///plain/github/opensource/Featurologists/data-feast/offline_preprocessed_csv
meta:
  createdTimestamp: '2021-06-12T15:48:06Z'



In [54]:
import pandas as pd
from featurologists.cancellation_prediction import (
    preprocess_persistent,
    preprocess_replace_invoice_date,
    load_country_encoder,
)

online_df = pd.read_csv(DATA_DIR / 'online_raw.csv')
country_encoder = load_country_encoder(DATA_DIR / 'cancellation_prediction' / 'country_encoder.npy')
online_df = preprocess_persistent(online_df, country_encoder, add_target=False)
#online_df = preprocess_replace_invoice_date(online_df)
online_df

Unnamed: 0,InvoiceDate,Quantity,UnitPrice,Country
0,2011-10-02 10:32:00,48,4.95,34
1,2011-10-02 10:32:00,20,1.25,34
2,2011-10-02 10:43:00,4,0.85,34
3,2011-10-02 10:43:00,15,7.08,34
4,2011-10-02 10:43:00,4,4.95,34
...,...,...,...,...
170973,2011-12-09 12:50:00,12,0.85,13
170974,2011-12-09 12:50:00,6,2.10,13
170975,2011-12-09 12:50:00,4,4.15,13
170976,2011-12-09 12:50:00,4,4.15,13


In [55]:
online_df_part = online_df.iloc[:10]

  and should_run_async(code)


In [56]:
client.ingest(customer_clusters, online_df_part)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


In [59]:
help(client)

Help on Client in module feast.client object:

class Client(builtins.object)
 |  Client(options: Union[Dict[str, str], NoneType] = None, **kwargs)
 |  
 |  Feast Client: Used for creating, managing, and retrieving features.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, options: Union[Dict[str, str], NoneType] = None, **kwargs)
 |      The Feast Client should be initialized with at least one service url
 |      Please see constants.py for configuration options. Commonly used options
 |      or arguments include:
 |          core_url: Feast Core URL. Used to manage features
 |          serving_url: Feast Serving URL. Used to retrieve features
 |          project: Sets the active project. This field is optional.
 |          core_secure: Use client-side SSL/TLS for Core gRPC API
 |          serving_secure: Use client-side SSL/TLS for Serving gRPC API
 |          enable_auth: Enable authentication and authorization
 |          auth_provider: Authentication provider – "google" or "o

In [67]:
job = client.get_historical_features(
    feature_refs=[
        "invoices:InvoiceDate"
    ], 
)

AttributeError: 'Client' object has no attribute 'get_historical_features'

In [68]:
for p in port_forward_processes:
    p.kill()

  and should_run_async(code)
