In [0]:
#%sh rm -rf /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/churn_predictor*

In [0]:
#%sh ls /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/

In [0]:
#%pip install /Volumes/mlops_dev/mtrofimo/churn_predictor/churn_predictor-0.0.4-py3-none-any.whl

In [0]:
#dbutils.library.restartPython()

In [0]:
#%sh cat /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/churn_predictor/data_processor.py

In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Load Configuration and Train/Test Data

# COMMAND ----------
#%pip install /Volumes/mlops_dev/mtrofimo/churn_predictor/churn_predictor-0.0.4-py3-none-any.whl

import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.functions import current_timestamp, to_utc_timestamp
import numpy as np
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import time
import os
import datetime
import requests
import itertools

from churn_predictor.config import ProjectConfig

spark = SparkSession.builder.getOrCreate()

# Load configuration.
# Inside a Databricks runtime (DATABRICKS_RUNTIME_VERSION is set) the config is read
# from the parent directory; otherwise it is resolved against the current working dir.
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
    config_path = "../project_config.yml"
else:
    config_path = os.path.abspath("project_config.yml")
config = ProjectConfig.from_yaml(config_path=config_path, env="dev")

# Only the training set is needed to generate the synthetic drift data below.
# test_set is loaded further down (with CustomerId cast to string), so it is not
# pulled to the driver here only to be discarded.
train_set = spark.table(f"{config.catalog_name}.{config.schema_name}.train_set").toPandas()

print(train_set.columns)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Generate Synthetic Data

# COMMAND ----------
from churn_predictor.data_processor import generate_synthetic_data_with_drift

# NOTE(review): (True, 200) are passed positionally to generate_synthetic_data_with_drift;
# presumably they toggle drift and set the number of rows. Confirm against
# churn_predictor.data_processor and prefer keyword arguments once the names are known.
inference_data_skewed = generate_synthetic_data_with_drift(train_set, True, 200)

# The result is fed to spark.createDataFrame below, which does not accept a Spark
# DataFrame, so this is expected to be a pandas DataFrame. pandas has no `.display()`
# method, so use the notebook-level display() function (Databricks / IPython).
display(inference_data_skewed)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Create the inference_data_skewed Table and Reload Test/Inference Data

# COMMAND ----------

# Stamp every synthetic row with the write time, then save it as the
# inference_data_skewed table, replacing any previous contents ("overwrite").
inference_data_skewed_spark = (
    spark.createDataFrame(inference_data_skewed)
    .withColumn("update_timestamp_utc", to_utc_timestamp(current_timestamp(), "UTC"))
)
table_prefix = f"{config.catalog_name}.{config.schema_name}"
inference_data_skewed_spark.write.mode("overwrite").saveAsTable(f"{table_prefix}.inference_data_skewed")


def _read_with_string_customer_id(table_name):
    """Read `table_name` from the project schema into pandas, with CustomerId cast to string."""
    return (
        spark.table(f"{table_prefix}.{table_name}")
        .withColumn("CustomerId", col("CustomerId").cast("string"))
        .toPandas()
    )


# Re-read both datasets so CustomerId has the same (string) type in each.
test_set = _read_with_string_customer_id("test_set")
inference_data_skewed = _read_with_string_customer_id("inference_data_skewed")

print(test_set.head())
print(inference_data_skewed.head())

# COMMAND ----------

# Credentials for send_request_https: the notebook context's API token and the
# workspace host name taken from the Spark conf.
_notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
token = _notebook_context.apiToken().get()
host = spark.conf.get("spark.databricks.workspaceUrl")

# COMMAND ----------

# Required columns for inference
required_columns = [
    "Geography",
    "Gender",
    "NumOfProducts",
    "CreditScore",
    "Age",
    "Balance",
    "IsActiveMember",
]

# Convert every row of each dataset (no sampling is applied) into a list of
# {column: value} dicts restricted to required_columns; each dict becomes one request.
sampled_skewed_records, test_set_records = (
    frame[required_columns].to_dict(orient="records")
    for frame in (inference_data_skewed, test_set)
)

def send_request_https(
    dataframe_record,
    timeout=60,
    endpoint_name="churn_predictor-model-serving",
):
    """Send one record to the model-serving endpoint and return the HTTP response.

    Args:
        dataframe_record: A single row as a {column: value} dict. It is sent as a
            one-element ``dataframe_records`` list in the JSON body.
        timeout: Seconds to wait for the server, forwarded to ``requests.post``.
            Without a timeout a stalled connection blocks the caller indefinitely
            (and with it the request loops). Pass ``None`` to wait forever.
        endpoint_name: Serving endpoint name used in the
            ``/serving-endpoints/<name>/invocations`` URL.

    Returns:
        The ``requests.Response``. The status code is not checked here.

    Raises:
        requests.exceptions.Timeout: if the server does not respond within ``timeout``.

    Uses the notebook-level ``host`` and ``token`` globals for the workspace URL
    and bearer token.
    """
    model_serving_endpoint = f"https://{host}/serving-endpoints/{endpoint_name}/invocations"
    response = requests.post(
        model_serving_endpoint,
        headers={"Authorization": f"Bearer {token}"},
        json={"dataframe_records": [dataframe_record]},
        timeout=timeout,
    )
    return response

# COMMAND ----------

# Replay records against the serving endpoint to generate inference traffic.
# Disabled by default: a full run takes about 50 minutes (20 for test data plus 30
# for skewed data). Set RUN_LOAD_TEST = True to actually send requests.
RUN_LOAD_TEST = False


def send_records_for(records, label, minutes, pause_seconds=0.2, max_requests=None, send=None):
    """Cycle through `records`, sending one request each, until `minutes` have elapsed.

    Args:
        records: List of {column: value} dicts. It is cycled repeatedly; an empty
            list sends nothing.
        label: Text used in the progress messages, e.g. "test data".
        minutes: Wall-time budget. The deadline is checked before every request.
        pause_seconds: Sleep between requests.
        max_requests: Optional hard cap on the number of requests (None = no cap).
        send: Callable taking one record and returning an object with `status_code`
            and `text`. Defaults to send_request_https (resolved at call time).

    Returns:
        The number of requests sent.
    """
    if send is None:
        send = send_request_https
    # Monotonic clock, so system clock adjustments cannot stretch or cut the run.
    deadline = time.monotonic() + minutes * 60
    sent_count = 0
    for index, record in enumerate(itertools.cycle(records)):
        if time.monotonic() >= deadline:
            break
        if max_requests is not None and index >= max_requests:
            break
        print(f"Sending request for {label}, index {index}")
        response = send(record)
        print(f"Response status: {response.status_code}")
        print(f"Response text: {response.text}")
        sent_count += 1
        time.sleep(pause_seconds)
    return sent_count


# Loop over test records and send requests for 20 minutes
if RUN_LOAD_TEST:
    send_records_for(test_set_records, "test data", minutes=20)


# COMMAND ----------

# Loop over skewed records and send requests for 30 minutes
if RUN_LOAD_TEST:
    send_records_for(sampled_skewed_records, "skewed data", minutes=30)


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Refresh Monitoring

# COMMAND ----------

from pyspark.sql.functions import col
from databricks.connect import DatabricksSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from databricks.sdk import WorkspaceClient
from churn_predictor.monitoring import create_or_refresh_monitoring

# Build a Databricks SDK workspace client, then pass it, together with the Spark
# session and project config, to create_or_refresh_monitoring.
workspace = WorkspaceClient()
create_or_refresh_monitoring(
    spark=spark,
    config=config,
    workspace=workspace,
)