In [1]:
import pandas as pd
from influxdb_client import InfluxDBClient
from pulsar_data_collection.database_connectors.influxdb import Influxdb
import os

# Set up InfluxDB connection
# token = os.getenv("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN")
# org = os.getenv("DOCKER_INFLUXDB_INIT_ORG")
# bucket_name = os.getenv("DOCKER_INFLUXDB_INIT_BUCKET")
token = 'mytoken'
org = 'pulsarml'
bucket_name = 'demo'
url = 'http://influxdb:8086'

print(token, org, bucket_name)
# Create an instance of Influxdb from your package
influxdb = Influxdb().get_database_actions()

db_login = {
    "url": url,
    "token": token,
    "org": org,
    "bucket_name": bucket_name
}

# Test InfluxDB connection
db_connection = influxdb.make_connection(**db_login)
if db_connection:
    print("Connection successful")
else:
    print("Connection failed")

mytoken pulsarml demo
Connection successful


In [2]:
# Print the existing bucket names
client = InfluxDBClient(url=url, token=token, org=org)

buckets_api = client.buckets_api()
buckets = buckets_api.find_buckets()

for bucket in buckets.buckets:
    print(bucket.name)

_tasks
_monitoring
demo


In [3]:
import sys
sys.path.append('../src/')

from gen_data_and_simulate_drift import DriftIntensity, DriftSimulator, GenerateFakeData
from training_script import Classifier

In [4]:
pokemon_test_data='pokemon.csv'
SAMPLE_SIZE=1000
target = 'Legendary'
genertor_fake_data = GenerateFakeData(path_ref_data=pokemon_test_data, sample_size=SAMPLE_SIZE, target=target)
sampled_data = genertor_fake_data.get_dataclass_sampling()

Legendary


In [5]:
# if the task is classification
pok_classifier = Classifier(df_train=sampled_data.train_data,
            num_features=sampled_data.list_num_col,
            cat_features=None,
            target=target,
            pkl_file_path=f'class_{target}_model.pkl')
pok_classifier.train()
pok_classifier.serialize()

0:	learn: 0.1320925	total: 97.7ms	remaining: 97.7ms
1:	learn: 0.0470756	total: 104ms	remaining: 0us


In [6]:
import numpy
from datetime import datetime
drift_sim_info = DriftSimulator(sampled_data, nb_cols_to_drift=1, drift_intensity=DriftIntensity.MODERATE)
# to get test_data after drifting
   
df_test_drifted = drift_sim_info.get_test_data_drifted()
df_test_drifted["timestamp"] = datetime.now()
print('info:',df_test_drifted.dtypes)
print('drift data:',df_test_drifted)
# df_test_drifted[target] = df_test_drifted[target].astype(int)

prediction = pok_classifier.predict(df_test=df_test_drifted)
prediction_int = [1 if e=='True' else 0 for e in prediction]
prediction_numpy = numpy.asarray(prediction_int)

number of columns to drift is : 1
select random column to drift ...
Drifting column Sp. Atk
info: #                      int64
Total                  int64
HP                     int64
Attack                 int64
Defense                int64
Sp. Atk                int64
Sp. Def                int64
Speed                  int64
Generation             int64
Legendary               bool
timestamp     datetime64[ns]
dtype: object
drift data:        #  Total   HP  Attack  Defense  Sp. Atk  Sp. Def  Speed  Generation  \
588  292    554   56      82      159      246       97     90           2   
249  775    687  137     164       90      146       64    110           6   
874  491    689   86     138       60      158      105     95           3   
621  537    504   82     129       82      160       52     46           5   
581   46    522  101      60       76      222      105     74           1   
..   ...    ...  ...     ...      ...      ...      ...    ...         ...   
55   402   

In [7]:
from datetime import datetime

param_dict = influxdb.create_param_dict(
    client=db_connection,
    login=db_login,
    bucket_name=bucket_name,
    model_id="mod1",
    model_version="ver1",
    data_id="data1",
    timestamp_column_name="timestamp",
    timestamp=datetime.now(),
    additional_tags={}
    )

print('param_dictionary:',param_dict)

print('api_client:',client.api_client)

param_dictionary: {'client': <influxdb_client.client.influxdb_client.InfluxDBClient object at 0x4055e209d0>, 'bucket_name': 'demo', 'data_frame_measurement_name': 'mod1_ver1_input_data', 'data_frame_timestamp_column': 'timestamp', 'tags': {'model_id': 'mod1', 'model_version': 'ver1', 'data_id': 'data1'}}
api_client: <influxdb_client._sync.api_client.ApiClient object at 0x404b873f40>


In [8]:
import pulsar_data_collection.database_connectors.influxdb as influxdb
factories = {"influxdb": influxdb.Influxdb()}

import datetime
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from pydantic import BaseModel, validator


