In [1]:
# Initialize the SQL context that later cells use to read the CSV and to build DataFrames.
from pyspark.sql import SQLContext

# `sc` is not defined anywhere in this notebook; presumably it is the SparkContext
# that the notebook runtime pre-creates — TODO confirm.
sqlContext = SQLContext(sc)

In [2]:
# Read the POC dataset from the data lake into a DataFrame: the first CSV line is
# treated as the header and Spark infers each column's type.
df = sqlContext.read.csv(
    'abfss://synapsedatalake@synapseaiadadls.dfs.core.windows.net/pocdata.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Drop the columns that the feature lists further down do not use, then print the
# remaining schema.
# The last name must be exactly 'Account_Open_Date': DataFrame.drop() silently ignores
# names that are not in the schema, so a misspelled name leaves the column in place.
df = df.drop('CUSTID', 'Latest_TxnDate', 'State', 'LGA', 'Account_Open_Date')
df.printSchema()

In [None]:
# Create the `demo` database if it does not exist yet.
spark.sql("CREATE DATABASE IF NOT EXISTS demo")
# Save the DataFrame as table demo.Churnmldata; "overwrite" mode replaces existing contents.
df.write.mode("overwrite").saveAsTable("demo.Churnmldata")

In [4]:
# Echo the DataFrame object as this cell's output.
df

In [5]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Helper that casts a set of DataFrame columns to one target type.
def convertColumn(df, names, newType):
    """Return `df` with every column listed in `names` cast to `newType`.

    Args:
        df: DataFrame-like object supporting `df[name]` and `withColumn`.
        names: Iterable of column names to cast.
        newType: Target type handed to each column's `cast`.

    Returns:
        The resulting DataFrame; `df` itself when `names` is empty.
    """
    converted = df
    for column_name in names:
        casted = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, casted)
    return converted
# Continuous feature columns (cast to a numeric type and fed to the assembler as-is).
CONTI_FEATURES = [
    "No_of_Accounts", "GDP_in_Billions_of_USSD", "Inflation", "Population",
    "txn_amount_M1", "txn_vol_M1",
    "txn_amount_M2", "txn_vol_M2",
    "txn_amount_M3", "txn_vol_M3",
    "F1",
]
# Cast every continuous feature column to FloatType, replacing the type it had before.
df = convertColumn(df, CONTI_FEATURES, FloatType())
# Print the schema so the new column types can be checked.
df.printSchema()

In [6]:
# Preview the first 5 rows without truncating long cell values.
df.show(n=5, truncate=False)

In [7]:
#import libraries for pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [8]:
# 1. Encode the categorical data
# Categorical feature columns; each one is string-indexed and one-hot encoded below.
CATE_FEATURES = [
    "Gender", "Age_Band", "Tenure", "Education",
    "Marital_Status", "Segment", "Occupation",
    "F2", "Recency",
    "Freq_M1", "Freq_M2",
    "F3",
]
# Pipeline stages. Every categorical column contributes two stages: a StringIndexer
# writing "<column>Index", and a OneHotEncoder turning that index into "<column>classVec".
stages = []

for cat_col in CATE_FEATURES:
    indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}Index")
    one_hot = OneHotEncoder(
        inputCols=[indexer.getOutputCol()],
        outputCols=[f"{cat_col}classVec"],
    )
    stages.extend([indexer, one_hot])

In [9]:
# 2. Index the label column
# A StringIndexer reads "CHURN" and writes the label indices to "newCHURN".
CHURN_stringIdx = StringIndexer(outputCol="newCHURN", inputCol="CHURN")
stages.append(CHURN_stringIdx)

In [10]:
# 3. Assembler inputs: the one-hot vector column of every categorical feature,
# followed by the continuous feature columns.
assemblerInputs = [*(name + "classVec" for name in CATE_FEATURES), *CONTI_FEATURES]

In [11]:
# 4. Assemble all input columns into the single "features" vector column
# and register the assembler as the next pipeline stage.
assembler = VectorAssembler(outputCol="features", inputCols=assemblerInputs)
stages.append(assembler)

