In [1]:
%fs ls /FileStore/tables/smallersample.csv

path,name,size
dbfs:/FileStore/tables/smallersample.csv,smallersample.csv,745371


In [2]:
%sql DROP TABLE IF EXISTS auth_small_sample

In [3]:
%sql
CREATE TABLE auth_small_sample (
  Time DOUBLE,
  SourceUserDomain STRING,
  DestinationUserDomain STRING,
  SourceComputer STRING,
  DestinationComputer STRING,
  AuthenticationType STRING,
  LogonType STRING,
  AuthenticationOrientation STRING,
  SuccessFailure STRING)
USING com.databricks.spark.csv
OPTIONS (path "/FileStore/tables/smallersample.csv", header "true")


In [4]:
dataset = spark.table("auth_small_sample")
cols = dataset.columns

In [5]:
display(dataset)

Time,SourceUserDomain,DestinationUserDomain,SourceComputer,DestinationComputer,AuthenticationType,LogonType,AuthenticationOrientation,SuccessFailure
964.0,C2374$@DOM1,C2374$@DOM1,C2375,C586,?,?,TGS,Success
2011.0,C1652$@DOM1,C1652$@DOM1,C1652,C612,Kerberos,Network,LogOn,Success
3081.0,C538$@DOM1,C538$@DOM1,C539,C523,Kerberos,Network,LogOn,Success
4104.0,C1008$@DOM1,C1008$@DOM1,C1008,C625,Kerberos,Network,LogOn,Success
5118.0,U292@DOM1,U292@DOM1,C1737,C1737,?,?,TGT,Success
6134.0,C287$@DOM1,C287$@DOM1,C529,C529,?,Network,LogOff,Success
7198.0,C2491$@DOM1,C2491$@DOM1,C457,C457,?,Network,LogOff,Success
8145.0,C1367$@DOM1,C1367$@DOM1,C612,C612,?,Network,LogOff,Success
9170.0,C1656$@DOM1,C1656$@DOM1,C1656,C1656,?,?,TGS,Success
10208.0,U22@DOM1,U22@DOM1,C506,C586,Kerberos,Network,LogOn,Success


In [6]:
# I made this a permanent table in order to make sharing available, in case someone wants to run everything over I have commented out the bottom line so that there is no error, since I already have the permanent table from the code below

#permanent_table_name = "auth_small_sample"

#df.write.format("parquet").saveAsTable(permanent_table_name)

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ["SourceUserDomain", "DestinationUserDomain", "SourceComputer", "DestinationComputer", "AuthenticationType", "LogonType", "AuthenticationOrientation"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [8]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="SuccessFailure", outputCol="label")
stages += [label_stringIdx]

