In [0]:
import datetime as dt

import mlflow
import pandas as pd
import pyspark.sql.functions as F
from delta.tables import DeltaTable
from pyspark.sql.utils import AnalysisException

import data_model as dm

In [0]:
# Columns that are one-hot encoded (via pd.get_dummies) before scoring.
categorical = ["brand", "creditScoreBucket", "creditFileBucket", "employmentStatus"]

In [0]:
# Work out which month to score: the month that ended most recently.
today = dt.datetime.today()
first_of_month = today.replace(day=1)

# Stepping back one day from the 1st lands on the last day of the previous month.
last_day_last_month = first_of_month - dt.timedelta(days=1)
# ISO date string, i.e. the same text '%Y-%m-%d' would produce.
last_day_last_month_formatted = last_day_last_month.date().isoformat()

# Build the data model and keep only the rows for the month being scored,
# then pull them onto the driver as a pandas DataFrame.
data_model = dm.main(spark)
inference_dm = data_model.where(
    F.col("lastDayOfMonth") == last_day_last_month_formatted
)
inference_pd = inference_dm.toPandas()

In [0]:
# Display the pandas inference frame so that the rows about to be scored are
# visible in the cell output when debugging a failed job run.
inference_pd

In [0]:
# Load the pinned model version from the MLflow model registry.
model_name = "Collections-V2"
model_version = "3"
model = mlflow.sklearn.load_model(model_uri=f"models:/{model_name}/{model_version}")

_inference_dm = inference_pd

# Columns that are carried along but must never be fed to the model.
label_columns = ["isSelfCureMonth", "isSelfCureNextMonth"]
key_columns = ["creditAccountId", "lastDayOfMonth"]

# scikit-learn records the fit-time column names when the estimator was fitted
# on a DataFrame; the attribute is absent otherwise.
expected_features = getattr(model, "feature_names_in_", None)

if expected_features is None:
    # No recorded feature names to align against: keep the original encoding.
    inference_dm = pd.get_dummies(
        _inference_dm, columns=categorical, drop_first=True, dummy_na=True
    ).drop(label_columns, axis=1)
    X_inference = inference_dm.drop(key_columns, axis=1)
else:
    expected_features = [str(name) for name in expected_features]

    # One month of data may not contain every category seen at fit time, so
    # drop_first=True could drop a *different* level than it did when the model
    # was fitted. Encode every level and let the reindex below select exactly
    # the fit-time columns instead.
    inference_dm = pd.get_dummies(
        _inference_dm, columns=categorical, drop_first=False, dummy_na=True
    ).drop(label_columns, axis=1)

    # A missing dummy column just means "category not present this month" and
    # is safely all-zero. A missing non-dummy feature is a schema problem and
    # must not be papered over.
    dummy_prefixes = tuple(f"{column}_" for column in categorical)
    missing_features = [
        name
        for name in expected_features
        if name not in inference_dm.columns and not name.startswith(dummy_prefixes)
    ]
    if missing_features:
        raise ValueError(
            f"Inference data is missing model features: {missing_features}"
        )

    # Same columns, same order as at fit time; absent dummies become 0.
    X_inference = inference_dm.reindex(columns=expected_features, fill_value=0)

# Probability of the positive class (second column of predict_proba).
predicted = model.predict_proba(X_inference)[:, 1]

# Attach the score to a copy so the encoded frame itself is left untouched.
population = inference_dm
final = population.copy(True)
final["score"] = predicted

In [0]:
# Convert both pandas frames to Spark and attach the score to the original
# (un-encoded) inference rows by account and month.
# NOTE(review): the following cell rebuilds these same three DataFrames, so one
# of the two cells appears to be redundant.
_inference_dm_df = spark.createDataFrame(_inference_dm)
final_df = spark.createDataFrame(final)
out = (
    _inference_dm_df.alias("isSelfCureMonth")
    .join(
        final_df.alias("final"),
        ["creditAccountId", "lastDayOfMonth"],
        "left",
    )
    .select("isSelfCureMonth.*", "final.score")
)

In [0]:
# Join the scored data back to the original frame to recover the columns that
# were not used as model features (creditAccountId, lastDayOfMonth,
# isSelfCureMonth, isSelfCureNextMonth).
_inference_dm_df = spark.createDataFrame(_inference_dm)
final_df = spark.createDataFrame(final)
out = (
    _inference_dm_df.alias("isSelfCureMonth")
    .join(final_df.alias("final"), ["creditAccountId", "lastDayOfMonth"], "left")
    .select("isSelfCureMonth.*", "final.score")
)

scored_table_name = "neo_data_science_production.test_table_not_exist"

# Only the table lookup is allowed to fail, and only with AnalysisException
# (raised when the table cannot be resolved). Any other error - in the join or
# in the write itself - must surface as-is instead of being mistaken for
# "table does not exist" and retried as a create.
try:
  already_Scored = spark.table(scored_table_name)
except AnalysisException:
  already_Scored = None

if already_Scored is None:
  # First run: the table does not exist yet, so create it.
  out.write.format("delta").option("overwriteSchema", "True").saveAsTable(scored_table_name)
else:
  # Drop rows that were already scored so that re-running in the middle of the
  # month does not append duplicates.
  out = out.join(already_Scored.alias("already_scored"), ["creditAccountId", "lastDayOfMonth"], "left_anti")
  # NOTE(review): overwriteSchema is documented for overwrite mode; it is
  # presumably ignored on append - confirm whether mergeSchema was intended.
  out.write.mode("append").format("delta").option("overwriteSchema", "True").saveAsTable(scored_table_name)

In [0]:
# Retroactively refresh isSelfCureMonth and isSelfCureNextMonth on rows that
# are already in the scored table.

# The source gets a null `score` column, presumably so that its columns line up
# with the target table for whenNotMatchedInsertAll - TODO confirm.
data_model_dummy_score = data_model.withColumn("score", F.lit(None))
already_scored_delta = DeltaTable.forName(spark, "neo_data_science_production.test_table_not_exist")

(
    already_scored_delta.alias("target")
    .merge(
        data_model_dummy_score.alias("source"),
        # Match on the row identity only. Putting the "flags changed" test in
        # the merge condition would make every unchanged row count as
        # "not matched", and the insert clause below would then re-insert it
        # as a duplicate with a null score.
        condition=(
            "source.lastDayOfMonth = target.lastDayOfMonth and "
            "source.creditAccountId = target.creditAccountId"
        ),
    )
    .whenMatchedUpdate(
        # Null-safe comparison (<=>): a plain != evaluates to NULL when either
        # side is NULL, so a flag going from NULL to a real value would never
        # be picked up.
        condition=(
            "not (source.isSelfCureMonth <=> target.isSelfCureMonth) or "
            "not (source.isSelfCureNextMonth <=> target.isSelfCureNextMonth)"
        ),
        # Only the two flags are rewritten; score and all other target columns
        # are left as they are.
        set={
            "target.isSelfCureMonth": "source.isSelfCureMonth",
            "target.isSelfCureNextMonth": "source.isSelfCureNextMonth",
        },
    )
    # NOTE(review): this inserts every data-model row whose key is not yet in
    # the table, with a null score. Kept from the original; confirm that
    # unscored rows are really wanted in the scored table.
    .whenNotMatchedInsertAll()
    .execute()
)