In [12]:
# Echo the list of pipeline stages collected so far.
stages

In [13]:
# Create a Pipeline from the collected stages, fit it on `df`, and apply the fitted
# pipeline to the same data.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
# NOTE: despite its name, `model` holds the output of transform() (the transformed
# data), not a fitted model.
model = pipelineModel.transform(df)

In [14]:
# Echo the transformed data produced by pipelineModel.transform(df).
model

In [15]:
# Build an RDD of (label, features) pairs from the transformed data: each row's
# "newCHURN" value together with its "features" vector converted to a DenseVector.
# NOTE(review): it is not evident from this notebook that going through an RDD and a
# Python lambda makes anything faster — confirm this round-trip is needed.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newCHURN"], DenseVector(x["features"])))

In [16]:
# Keep only the label and the assembled feature vector for training.
# Selecting the two columns straight from the transformed DataFrame keeps their existing
# types and avoids pushing every row through Python just to rebuild the same columns.
df_train = model.select("newCHURN", "features")

In [17]:
# Echo the (pre-pipeline) DataFrame object as this cell's output.
df

In [18]:
# Show the first 2 rows of df_train.
df_train.show(2)

In [19]:
# Randomly split the rows into a training set (weight 0.8) and a test set (weight 0.2);
# the fixed seed makes the split repeatable.
train_data, test_data = df_train.randomSplit(weights=[0.8, 0.2], seed=1234)

In [20]:
# Count the rows of each newCHURN class in the training set.
train_data.groupby('newCHURN').agg({'newCHURN': 'count'}).show()

In [21]:
# Count the rows of each newCHURN class in the test set.
test_data.groupby('newCHURN').agg({'newCHURN': 'count'}).show()

In [22]:
#You initialize lr by indicating the label column and feature columns. 
# Import `LogisticRegression`
from pyspark.ml.classification import LogisticRegression

# Configure the logistic regression estimator: label and feature columns,
# iteration cap and regularization strength.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="newCHURN",
    regParam=0.3,
    maxIter=10,
)

# Train on the training split; the fitted model is bound to both names.
linearModel = lr.fit(train_data)
clr = linearModel

In [23]:
# Show the fitted logistic regression parameters: coefficient vector, then intercept.
print(f"Coefficients: {linearModel.coefficients}")
print(f"Intercept: {linearModel.intercept}")

In [24]:
# Generate predictions for the test set by calling transform() on the fitted
# model `linearModel` with `test_data`.
predictions = linearModel.transform(test_data)

In [25]:
# Print the schema of the predictions DataFrame.
predictions.printSchema()

In [26]:
# Put the label next to the predicted class and the class probabilities,
# and display the first 20 rows.
selected = predictions.select(["newCHURN", "prediction", "probability"])
selected.show(n=20)

In [27]:
# Accuracy metric: how well (or badly) a fitted model performs on held-out data.

def accuracy_m(model, data=None):
    """Print the accuracy of `model`'s predictions.

    Args:
        model: Fitted model exposing `transform(data)`; the transformed result must
            contain the "newCHURN" and "prediction" columns.
        data: DataFrame to evaluate on. When omitted, the notebook-level `test_data`
            is used, so existing `accuracy_m(model=...)` calls keep working.

    Prints:
        "Model accuracy: NN.NNN%" — the share of rows whose prediction equals
        the label.

    Raises:
        ZeroDivisionError: if the evaluated data contains no rows.
    """
    # Fall back to the global only when no data was passed, so the helper can also
    # score other splits without depending on hidden notebook state.
    eval_data = test_data if data is None else data
    cm = model.transform(eval_data).select("newCHURN", "prediction")
    total = cm.count()
    correct = cm.filter(cm.newCHURN == cm.prediction).count()
    acc = correct / total
    print("Model accuracy: %.3f%%" % (acc * 100))

# Report the accuracy of the fitted logistic regression model.
accuracy_m(model = linearModel)

