In [1]:
# Declare the notebook's input widgets. Each parameter is a free-text widget
# whose default value is the empty string.
for widget_name in ("STORAGE_ACCOUNT", "CONTAINER", "ML_PATH", "ACCOUNT_KEY"):
  dbutils.widgets.text(widget_name, "")

In [2]:
# Set up connection to Azure Blob Storage
#
# Reads the four widget values (whitespace-stripped), optionally registers the
# storage account key with Spark, and derives the input/output wasbs:// URLs.
STORAGE_ACCOUNT = dbutils.widgets.get("STORAGE_ACCOUNT").strip()
CONTAINER = dbutils.widgets.get("CONTAINER").strip()
# Location of the saved pipeline model, e.g.
# wasbs://models@azureailabs.blob.core.windows.net/churn_classifier
ML_PATH = dbutils.widgets.get("ML_PATH").strip()
# NOTE(review): the key is passed as a plain-text widget value; consider
# sourcing it from a Databricks secret scope instead.
ACCOUNT_KEY = dbutils.widgets.get("ACCOUNT_KEY").strip()

# Fail fast on missing required parameters. Without this check an empty
# storage account or container yields a malformed URL such as
# "wasbs://@.blob.core.windows.net/", and the error only surfaces later.
missing_params = [
  name
  for name, value in (
    ("STORAGE_ACCOUNT", STORAGE_ACCOUNT),
    ("CONTAINER", CONTAINER),
    ("ML_PATH", ML_PATH),
  )
  if not value
]
if missing_params:
  raise ValueError(
    "Missing required notebook parameter(s): " + ", ".join(missing_params))

blob_host = "{storage_acct}.blob.core.windows.net".format(storage_acct=STORAGE_ACCOUNT)

# The account key is optional: it is only registered when one was supplied.
if ACCOUNT_KEY != "":
  # Set up account access key
  conf_key = "fs.azure.account.key.{host}".format(host=blob_host)
  spark.conf.set(conf_key, ACCOUNT_KEY)

# Input is read from the container root; results go to its "results" directory.
container_root = "wasbs://{container}@{host}/".format(container=CONTAINER, host=blob_host)
source_str = container_root
result_str = container_root + "results"

In [3]:
# Import only the type classes this cell uses rather than everything in
# pyspark.sql.types, so it is clear where each name comes from.
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Load data to score
#
# Explicit schema for the input CSV: every column is read either as a double
# or as a string, in the order listed here.
schema = StructType([
  StructField("age", DoubleType()),
  StructField("annualincome", DoubleType()),
  StructField("calldroprate", DoubleType()),
  StructField("callfailurerate", DoubleType()),
  StructField("callingnum", StringType()),
  StructField("customerid", StringType()),
  StructField("customersuspended",  StringType()),
  StructField("education",  StringType()),
  StructField("gender", StringType()),
  StructField("homeowner", StringType()),
  StructField("maritalstatus", StringType()),
  StructField("monthlybilledamount", DoubleType()),
  StructField("noadditionallines", StringType()),
  StructField("numberofcomplaints", DoubleType()),
  StructField("numberofmonthunpaid", DoubleType()),
  StructField("numdayscontractequipmentplanexpiring", DoubleType()),
  StructField("occupation", StringType()),
  StructField("penaltytoswitch", DoubleType()),
  StructField("state", StringType()),
  StructField("totalminsusedinlastmonth", DoubleType()),
  StructField("unpaidbalance", DoubleType()),
  StructField("usesinternetservice", StringType()),
  StructField("usesvoiceservice", StringType()),
  StructField("percentagecalloutsidenetwork", DoubleType()),
  StructField("totalcallduration", DoubleType()),
  StructField("avgcallduration", DoubleType()),
  StructField("churn", DoubleType()),
  StructField("year", DoubleType()),
  StructField("month", DoubleType())
])

# Read the CSV data at source_str with the header option enabled and the
# schema above applied (no schema inference).
# NOTE(review): source_str is the container root, and the scored output is
# written to a "results" directory under that same root. Confirm that a
# re-run does not pick up previously written results as input.
df = (spark.read
     .option("header", True)
     .schema(schema)
     .csv(source_str))


In [4]:
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col

# Load the churn classification pipeline stored at ML_PATH and run it over
# the input rows.
pipelineModel = PipelineModel.load(ML_PATH)
predictions = pipelineModel.transform(df)

# Keep the two customer identifier columns plus the model output, exposing
# the "prediction" column under the name "churn_prediction".
output_columns = [
  col("customerid"),
  col("callingnum"),
  col("prediction").alias("churn_prediction"),
]
scoredDF = predictions.select(*output_columns)


In [5]:
# Save results as CSV under result_str; the "overwrite" mode replaces any
# output already present at that location.
# The path is passed as a separate print argument, so print supplies the
# single separating space itself.
print("Saving results to:", result_str)
scoredDF.write.mode("overwrite").csv(result_str)

In [6]:
import json

# Return status: end the notebook run, handing back a JSON string that
# carries an "OK" status and the path the results were written to.
exit_payload = {"status": "OK", "output_path": result_str}
dbutils.notebook.exit(json.dumps(exit_payload))