class PulseParameters(BaseModel):
    """
    Model of input Pulse class

    Attributes
    ----------
    model_id: str
    Model identifier
    model_version: str
    Model version
    data_id: str
    Data identifier
    reference_data_storage: Any  # Union[str, Dict[str, str]]
    Storage path for reference data used in drift recognition, usually training dataset
    target_name: str
    Name of target feature
    storage_engine: str
    Storage engine used to store collected logs
    login: Dict[str, Union[str, int, bool]]
    Dictionary containing the element required to perform successful login to storage engine
    features_metadata: Optional[Dict[str, type]]  # key: feature_name, value : feature type
    Dictionary containing the schema of features used for the model prediction
    other_labels: Optional[Dict[str, Union[str, int, bool]]] = None
    Dictionary of additional labels used provide more metadata regarding the context surrounding the model
    timestamp_column_name: str = "timestamp"
    Name of the column containing the timestamp

    """

    model_id: str
    model_version: str
    data_id: str
    reference_data_storage: Any  # Union[str, Dict[str, str]]
    target_name: str
    storage_engine: str
    login: Dict[str, Union[str, int, bool]]
    features_metadata: Optional[Dict[str, type]]  # key: feature_name, value : feature type
    additional_tags: Optional[Dict[str, Union[str, int, bool]]] = None
    timestamp_column_name: str = "timestamp"

    @validator("storage_engine")
    def check_storage_engine(cls, value):
        if value not in factories:
            raise ValueError(f"Storage Engine for {value} not supported")
        return value


class DataWithPrediction(BaseModel):
    """
    Model of input for capture_data method of Pulse parameter class

    Attributes
    ----------

    prediction_id: Optional[str]
    Identifier of prediction
    timestamp: datetime.datetime = datetime.datetime.now()
    timestamp of when the prediction have been performed
    predictions: Any  # Union[Dict, pd.DataFrame]
    Object containing the array of predictions
    data_points: pd.DataFrame
    Object containing the data points on which the predicitions were performed
    features_names: List[str]
    list of features that were used to perform the prediction

    """

    class Config:
        arbitrary_types_allowed = True

    prediction_id: Optional[str]
    timestamp: datetime.datetime
    predictions: Any
    data_points: pd.DataFrame
    features_names: List[str]





# class Pulse:
    """
    This class expose methods in order to collect data from
    an inference container/webapp
    """

    def __init__(self, data: PulseParameters):
        """
        Initializing Pulse class instance

        Parameters
        ----------

        data : PulseParameters
        Pydantic Model providing the interface to the class constructor,
        Refer to PulseParameters model attributes for detailed list of inputs



        """
        self.model_id = data.model_id
        self.model_version = data.model_version
        self.reference_data_storage = data.reference_data_storage
        self.target_name = data.target_name
        factory = factories.get(data.storage_engine)
        self.storage_engine = factory.get_database_actions()
        self.db_connection = self.storage_engine.make_connection(**data.login)
        self.additional_tags = data.additional_tags
        self.params = self.storage_engine.create_param_dict(client=self.db_connection, **data.dict())

    def capture_data(self, data: DataWithPrediction):
        """
        Capturing data from inference code
        """

        self.params.update(
            {
                "data_points": data.data_points,
                "prediction": data.predictions,
                "timestamp": data.timestamp,
            }
        )
        self.storage_engine.write_data(**self.params)


NameError: name 'DataWithPrediction' is not defined

In [None]:
params = PulseParameters(
    model_id="id1",
    model_version="ver1",
    data_id="dat1",
    reference_data_storage=reference_data,
    target_name="target_feature_name",
    storage_engine="influxdb",
    timestamp_column_name="_time",
    login={
        "url": "url_influxdb",
        "token": "mytoken",
        "org": "pulsarml",
        "bucket_name": "demo",
    },
    other_labels={"timezone": "EST", "reference_dataset": reference_data},
)

In [None]:
# from pulsar_data_collection.database_connectors.influxdb import InfluxdbActions
# from pulsar_data_collection.config import factories
# from pulsar_data_collection.models import DataWithPrediction, PulseParameters
# from pulsar_data_collection.pulse import Pulse
from datetime import datetime
# Convert prediction_numpy to a DataFrame
prediction_df = pd.DataFrame(prediction_numpy)

# Reset the index of data_points and prediction_df
df_test_drifted = df_test_drifted.reset_index(drop=True)
prediction_df = prediction_df.reset_index(drop=True)

df_test_drifted.columns = df_test_drifted.columns.astype(str)

#  Add the timestamp field to param_dict
param_dict["timestamp"] = datetime.now()

print('param_dictionary:',param_dict)

influxdb.write_data(
    client=param_dict["client"],
    bucket_name=param_dict["bucket_name"],
    data_frame_measurement_name=param_dict["data_frame_measurement_name"],
    data_frame_timestamp_column=param_dict["data_frame_timestamp_column"],
    tags={'model_id': 'mod1', 'model_version': 'ver1', 'data_id': 'data1'},
    prediction=prediction_df,
    data_points=df_test_drifted,
    timestamp=param_dict["timestamp"]
)