# Machine Learning - Reseller Classification

This Notebook demonstrates how to <span style="color: var(--vscode-foreground);">retrieve data from Azure SQL Database,</span> <span style="color: var(--vscode-foreground);">generate predictions using a newly trained transformer model, and save the results back to the database.</span>

The following aspects are involved:

1. Environment set up
2. Data retrieval from Azure SQL Database for prediction
3. Model retrieval from MLflow and prediction
4. Result saving to Azure SQL Database

In the previous Notebook "Attribute Selection", the results indicate that f\_score, units\_per\_sku, units\_per\_order, total\_taxes, and total\_shipping have less than 1% importance. The transformer model has been retrained with the dataset excluding these attributes, and this Notebook uses the new-trained model to make the inference.

## Instructions:
1. Run the notebook against a cluster with Databricks ML Runtime version 13.0.x-cpu, to best re-create the training environment.
2. Add additional data processing on your loaded table to match the model schema if necessary (see the "Define input and output" section below).
3. "Run All" the notebook.
4. Note: If the `%pip` does not work for your model (i.e. it does not have a `requirements.txt` file logged), modify to use `%conda` if possible.

In [None]:
model_name = "Resellers Detection"

## Environment Recreation
Run the notebook against a cluster with Databricks ML Runtime version 13.0.x-cpu, to best re-create the training environment.. The cell below downloads the model artifacts associated with your model in the remote registry, which include `conda.yaml` and `requirements.txt` files. In this notebook, `pip` is used to reinstall dependencies by default.

### (Optional) Conda Instructions
Models logged with an MLflow client version earlier than 1.18.0 do not have a `requirements.txt` file. If you are using a Databricks ML runtime (versions 7.4-8.x), you can replace the `pip install` command below with the following lines to recreate your environment using `%conda` instead of `%pip`.
```
conda_yml = os.path.join(local_path, "conda.yaml")
%conda env update -f $conda_yml
```

In [None]:
from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository
import os

model_uri = f"models:/{model_name}/Staging"
local_path = ModelsArtifactRepository(model_uri).download_artifacts("") # download model from remote registry

requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
  dbutils.fs.put("file:" + requirements_path, "", True)

In [None]:
# Remove tensorflow-cpu==2.11.1 from requirements.txt to advoid package versions conflicting dependencies.
with open(requirements_path, "r") as f:
    lines = f.readlines()
with open(requirements_path, "w") as f:
    for line in lines:
        if line.strip("\n") != "tensorflow-cpu==2.11.1": 
            f.write(line)

In [None]:
%pip install -r $requirements_path

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting mlflow<3,>=2.3
  Using cached mlflow-2.6.0-py3-none-any.whl (18.3 MB)