In [9]:
# Transform all features into a vector using VectorAssembler
numericCols = ["Time"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [10]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
display(dataset)

label,features,Time,SourceUserDomain,DestinationUserDomain,SourceComputer,DestinationComputer,AuthenticationType,LogonType,AuthenticationOrientation,SuccessFailure
0.0,"List(0, 17893, List(119, 6398, 12721, 16490, 17874, 17879, 17889, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 964.0))",964.0,C2374$@DOM1,C2374$@DOM1,C2375,C586,?,?,TGS,Success
0.0,"List(0, 17893, List(1273, 7540, 13341, 16491, 17875, 17878, 17887, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2011.0))",2011.0,C1652$@DOM1,C1652$@DOM1,C1652,C612,Kerberos,Network,LogOn,Success
0.0,"List(0, 17893, List(12, 6293, 12617, 16505, 17875, 17878, 17887, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 3081.0))",3081.0,C538$@DOM1,C538$@DOM1,C539,C523,Kerberos,Network,LogOn,Success
0.0,"List(0, 17893, List(534, 6810, 12945, 16495, 17875, 17878, 17887, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 4104.0))",4104.0,C1008$@DOM1,C1008$@DOM1,C1008,C625,Kerberos,Network,LogOn,Success
0.0,"List(0, 17893, List(18, 6299, 12641, 16592, 17874, 17879, 17890, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 5118.0))",5118.0,U292@DOM1,U292@DOM1,C1737,C1737,?,?,TGT,Success
0.0,"List(0, 17893, List(1049, 7313, 12588, 16493, 17874, 17878, 17888, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 6134.0))",6134.0,C287$@DOM1,C287$@DOM1,C529,C529,?,Network,LogOff,Success
0.0,"List(0, 17893, List(182, 6462, 12589, 16492, 17874, 17878, 17888, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 7198.0))",7198.0,C2491$@DOM1,C2491$@DOM1,C457,C457,?,Network,LogOff,Success
0.0,"List(0, 17893, List(1685, 7953, 12590, 16491, 17874, 17878, 17888, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 8145.0))",8145.0,C1367$@DOM1,C1367$@DOM1,C612,C612,?,Network,LogOff,Success
0.0,"List(0, 17893, List(1368, 7640, 13844, 16909, 17874, 17879, 17889, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 9170.0))",9170.0,C1656$@DOM1,C1656$@DOM1,C1656,C1656,?,?,TGS,Success
0.0,"List(0, 17893, List(1, 6282, 12626, 16490, 17875, 17878, 17887, 17892), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 10208.0))",10208.0,U22@DOM1,U22@DOM1,C506,C586,Kerberos,Network,LogOn,Success


In [11]:
### Randomly split data into training and test sets. 
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

In [12]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [13]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [14]:
predictions.printSchema()

In [15]:
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well. For example's sake we will choose age & occupation
selected = predictions.select("label", "prediction", "probability","SuccessFailure", "LogonType", "AuthenticationType")
display(selected)

label,prediction,probability,SuccessFailure,LogonType,AuthenticationType
0.0,0.0,"List(1, 2, List(), List(0.9999762602687509, 2.3739731249055805E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999779113643612, 2.208863563875193E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999787759997064, 2.1224000293591264E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999796328851401, 2.036711485985834E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999838580403718, 1.614195962802203E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999856253346732, 1.4374665326814187E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999867044822138, 1.3295517786076131E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999874804505906, 1.2519549409280457E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9999899980854012, 1.0001914598847972E-5))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.999990398856346, 9.601143653899313E-6))",Success,Network,?


In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [17]:
evaluator.getMetricName()

In [18]:
print(lr.explainParams())

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [20]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

In [21]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [22]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [23]:
print('Model Intercept: ', cvModel.bestModel.intercept)

In [24]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

Feature Weight
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0


In [25]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability","SuccessFailure", "LogonType", "AuthenticationType")
display(selected)

label,prediction,probability,SuccessFailure,LogonType,AuthenticationType
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?


In [26]:
#Let's do a Decision Tree!!!
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

In [27]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

In [28]:
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

In [29]:
predictions.printSchema()

In [30]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability","SuccessFailure", "LogonType", "AuthenticationType")
display(selected)

label,prediction,probability,SuccessFailure,LogonType,AuthenticationType
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9967895030709101, 0.0032104969290898937))",Success,Network,?


In [31]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [32]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [33]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~15 minutes

In [34]:
print("numNodes = ", cvModel.bestModel.numNodes)
print("depth = ", cvModel.bestModel.depth)

In [35]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [36]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [37]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability","SuccessFailure", "LogonType", "AuthenticationType")
display(selected)

label,prediction,probability,SuccessFailure,LogonType,AuthenticationType
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9960942948807365, 0.0039057051192634955))",Success,Network,?


In [38]:
#Now let's do a random forest
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [39]:
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

In [40]:
predictions.printSchema()

In [41]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability","SuccessFailure", "LogonType", "AuthenticationType")
display(selected)

label,prediction,probability,SuccessFailure,LogonType,AuthenticationType
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?


In [42]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [43]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [44]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 30 minutes since it is training over 20 trees!
cvModel = cv.fit(trainingData)

In [45]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [46]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [47]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability","SuccessFailure", "LogonType", "AuthenticationType")
display(selected)

label,prediction,probability,SuccessFailure,LogonType,AuthenticationType
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?
0.0,0.0,"List(1, 2, List(), List(0.9923058111822172, 0.007694188817782782))",Success,Network,?


In [48]:
bestModel = cvModel.bestModel

In [49]:
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)

In [50]:
# Evaluate best model
evaluator.evaluate(finalPredictions)

In [51]:
#this creates a temporary view if you'd like to query the data a little further. 
finalPredictions.createOrReplaceTempView("finalPredictions")