In [39]:
%%configure -f

{
    "name": "mmlspark",
    "conf": {
        "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.4,com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.9.1,org.apache.hadoop:hadoop-azure:3.3.1",
        "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,org.json4s:json4s-ast_2.12",
        "spark.yarn.user.classpath.first": "true"
    }
}


In [None]:
from datetime import datetime, timedelta

from azure.identity import AzureCliCredential
from azure.mgmt.storage import StorageManagementClient
from azure_gamedev.ai.playfab import churn_classification as pfc
from azure_gamedev.ai.playfab.churn_classification import PFChurnStatistics as tsMetrics
from azureml.core import Workspace
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from synapse.ml.lightgbm import LightGBMClassifier as Lgbm


In [None]:
# Notebook-wide settings, defined once here so later cells can read them.
# NOTE(review): these identifiers are hard-coded; consider loading them from
# environment variables or a config file before sharing the notebook.

# Subscription / resource group / storage account names
# (presumably the Azure resources this notebook targets — TODO confirm).
subscription_id = "8c40f547-9775-44d6-bb8b-19f20d050dd2"
resource_group = "AGCIAI"
storage_account = "agcidevstorage"

# Workspace and experiment names
# (presumably for the Azure ML `Workspace` imported above — TODO confirm).
workspace_name = "churnaml"
workspace_region = "East US"
experiment_name = "churn_prediction_training"

# Read by `_load_config`, which stores it under config["storage"]["kusto_database"].
kusto_database = "b236349bb6f8437aa1cd9fdf37ca8f72"


In [None]:

def _load_config():
    """Assemble the static configuration dictionary for the churn run.

    Takes no arguments. Reads the module-level ``kusto_database`` name and
    combines it with values fixed inside this function: a 2022-03-19
    snapshot date, a 28-day date range, a 4-week feature window and a
    7-day churn window.

    Returns:
        dict: the sections "storage", "label", "window", "title", "filter"
        and "training", plus the string entries "path" and "time".
    """
    days_per_week = 7
    num_of_weeks = 4
    num_days = 28
    churn_window = 7  # in days
    snapshot_date = datetime(2022, 3, 19)

    only_feature_window = num_of_weeks * days_per_week

    # Snapshot day first, followed by each of the preceding days.
    date_range = []
    for days_back in range(num_days):
        date_range.append(snapshot_date - timedelta(days=days_back))

    storage_section = {
        "linked_service": "PlayFabUserEventData",
        "kusto_database": kusto_database,
    }

    label_section = {
        "feature_and_label_window": only_feature_window + churn_window,
        "only_feature_window": only_feature_window,
        "only_label_window1": 2 * churn_window,
        "label_generation_method": 1,
        "only_label_window2": churn_window,
    }

    window_section = {
        "snapshot_date": snapshot_date.date(),
        "churn_window": churn_window,
        "date_range": date_range,
    }

    # Comma-separated title ids; callers split this string on ",".
    title_section = {
        "titles": "E1E6,1FCF,8B9B7,13D0C,B343,9FC07,6C1,6A49D,CCFB3,6BF5,B8646,33F0,F7659,38F3,9BF8C,DC4BB,CA87,B009,5888,9C1B0,5CC39,78597,C284C,9F83A,9EC4,88E9,B689,D777E,85B34,39C72,3703,C845,7A051,3D16B,117F3,E32E,20C23,A55E1,5449,AE2C,BE5B1,24D7,726B,3751,EB20F,A9F,358A,CB6AF,DEE32,B3BA1,3D70F,C7B27,35B2,E55FF,5EA1,72E0,2993,B1CBB,16B8C,9092,FBBE5,76F,3169F,B4132,7B5B8,FE9A9,10C7,E0D42,4EED8,49935,C2AE5,299E,33E71,64A9,DDAB6,D66A,263C7,3646,8728,E991C,AE0F,2BC3,8C3,F0B70,CF6C,63FDD,7D37,10282,82E3,7EAA,674EF,C4BB0,D02F3,4A02,98A86,8D5C2,6CF2,F2ECB,224C8,6E10,B0D4D,37A9,E5ED,6B7A,7D89,FAF7,A5F3,1291,B85B,6EEA,82E21,F7679,CB681,97675,7500B,9915E",
    }

    filter_section = {"with_nonactive_users": True, "with_new_users": True}
    training_section = {"train_split": 0.85, "label": "label"}

    return {
        "storage": storage_section,
        "label": label_section,
        "window": window_section,
        "title": title_section,
        "filter": filter_section,
        "path": "/model/lightgbm.model",
        "time": "Timestamp",
        "training": training_section,
    }