In [28]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator that scores the "rawPrediction" column against the "newCHURN" label.
evaluator = BinaryClassificationEvaluator(
    labelCol="newCHURN",
    rawPredictionCol="rawPrediction",
)
# Print the metric value for the test predictions, then the name of that metric.
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

In [29]:
# You can tune the hyperparameters.
# As in scikit-learn, you create a parameter grid and add the parameters you want to tune.
# To reduce the computation time, only the regularization parameter is tuned, with just two values.
#use 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Parameter grid for cross-validation: two candidate values for lr.regParam.
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.5]).build()

In [30]:
# Time the cross-validated search. perf_counter() is a monotonic clock intended for
# measuring elapsed time; it is imported by name rather than through a star import so
# the notebook namespace is not filled with every name of the `time` module.
from time import perf_counter

start_time = perf_counter()

# Create 5-fold CrossValidator. The seed fixes the fold assignment so that re-running
# the notebook evaluates the same folds (same seed value as the train/test split).
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=1234)

# Run cross validations (likely to take a fair amount of time)
cvModel = cv.fit(train_data)

end_time = perf_counter()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [31]:
# Accuracy of the model selected by cross-validation
accuracy_m(model = cvModel)

In [32]:
# Extract the best model chosen by cross-validation and display its parameter map
# (cvModel.bestModel chained with extractParamMap()).

bestModel = cvModel.bestModel
bestModel.extractParamMap()

In [33]:
from onnxmltools.convert.common.data_types import StringTensorType
from onnxmltools.convert.common.data_types import FloatTensorType

In [34]:
# Input signature for the ONNX conversion: one (column name, tensor type) pair per
# column, each declared with shape [1, 1]. A first group of string-typed columns is
# followed by the float-typed columns and then the remaining string-typed columns.
initial_types = (
    [(name, StringTensorType([1, 1])) for name in (
        "CHURN", "Gender", "Age_Band", "Tenure", "Occupation",
        "Education", "Segment", "Marital_Status",
    )]
    + [(name, FloatTensorType([1, 1])) for name in (
        "No_of_Accounts", "GDP_in_Billions_of_USSD", "Inflation", "Population",
        "txn_amount_M1", "txn_vol_M1",
        "txn_amount_M2", "txn_vol_M2",
        "txn_amount_M3", "txn_vol_M3",
        "F1",
    )]
    + [(name, StringTensorType([1, 1])) for name in (
        "F2", "Recency", "Freq_M1", "Freq_M2", "F3",
    )]
)

In [35]:
from pyspark.ml import PipelineModel
from onnxmltools import convert_sparkml
from onnxmltools.utils import save_model

# Export the complete scoring pipeline: the fitted feature stages followed by the
# cross-validated logistic regression model. `pipelineModel` on its own contains only
# the indexers, encoders and the assembler, so converting it alone yields an ONNX graph
# that produces feature vectors but never a churn prediction.
scoring_pipeline = PipelineModel(stages=pipelineModel.stages + [bestModel])
model_onnx = convert_sparkml(scoring_pipeline, 'churn prediction model', initial_types)
model_onnx

In [36]:
# Serialize the converted model and write the bytes to model.onnx in the working directory.
with open("model.onnx", mode="wb") as onnx_file:
    onnx_file.write(model_onnx.SerializeToString())

In [39]:
import os

from azure.storage.blob import BlobClient

# SECURITY: the storage connection string contains the account key, so it is read from
# the environment instead of being written into the notebook. Any key that has ever been
# saved in a notebook or repository must be treated as leaked and rotated.
# (A secret store such as Azure Key Vault is an equally valid source.)
connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
if not connection_string:
    raise RuntimeError(
        "Set the AZURE_STORAGE_CONNECTION_STRING environment variable before uploading the model."
    )

# Upload the exported ONNX file to the "models" container, replacing any existing blob.
blob = BlobClient.from_connection_string(
    conn_str=connection_string,
    container_name="models",
    blob_name="onnx/model.onnx",
)

with open("./model.onnx", "rb") as data:
    blob.upload_blob(data, overwrite=True)