Collecting docker<7,>=4.0.0
  Downloading docker-6.1.3-py3-none-any.whl (148 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 148.1/148.1 kB 2.5 MB/s eta 0:00:00
Collecting querystring-parser<2
  Downloading querystring_parser-1.2.4-py2.py3-none-any.whl (7.9 kB)
Collecting alembic!=1.10.0,<2
  Downloading alembic-1.11.3-py3-none-any.whl (225 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 225.4/225.4 kB 15.8 MB/s eta 0:00:00
Installing collected packages: querystring-parser, docker, alembic, mlflow
Successfully installed alembic-1.11.3 docker-6.1.3 mlflow-2.6.0 querystring-parser-1.2.4
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


## Define input and output

In [None]:
input_table_name = "customer"
output_table_name = "ml_resellers"

In [None]:
import os

# Define Azure SQL Database connection
jdbcHostname = os.getenv("SQLDB_HOST")
user = os.getenv("SQLDB_USER")
password = dbutils.secrets.get(scope="azure_key_vault", key='SQLDB-PW') # use Azure Key Vault to save this password. 
jdbcDatabase = os.getenv("SQLDB_DB")
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
"user" : user,
"password" : password,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [None]:
# Load table as a Spark DataFrame
table = (spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", input_table_name)
  .option("user", user)
  .option("password", password)
  .load()
)

# Take US only, the past 90 days, and non-employee data
customers = table.where('sales_channel_id = 1 AND CAST(last_transaction_date AS DATE) >= date_add(from_utc_timestamp(CURRENT_DATE(), "America/Los_Angeles"), -90) and email not like "%@quay.com" and email not like "%@quayaustralia.com" ')

## Preprocess the dataset

In [None]:
input_columns = ['sales_channel_id'
      ,'external_customer_id'
      ,'customer_type'
      ,'email'
      ,'first_name'
      ,'last_name'
      ,'last_shipping_address_address1'
      ,'last_shipping_address_address2'
      ,'last_shipping_address_city'
      ,'last_shipping_address_country'
      ,'last_shipping_address_phone'
      ,'last_shipping_address_province'
      ,'last_shipping_address_zip'
      ,'last_shipping_address_country_code'
      ,'last_shipping_address_province_code'
      ,'first_transaction_date'
      ,'first_transaction_id'
      ,'last_transaction_date'
      ,'last_transaction_id'
      ,'last_transaction_ip'
      ,'total_orders'
      ,'total_units'
      ,'total_gross'
      ,'total_discounts'
      ,'total_returns'
      ,'total_shipping'
      ,'total_taxes'
      ,'r_score'
      ,'f_score'
      ,'m_score'
      ,'rfm_score'
      ,'is_reseller'
      ,'avg_sku_count'
      ]

In [None]:
import pyspark.sql.functions as F

# Create custom features
customers = (
    customers.select(
        [customers[col] for col in input_columns]
        + [
            F.substring_index(
                F.substring_index(F.lower(customers["email"]), "@", -1), ".", 1
            ).alias("email_domain")
        ]
        + [
            F.concat(
                F.coalesce(F.lower(customers["last_shipping_address_address1"]), F.lit("")),
                F.lit(" "),
                F.coalesce(F.lower(customers["last_shipping_address_address2"]), F.lit("")),
                F.lit(" "),
                F.coalesce(F.lower(customers["last_shipping_address_city"]), F.lit("")),
                F.lit(" "),
                F.coalesce(F.lower(customers["last_shipping_address_country_code"]), F.lit("")),
                F.lit(" "),
                F.coalesce(F.lower(customers["last_shipping_address_zip"]), F.lit("")),
            ).alias("address")
        ]
    )
    # .withColumn("units_per_order", F.col("total_units") * 1.0 / F.col("total_orders"))
    .withColumn("gross_per_order", F.col("total_gross") * 1.0 / F.col("total_orders"))
    # .withColumn("units_per_sku", F.col('units_per_order') * 1.0 / F.col('avg_sku_count'))
)

In [None]:
# Fill up NULLs
customers = customers.fillna(0.0, subset=[#'units_per_order', 
                                          'gross_per_order', 
                                          #'units_per_sku'
                                          ])

# Shuffle the orders
customers = customers.orderBy(F.rand()) 

# Remove duplicates
customers = customers.dropDuplicates(subset=['sales_channel_id', 'external_customer_id']) 

In [None]:
IDENTIFIERS = ['external_customer_id', 'email']
CONTINUOUS_COLUMNS = [
  'total_orders',
  'total_units',
  'total_gross',
  'total_discounts',
  'total_returns',
#   'total_shipping',
#   'total_taxes',
  'r_score',
#   'f_score',
  'rfm_score',
  'avg_sku_count',
#   'units_per_order', 
  'gross_per_order', 
#   'units_per_sku'
]
CATEGORICAL_COLUMNS = ['email_domain', 'address']
TARGET_COLUMN = 'is_reseller'

In [None]:
import numpy as np

customers_df = customers.toPandas()

con = customers_df[CONTINUOUS_COLUMNS].astype(np.float32)
cat_email = customers_df[CATEGORICAL_COLUMNS].astype(str).values[:, 0]
cat_address = customers_df[CATEGORICAL_COLUMNS].astype(str).values[:, 1]



## Load model and run inference

In [None]:
import mlflow

# Take the production model from "Reseller Detection"
model_uri = f"models:/{model_name}/Staging"

# Load model as a Tensorflow Model.
loaded_model = mlflow.tensorflow.load_model(model_uri)

# Predictions
results = loaded_model.predict((cat_email, cat_address, con))





The model prioritizes precision over recall because misidentifying faithful customers as resellers damages the business more. The threshold is set by 98% of the precision from the validation dataset to achieve 95% precision, 90% recall, and 92% F1 score based on the testing dataset that has never been exposed to the model before.

In [None]:
# Define the threshold
threshold = 0.95796525

In [None]:
# Define the output columns
table_columns = ['sales_channel_id'
      ,'external_customer_id'
      ,'customer_type'
      ,'email'
      ,'first_name'
      ,'last_name'
      ,'last_shipping_address_address1'
      ,'last_shipping_address_address2'
      ,'last_shipping_address_city'
      ,'last_shipping_address_country'
      ,'last_shipping_address_phone'
      ,'last_shipping_address_province'
      ,'last_shipping_address_zip'
      ,'last_shipping_address_country_code'
      ,'last_shipping_address_province_code'
      ,'first_transaction_date'
      ,'first_transaction_id'
      ,'last_transaction_date'
      ,'last_transaction_id'
      ,'total_orders'
      ,'total_units'
      ,'total_gross'
      ,'total_discounts'
      ,'total_returns'
    #   ,'total_shipping'
    #   ,'total_taxes'
      ,'r_score'
    #   ,'f_score'
      ,'m_score'
      ,'rfm_score'
    #   ,'units_per_order'
      ,'gross_per_order'
      ,'avg_sku_count'
    #   ,'units_per_sku'
      ,'is_reseller'
      ]

In [None]:
# Get the final result with threshold
results_df = customers_df[results >= threshold][table_columns]

# Get the current Pacific Time
current_timestamp_pt = F.from_utc_timestamp(F.current_timestamp(), "America/Los_Angeles")

# Output the predictions of reseller with the timestamp
prediction_df = spark.createDataFrame(results_df).withColumn('RecordCreatedDate', current_timestamp_pt)

## Save inference

In [None]:
# Save the predictions into Azure SQL Database
prediction_df.write.jdbc(url=jdbcUrl, table=output_table_name, mode = "overwrite", properties=connectionProperties)