In [None]:
! pip3 install --upgrade google-cloud-aiplatform \
                         google-cloud-bigquery \
                         pyarrow --quiet

In [None]:
# Vertex AI region used by aiplatform.init and the featurestores, plus a data location string.
# NOTE(review): DATA_REGION is not used by any cell in this notebook — confirm or remove.
REGION, DATA_REGION = "europe-west1", "EU"

In [None]:
%%bash
# Print the project of the active gcloud configuration (the variable only lives in this bash cell).
PROJECT="$(gcloud config list project --format "value(core.project)")"
echo "Your current GCP Project Name is: ${PROJECT}"


In [None]:
import os

# GCP project that owns the featurestores and the BigQuery data used below.
# Can be overridden with GOOGLE_CLOUD_PROJECT; otherwise the notebook's default project is used.
PROJECT = os.environ.get("GOOGLE_CLOUD_PROJECT", "integrated-myth-392717")

In [None]:
import google.cloud.aiplatform as aiplatform
from google.cloud import bigquery

In [None]:

# BigQuery client for PROJECT, and Vertex AI SDK defaults (project + region) for later calls.
bqclient = bigquery.Client(project=PROJECT)
aiplatform.init(location=REGION, project=PROJECT)

In [None]:
import numpy as np
import pandas as pd
from google.cloud import aiplatform, bigquery
from google.cloud.aiplatform import Feature, Featurestore

In [None]:
def download_bq_table(
    bq_table_uri: str,
    client: "bigquery.Client | None" = None,
) -> pd.DataFrame:
    """Download every row of a BigQuery table into a pandas DataFrame.

    Args:
        bq_table_uri: Table ID such as ``project.dataset.table``, optionally
            prefixed with ``bq://``. A ``dataset.table`` ID is resolved
            against ``PROJECT``.
        client: Optional BigQuery client to reuse (e.g. the notebook's
            ``bqclient``). When omitted, a new client for ``PROJECT`` is
            created, as before.

    Returns:
        All rows of the table as a DataFrame.
    """
    # Accept the ``bq://`` URI form as well as a bare table ID.
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix):]

    table = bigquery.TableReference.from_string(bq_table_uri, default_project=PROJECT)

    if client is None:
        client = bigquery.Client(project=PROJECT)

    # Read the table rows directly (no SQL query needed) and materialise them.
    return client.list_rows(table).to_dataframe()

In [None]:
# Client-level source table `features.cust_agr` in the notebook's project.
BQ_SOURCE = f"{PROJECT}.features.cust_agr"

# Download the client table into a DataFrame.
clients_df = download_bq_table(BQ_SOURCE)

In [None]:
# Index the frame by client ID (as strings) so rows map to client entity IDs.
print(clients_df.head())
# Guarded so re-running the cell does not fail once client_id has become the index.
if "client_id" in clients_df.columns:
    clients_df = clients_df.set_index("client_id")
clients_df.index = clients_df.index.map(str)
print(clients_df.head())


<font size="5">Tworzenie Feature Store </font>

In [None]:
import random
import string


# Generate a uuid of a specifed length(default=8)
def generate_uuid(length: int = 8) -> str:
    """Return a random string of ``length`` lowercase letters and digits (default 8)."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)


UUID = generate_uuid()
     

In [None]:
FEATURESTORE_ID = "clients"

# Reuse the featurestore if it already exists so the cell can be re-run;
# creating a featurestore whose ID is already taken fails.
_existing_clients_stores = [
    store
    for store in aiplatform.Featurestore.list(project=PROJECT, location=REGION)
    if store.name == FEATURESTORE_ID
]
if _existing_clients_stores:
    clients_feature_store = _existing_clients_stores[0]
else:
    clients_feature_store = aiplatform.Featurestore.create(
        featurestore_id=FEATURESTORE_ID,
        online_store_fixed_node_count=1,
        project=PROJECT,
        location=REGION,
        sync=True,
    )

In [None]:
# Re-attach to the clients featurestore by ID; project/region come from aiplatform.init.
clients_feature_store = aiplatform.Featurestore(
    featurestore_name=FEATURESTORE_ID,
)

<font size="5">Weryfikacja Feature Store </font>

In [None]:
# Second handle to the clients featurestore, with project and region passed explicitly.
fs = Featurestore(
    featurestore_name=FEATURESTORE_ID,
    project=PROJECT,
    location=REGION,
)
print(fs.gca_resource)

## Tworzenie entity

Tutaj utworzymy typ jednostki (entity type) o identyfikatorze `client` (zmienna `client_entity_type`) za pomocą metody `create_entity_type` z poniższymi parametrami:


- `entity_type_id (str)`: identyfikator typu jednostki, który stanie się ostatnim członem nazwy zasobu w Feature Store. Wartość musi być unikalna w ramach Feature Store.
- `description`: opis typu jednostki.

In [None]:
ENTITY_TYPE_ID = "client"

# Entity type grouping the per-client features inside the clients featurestore.
client_entity_type = clients_feature_store.create_entity_type(
    description="Clients entity type",
    entity_type_id=ENTITY_TYPE_ID,
)

### Weryfikacja utworzenia entity
Sprawdzamy, czy typ jednostki (`entity type`) został poprawnie utworzony, uruchamiając poniższy kod.

In [None]:
# Fetch the entity type back from the service and print its raw resource.
entity_type = clients_feature_store.get_entity_type(ENTITY_TYPE_ID)
print(entity_type.gca_resource)

# Create Features

Cecha (feature) jest mierzalną właściwością lub atrybutem typu jednostki. Na przykład typ jednostki `client` posiada cechę taką jak liczba posiadanych produktów kredytowych.

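Podczas tworzenia cechy określany jest typ wartości (`value_type`), np. `INT64`, `DOUBLE`, `STRING` lub `BOOL` (oraz ich odpowiedniki tablicowe, np. `DOUBLE_ARRAY`). W tym notatniku używamy typów `INT64` i `DOUBLE`.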


In [None]:


# Features of the client entity type: feature_id -> (value_type, description).
# NOTE(review): descriptions are inferred from the feature IDs; confirm with the data owner.
CLIENT_FEATURE_SPECS = {
    "max_dpd_6m": ("INT64", "Maximum days past due (DPD) in last 6 months"),
    "max_exp_csh_6m": ("DOUBLE", "Maximum expositions in cash products in last 6 months"),
    "max_exp_hipo_3m": ("DOUBLE", "Maximum expositions in mortgage products in last 3 months"),
    "trx_out_sum": ("DOUBLE", "Sum of outgoing transactions"),
    "trx_in_sum": ("DOUBLE", "Sum of incoming transactions"),
    "cnt_hipo": ("INT64", "Number of mortgage products"),
    "cnt_csh": ("INT64", "Number of cash products"),
}

# Create every feature (in the order above), keyed by feature ID.
client_features = {
    feature_id: client_entity_type.create_feature(
        feature_id=feature_id,
        value_type=value_type,
        description=description,
    )
    for feature_id, (value_type, description) in CLIENT_FEATURE_SPECS.items()
}

# Per-feature handles under the variable names used previously.
max_dpd_6m = client_features["max_dpd_6m"]
max_exp_csh_6m = client_features["max_exp_csh_6m"]
max_exp_hipo_3m = client_features["max_exp_hipo_3m"]
trx_out_sum = client_features["trx_out_sum"]
trx_in_sum = client_features["trx_in_sum"]
users_feature_gender = trx_in_sum  # legacy, misleading name; holds the trx_in_sum feature
cnt_hipo = client_features["cnt_hipo"]
cnt_csh = client_features["cnt_csh"]

In [None]:
# List the features attached to the client entity type and print each resource.
features = entity_type.list_features()
for feature in features:
    print(feature.gca_resource)

#### Wyszukiwanie atrybutów wg kryteriów

In [None]:
# Search features by criteria: only DOUBLE-typed features of the current featurestore.
double_features = Feature.search(
    query=f"value_type=DOUBLE AND featurestore_id={FEATURESTORE_ID}"
)

for double_feature in double_features:
    print(double_feature.gca_resource)

## Weryfikacja utworzonych obiektów

In [None]:
# List the entity types defined in the clients featurestore (`fs`).
fs.list_entity_types()

In [None]:
# List the features attached to the client entity type.
client_entity_type.list_features()


### Ładowanie danych do Feature Store
Use the `write_feature_values` API to write feature values to the Feature Store with the following parameter:

`instances`: Feature values to be written to the Feature Store. They can take the form of a list of `WriteFeatureValuesPayload` objects, a Python dict, or a pandas DataFrame.
This streaming ingestion feature has been introduced to the Vertex AI SDK under the `preview` namespace. Here, you pass the `clients_df` DataFrame (indexed by `client_id`), loaded from the `cust_agr` BigQuery table, as the `instances` parameter.

In [None]:
# Stream the rows of clients_df (indexed by client_id) into the client entity type
# through the preview streaming-ingestion API.
# NOTE(review): presumably the column names must match the feature IDs created above — confirm.
client_entity_type.preview.write_feature_values(instances=clients_df)

### Czytanie danych

In [None]:
# Online read of all features (the default) for a single client ID.
client_entity_type.read(entity_ids="6GFPZ3IMQZ8IFDOZZMMMZ3L0IR8BXXHQ")

In [None]:
# Online read of a single feature (trx_in_sum) for one client ID.
client_entity_type.read(entity_ids="EN87HWPSW3BNTB28VX921RMK6KUCJY3I", feature_ids="trx_in_sum")

In [None]:
# Online read of a chosen list of features for one client ID.
client_entity_type.read(entity_ids="6GFPZ3IMQZ8IFDOZZMMMZ3L0IR8BXXHQ", feature_ids=["trx_in_sum", "cnt_csh"])

In [None]:
# Online read of all features for several client IDs in one call.
client_entity_type.read(entity_ids=["6GFPZ3IMQZ8IFDOZZMMMZ3L0IR8BXXHQ", "GDRJAH6LZMH3ZTP92P4TESWYP9KNE27E"])

### Usuwanie Feature Store

In [78]:
# Delete the clients featurestore together with its contents (force=True).
# NOTE(review): `fs` and `clients_feature_store` still reference this deleted featurestore afterwards.
clients_feature_store.delete(force=True)

Deleting Featurestore : projects/237095192912/locations/europe-west1/featurestores/clients
Delete Featurestore  backing LRO: projects/237095192912/locations/europe-west1/operations/6613878000909811712
Featurestore deleted. . Resource name: projects/237095192912/locations/europe-west1/featurestores/clients


### Ładowanie danych z GCS

In [None]:
FEATURESTORE_ID = "accounts"

# Reuse the featurestore if it already exists so the cell can be re-run;
# creating a featurestore whose ID is already taken fails.
_existing_accounts_stores = [
    store
    for store in aiplatform.Featurestore.list(project=PROJECT, location=REGION)
    if store.name == FEATURESTORE_ID
]
if _existing_accounts_stores:
    featurestore_accounts = _existing_accounts_stores[0]
else:
    featurestore_accounts = aiplatform.Featurestore.create(
        featurestore_id=FEATURESTORE_ID,
        online_store_fixed_node_count=1,
        project=PROJECT,
        location=REGION,
        sync=True,
    )

In [None]:
# Re-attach to the accounts featurestore by ID; project/region come from aiplatform.init.
featurestore_accounts = aiplatform.Featurestore(
    featurestore_name=FEATURESTORE_ID,
)


#### Tutaj tworzymy typ jednostki `account` poprzez konsolę Google Cloud

In [None]:
# Check which entity types exist in the accounts featurestore
# (the `account` entity type is expected to have been created in the console).
featurestore_accounts.list_entity_types()

### Tworzenie features

In [None]:
# Feature configs for the account entity type: every feature is a DOUBLE.
accounts_feature_configs = {
    feature_id: {"value_type": "DOUBLE"}
    for feature_id in ("max_dpd_3m", "max_exp_6m", "trx_out_sum", "trx_in_sum")
}

In [None]:
ACCOUNT_ENTITY_TYPE_ID = "account"

# The entity type may have been created manually in the console; create it here when it is
# missing so the notebook also runs end-to-end against a fresh featurestore.
if ACCOUNT_ENTITY_TYPE_ID not in {
    existing.name for existing in featurestore_accounts.list_entity_types()
}:
    featurestore_accounts.create_entity_type(
        entity_type_id=ACCOUNT_ENTITY_TYPE_ID,
        description="Accounts entity type",
    )

account_entity = aiplatform.featurestore.EntityType(
    featurestore_id=FEATURESTORE_ID, entity_type_name=ACCOUNT_ENTITY_TYPE_ID
)

In [None]:
# Create only the configured features that do not exist yet, so re-running the cell
# does not fail on features created by a previous run.
existing_account_feature_ids = {feature.name for feature in account_entity.list_features()}
missing_account_feature_configs = {
    feature_id: config
    for feature_id, config in accounts_feature_configs.items()
    if feature_id not in existing_account_feature_ids
}
if missing_account_feature_configs:
    accounts_featurea = account_entity.batch_create_features(
        feature_configs=missing_account_feature_configs,
    )
else:
    # batch_create_features returns the entity type, so mirror that when nothing is created.
    accounts_featurea = account_entity

In [None]:
# Inspect the account entity type resource after its features were created.
print(account_entity.gca_resource)

#### Wyszukiwanie cech

In [None]:
# Names of the DOUBLE-typed features in the current (accounts) featurestore.
double_features = Feature.search(
    query=f"value_type=DOUBLE AND featurestore_id={FEATURESTORE_ID}"
)

for double_feature in double_features:
    print(double_feature.name)

max_exp_csh_6m
max_exp_hipo_3m
trx_in_sum
trx_out_sum


### Ładowanie features z GCS z pliku `csv`

### Ładowanie danych do features dla `account_entity`

In [None]:
# Settings for importing the account features CSV from GCS.
ACCOUNT_GCS_SOURCE_URI = "gs://data_feature_store/features_acct.csv"
GCS_SOURCE_TYPE = "csv"
ACCOUNT_ENTITY_ID_FIELD = "acct_id"  # CSV column passed as entity_id_field
ACCOUNT_FEATURE_TIME = "update_time"  # CSV column passed as feature_time
WORKER_COUNT = 1

# Import every feature currently defined on the account entity type.
ACCOUNT_FEATURES_IDS = [account_feature.name for account_feature in account_entity.list_features()]
print(ACCOUNT_FEATURES_IDS)

['max_exp_6m', 'trx_in_sum', 'trx_out_sum', 'max_dpd_3m']


In [None]:
# Import the account feature values from the CSV in GCS.
# sync=True blocks until the import job finishes, so the online reads in the next cells
# see the ingested values (with sync=False they could run before the import completes).
account_entity.ingest_from_gcs(
    feature_ids=ACCOUNT_FEATURES_IDS,
    feature_time=ACCOUNT_FEATURE_TIME,
    entity_id_field=ACCOUNT_ENTITY_ID_FIELD,
    gcs_source_uris=ACCOUNT_GCS_SOURCE_URI,
    gcs_source_type=GCS_SOURCE_TYPE,
    worker_count=WORKER_COUNT,
    sync=True,
)
     

Importing EntityType feature values: projects/237095192912/locations/europe-west1/featurestores/accounts_hbsqrz5q/entityTypes/account


<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7f5af0161db0> 
resource name: projects/237095192912/locations/europe-west1/featurestores/accounts_hbsqrz5q/entityTypes/account

Import EntityType feature values backing LRO: projects/237095192912/locations/europe-west1/featurestores/accounts_hbsqrz5q/entityTypes/account/operations/8293720661919006720


#### Pobieranie danych
#### Online serving

Serwowanie danych online umożliwia udostępnianie wartości cech dla pojedynczych jednostek (entity) jako niewielkiego wycinka danych. Taki sposób udostępniania jest przeznaczony dla usług, dla których czas odczytu danych jest krytyczny.
Dzięki SDK Pythona łatwo jest odczytać wartości atrybutów dla danej jednostki [entity]. Domyślnie zapytanie zwraca najnowszą wartość atrybutu [feature], czyli wartość cechy z najnowszym przypisanym znacznikiem czasu (timestamp).

Aby odczytać wartości cech, należy określić identyfikator jednostki [entity id] oraz listę cech [features] do odczytania.
Domyślnie wybrane zostaną wszystkie cechy.
Odpowiedź zawiera identyfikatory jednostek (kolumna `entity_id`) oraz wybrane wartości cech w postaci ramki danych pandas.

In [None]:

# Online read of a few sample accounts, restricted to the four account features.
sample_account_ids = [
    "DUN12HN5H9WLNSQ2OYM2CUBX11XIPE4G",
    "HAMZSIBWJTXZUZ2QC6F3GKMVVRRKSDHL",
    "I6IMP5JO2XF53YK2QPLYQEMDFIV6Z78A",
    "P1AEIIVCXIRIRDP9MJRLNCUSJS52IO1Z",
]
fetch_data = account_entity.read(
    entity_ids=sample_account_ids,
    feature_ids=["max_dpd_3m", "max_exp_6m", "trx_out_sum", "trx_in_sum"],
)

print(fetch_data)

                          entity_id  max_dpd_3m  max_exp_6m  trx_out_sum  \
0  DUN12HN5H9WLNSQ2OYM2CUBX11XIPE4G       851.0     8795.01 -38959666.92   
1  HAMZSIBWJTXZUZ2QC6F3GKMVVRRKSDHL       107.0     3811.05 -51956201.29   
2  I6IMP5JO2XF53YK2QPLYQEMDFIV6Z78A       175.0     1515.91 -55816551.75   
3  P1AEIIVCXIRIRDP9MJRLNCUSJS52IO1Z       144.0     7700.99 -29396886.41   

   trx_in_sum  
0   125079.82  
1    19026.41  
2   257443.58  
3   172535.72  


#### Czytanie danych - dla jednostki per request

In [None]:
# Online read of all features for a single account.
account_entity.read(entity_ids="DUN12HN5H9WLNSQ2OYM2CUBX11XIPE4G")

In [None]:

# Low-level alternative to EntityType.read: call the online-serving API directly.
from google.cloud.aiplatform_v1 import (
    FeaturestoreOnlineServingServiceClient,
    FeaturestoreServiceClient,
)
from google.cloud.aiplatform_v1.types import (
    FeatureSelector,
    IdMatcher,
    ReadFeatureValuesRequest,
)

# Featurestore APIs are regional, so the client must target the region's endpoint.
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"
data_client = FeaturestoreOnlineServingServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)

feature_selector = FeatureSelector(
    id_matcher=IdMatcher(ids=["max_exp_6m", "trx_out_sum", "trx_in_sum"])
)

data_client.read_feature_values(
    ReadFeatureValuesRequest(
        # Full resource name of the `account` entity type in the current featurestore.
        entity_type=FeaturestoreServiceClient.entity_type_path(
            PROJECT, REGION, FEATURESTORE_ID, "account"
        ),
        # Fetch the selected features of this single account.
        entity_id="DUN12HN5H9WLNSQ2OYM2CUBX11XIPE4G",
        feature_selector=feature_selector,
    )
)

### Batch Serving

Udostępnianie batchowe służy do pobierania dużych ilości danych naraz; poniżej wynik zapisywany jest do tabeli BigQuery.

In [None]:
from datetime import datetime
from google.cloud import bigquery

In [None]:

# Output dataset; a run timestamp is appended so every run writes to a fresh dataset.
DESTINATION_DATA_SET = "client_data"  # @param {type:"string"}
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
DESTINATION_DATA_SET = f"{DESTINATION_DATA_SET}_{TIMESTAMP}"

# Output table. Make sure that the table does NOT already exist; the BatchReadFeatureValues API cannot overwrite an existing table
DESTINATION_TABLE_NAME = "client"  # @param {type:"string"}

# Full `bq://project.dataset.table` URI of the batch-serving output table.
DESTINATION_PATTERN = "bq://{project}.{dataset}.{table}"
DESTINATION_TABLE_URI = DESTINATION_PATTERN.format(
    table=DESTINATION_TABLE_NAME,
    dataset=DESTINATION_DATA_SET,
    project=PROJECT,
)

In [None]:

# Create the timestamped BigQuery dataset that receives the batch-serving output.
client = bigquery.Client(project=PROJECT)
dataset_id = f"{client.project}.{DESTINATION_DATA_SET}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = REGION
dataset = client.create_dataset(dataset)
print(f"Created dataset {client.project}.{dataset.dataset_id}")

In [None]:

# entity_type_id -> feature IDs to serve (use ["*"] to select every feature of an entity type).
SERVING_FEATURE_IDS = dict(
    account=["max_dpd_3m", "trx_out_sum", "trx_in_sum"],
)
     

In [None]:
# Batch-serve account features into the new BigQuery table. The `account` entity type lives in
# the accounts featurestore; `fs` points at the clients featurestore, which was deleted earlier.
# NOTE(review): read instances come from the ingestion CSV; batch serving expects a header of
# entity type IDs (here `account`) plus a `timestamp` column — confirm the CSV matches.
featurestore_accounts.batch_serve_to_bq(
    bq_destination_output_uri=DESTINATION_TABLE_URI,
    serving_feature_ids=SERVING_FEATURE_IDS,
    read_instances_uri=ACCOUNT_GCS_SOURCE_URI,
)
   


## Czyszczenie

In [None]:
# Delete the accounts featurestore together with its contents (force=True).
# NOTE(review): the BigQuery dataset created for batch serving is not removed by this cell.
featurestore_accounts.delete(force=True)

Deleting Featurestore : projects/237095192912/locations/europe-west1/featurestores/clients_hbsqrz5q
Delete Featurestore  backing LRO: projects/237095192912/locations/europe-west1/operations/8295128036802560000
Featurestore deleted. . Resource name: projects/237095192912/locations/europe-west1/featurestores/clients_hbsqrz5q