# Build the configuration once, then wrap every comma-separated title id in
# its own single-element list.
config_json = _load_config()
titles_list = [[title_id] for title_id in config_json["title"]["titles"].split(",")]


In [None]:
# Shared building blocks reused by the validation cells further down.

# One-column ("TitleId") DataFrame with one row per entry of `titles_list`, cached.
gametitles = spark.createDataFrame(titles_list, ["TitleId"]).cache()

# Project-provided churn stages, both configured from the same config dict.
featurizer = pfc.PFChurnFeaturizer(config_json=config_json)
classifier = pfc.PFChurnClassifier(config_json=config_json)

# Two-stage pipeline: the featurizer, then whatever `get_classifier()` returns
# (presumably the underlying LightGBM estimator — TODO confirm).
churn_pipe = Pipeline(stages=[featurizer, classifier.get_classifier()])
# Grid with two parameter maps: isUnbalance=True and isUnbalance=False.
churn_grid = ParamGridBuilder().addGrid(Lgbm.isUnbalance, [True, False]).build()
# NOTE(review): the evaluator scores from the "prediction" column rather than a
# raw-score/probability column — confirm this is intended.
churn_eval = BinaryClassificationEvaluator(rawPredictionCol="prediction")


# Train V2

In [None]:
# MLflow autologging is currently disabled; the lines are kept for reference.
# import mlflow

# mlflow.spark.autolog()
# mlflow.pyspark.ml.autolog()

# with mlflow.start_run() as run:
# NOTE(review): `game_titles` / `featurizers` are built exactly like the
# `gametitles` / `featurizer` objects created in the earlier setup cell.
game_titles = spark.createDataFrame(titles_list, ["TitleId"]).cache()
featurizers = pfc.PFChurnFeaturizer(config_json=config_json)
features_df = featurizers.transform(game_titles)

# Seeded 85/15 random split.
# NOTE(review): the 0.85 weight is hard-coded here and also appears as
# config_json["training"]["train_split"] — confirm whether they should be tied.
train, test = features_df.randomSplit([.85, .15], seed=1)

# Fit on the training split, then transform the held-out split and pass the
# result through the project's PFChurnStatistics transformer.
churn_model = pfc.PFChurnClassifier(config_json).fit(train)  # model is not persisted here: .save(config_json["path"]) is disabled
predictions = churn_model.transform(test)
run_metrics = tsMetrics().transform(predictions)


In [None]:
# Transform the held-out split with the fitted churn model.
predictions = churn_model.transform(test)

# Persist the result as Parquet. The URI is an f-string so the account name is
# substituted from `storage_account` (set in the settings cell); previously the
# placeholder was sent literally and the DataFrame name was misspelled.
# NOTE(review): this targets the root of the "dev" container and uses the
# writer's default save mode — confirm the destination path and mode.
predictions.write.parquet(f"abfss://dev@{storage_account}.dfs.core.windows.net/")

# Train V2 and Validate

In [None]:
# Tune the churn pipeline over `churn_grid` with a single train/validation split.
gaming_tvs = TrainValidationSplit(
    estimator=churn_pipe,
    estimatorParamMaps=churn_grid,
    evaluator=churn_eval,
    parallelism=16,
    seed=42,
)

# Keep the fitted model in `score_model` and save it as a separate step:
# `save()` returns None, so chaining it onto `fit()` would discard the model
# and break the `transform` / `validationMetrics` calls below.
score_model = gaming_tvs.fit(gametitles)
score_model.save(config_json["path"])
predictions = score_model.transform(gametitles).cache()

# One validation metric per parameter map in `churn_grid`.
print(f"Model Metrics: {score_model.validationMetrics}")
predictions.select("prediction").summary().show()

# Train V2 and Cross Validate

In [None]:
from pyspark.ml.tuning import CrossValidator

# Tune the same churn pipeline and grid with cross-validation.
gaming_cvs = CrossValidator(
    estimator=churn_pipe,
    estimatorParamMaps=churn_grid,
    evaluator=churn_eval,
    parallelism=16,
    seed=42,
)

cross_model = gaming_cvs.fit(gametitles)
cross_preds = cross_model.transform(gametitles).cache()

# CrossValidatorModel exposes its per-parameter-map metrics as `avgMetrics`
# (averaged over folds); `validationMetrics` exists only on
# TrainValidationSplitModel and would raise AttributeError here.
print(f"Model Metrics: {cross_model.avgMetrics}")
cross_preds.select("prediction").summary().show()