In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RPA_Test") \
    .getOrCreate()

In [2]:
spark

In [3]:
# Read the data from sql database
"""database = "RobotWorkForce"
table = "(SELECT distinct ML.[ID],ML.[workNumber],ML.[templateID],ML.[robotID],ML.[receivedTime],ML.[startTime],ML.[endTime],isnull(MT.[resolutionTime]* 60,0) as ResolutionTimeObjective,ML.[transactionTime],ML.[status],BP.[data] FROM [RobotWorkForce].[dbo].[MasterLog] ML JOIN [BluePrism].[dbo].[BPAWorkQueueItem] BP ON BP.[keyvalue] = ML.workNumber JOIN [MasterType] MT ON ML.[templateID] = MT.[id] WHERE templateID in (2311)) as MasterLogData"
user = "xxxx"
password = "xxxx"

DF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://172.27.116.14:1433;databaseName={database};") \
    .option("dbtable",table) \
    .option("user",user) \
    .option("password", password) \
    .option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
"""
# Read the date from parquet file
DF = spark.read.parquet("../data/logs/MasterLogData")

In [4]:
DF.createOrReplaceTempView("MasterLogData")
DF.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- workNumber: string (nullable = true)
 |-- templateID: integer (nullable = true)
 |-- robotID: integer (nullable = true)
 |-- receivedTime: timestamp (nullable = true)
 |-- startTime: timestamp (nullable = true)
 |-- endTime: timestamp (nullable = true)
 |-- ResolutionTimeObjective: double (nullable = true)
 |-- transactionTime: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- data: string (nullable = true)



In [5]:
#Get Data
from pyspark.sql.functions import *
df = spark.sql("SELECT id, workNumber, case when status = '' THEN 'unknown' ELSE status END as status, replace(replace(replace(data,'<',' '),'>', ' '),'/',' ') as data FROM MasterLogData WHERE templateID = 2311")

In [6]:
#split set training and testing
seed = 0  # set seed for reproducibility

train, test = df.randomSplit([0.7,0.3],seed)

In [7]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF, StringIndexer,HashingTF

#String Indexer
stringIndexer = StringIndexer(inputCol="status",outputCol="label").setHandleInvalid("skip")

#Tokenizer
tokenizer = Tokenizer(inputCol="data",outputCol="Token")

#Vectorizer
vectorizer = CountVectorizer(inputCol="Token", outputCol="rawFeatures")

#TD-IDF
tf = HashingTF() \
    .setInputCol("Token") \
    .setOutputCol("TFout") \
    .setNumFeatures(10000)

idf = IDF() \
    .setInputCol("TFout") \
    .setOutputCol("IDFOut")

#Vector Assembler
from pyspark.ml.feature import VectorAssembler
FEATURES_COL = ['rawFeatures','IDFOut']
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")

In [8]:
#Logistic Regression Classifier
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")

In [9]:
#Pipeline Creation
from pyspark.ml import Pipeline
stages = [stringIndexer,tokenizer,vectorizer,tf,idf,vecAssembler,lr]
pipeline = Pipeline().setStages(stages)

In [10]:
#Param Tuning
from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.regParam, [0.1, 2]) \
    .build()

In [11]:
#Evaluation Model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator() \
    .setMetricName("areaUnderROC") \
    .setRawPredictionCol("prediction") \
    .setLabelCol("label")

In [12]:
#Training validation split
from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit() \
    .setTrainRatio(0.75) \
    .setEstimatorParamMaps(params) \
    .setEstimator(pipeline) \
    .setEvaluator(evaluator)

In [13]:
#Fitting the models using Parameters tuning, evaluation model and training validation split
tvsFitted = tvs.fit(train)

In [14]:
#tranforming the models
transformed = tvsFitted.transform(test)

In [15]:
#Brut result
evaluator.evaluate(transformed)

0.7698009315239848

In [16]:
#Create Result table and output table
transformed.createOrReplaceTempView("Result")
finalDF = spark.sql("SELECT status,label, prediction, count(*) FROM Result GROUP BY status, label,prediction ORDER BY status,label, prediction")
finalDF.show()

+-----------------+-----+----------+--------+
|           status|label|prediction|count(1)|
+-----------------+-----+----------+--------+
|         Complete|  0.0|       0.0|  208841|
|         Complete|  0.0|       1.0|    1810|
|         Complete|  0.0|       2.0|     890|
|Complete-Reassign|  1.0|       0.0|    7222|
|Complete-Reassign|  1.0|       1.0|    5089|
|Complete-Reassign|  1.0|       2.0|      92|
|             Fail|  3.0|       0.0|    1383|
|             Fail|  3.0|       1.0|      70|
|             Fail|  3.0|       2.0|     155|
|          unknown|  2.0|       0.0|      64|
|          unknown|  2.0|       1.0|       2|
|          unknown|  2.0|       2.0|    5233|
+-----------------+-----+----------+--------+



In [17]:
spark.stop()