In [1]:
#All Imports
#For transforming from normal csv to a csv with features appropriate for training
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
#The Models and a Performance Evaluator
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier, LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
#Read csv phishing and enron (non-phishing) file
df = spark.read.csv("/home/elkvm/Documents/MachineLearning/MachineLearningPhishing/code/resources/features_total.csv", header="True", inferSchema="True")
#Delete att1 column which is an ID column and useless for Machine Learning
df = df.drop('att1')
#df = df.withColumnRenamed('Phishy', 'label')
#Might need to change boolean to float
for item in df.dtypes:
    if item[1]==('boolean'):
        df = df.withColumn(item[0], df[item[0]].cast('float'))

df1, df2 = df.randomSplit([0.50, 0.50])

In [3]:
def feature_vectors(df):
    categorical_columns = [item[0] for item in df.dtypes if item[1].startswith('string')]
    numerical_columns = [item[0] for item in df.dtypes if item[1].startswith('int') | item[1].startswith('float')]#[1:]
    boolean_columns = [item[0] for item in df.dtypes if item[1].startswith('boolean')]
    return categorical_columns, numerical_columns, boolean_columns


def make_stages(df, label='label'):
    #Need the label name to remove it from the database
    from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
    cat_cols, num_cols, bool_cols = feature_vectors(df)
    stages = []
    #Steps to create one "features" column
    #First encode every categorical (string type) column to a binary vector
    #The "stages" will be used to apply every change to the database with one command (later)
    for categoricalCol in cat_cols:
        stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
        encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    assemblerInputs = [c + "classVec" for c in cat_cols] + num_cols + bool_cols
    #Remove the 'label' columns from the features, because it is not a feature.
    assemblerInputs.remove(label)
    #Combine all the columns into one "features" column.
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    #Add that to the transformations that will happen to the dataframe
    stages += [assembler]
    return stages


def stages_pipeline(df, stages, label='label', features='features'):
    #Use pipeline to apply all the stages of transformations
    from pyspark.ml import Pipeline
    cols = df.columns
    pipeline = Pipeline(stages = stages)
    pipelineModel = pipeline.fit(df)
    df = pipelineModel.transform(df)
    selectedCols = [features]+cols
    df = df.select(selectedCols)
    #Keep only the features and the label columns
    df = df.select(features, label)
    return df


def csv2features(df, label='label'):
    for item in df.dtypes:
        if item[1]==('boolean'):
            df = df.withColumn(item[0], df[item[0]].cast('float'))
    df = df.withColumnRenamed(label, 'label')
    stages = make_stages(df)
    df = stages_pipeline(df, stages)
    return df

def trainandevaluate(df, classifier, label='label'):
    df = csv2features(df, label)
    dftrain, dftest = df.randomSplit([0.90, 0.10], seed=444)
    model = classifier.fit(dftrain)
    predictions = model.transform(dftest)
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    evaluator = BinaryClassificationEvaluator()
    print("Test_SET (Area Under ROC): " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
    cm = predictions.select('label','prediction')
    print("Accuracy: " + str(cm.filter(cm.label == cm.prediction).count() / cm.count()))
    return model

In [4]:
#Use half of the df to train one model and the other 
df1, df2 = df.randomSplit([0.50, 0.50], seed=240)

gbt = GBTClassifier(maxDepth=4)
gbt_model = trainandevaluate(df, gbt, 'Phishy')

Test_SET (Area Under ROC): 0.9963914384735498
Accuracy: 0.971677559912854


In [5]:
df2_predictions = gbt_model.transform(csv2features(df2, 'Phishy'))

from pyspark.sql.functions import monotonically_increasing_id
cm = df2_predictions.select('prediction')
cm = cm.withColumnRenamed('prediction', 'prevPrediction')
cm = cm.withColumn("id",monotonically_increasing_id())
df2 = df2.withColumn("id",monotonically_increasing_id())

cm.show(20)
df2.show(20)

+--------------+---+
|prevPrediction| id|
+--------------+---+
|           0.0|  0|
|           0.0|  1|
|           0.0|  2|
|           0.0|  3|
|           0.0|  4|
|           0.0|  5|
|           0.0|  6|
|           0.0|  7|
|           0.0|  8|
|           0.0|  9|
|           0.0| 10|
|           0.0| 11|
|           0.0| 12|
|           0.0| 13|
|           0.0| 14|
|           0.0| 15|
|           0.0| 16|
|           0.0| 17|
|           0.0| 18|
|           0.0| 19|
+--------------+---+
only showing top 20 rows

+---------+-----------+---+--------+------------------+-------------+------------+---------+-----------+-----------+----------+------+----+---+
|@ in URLs|Attachments|Css|Encoding|External Resources|Flash content|HTML content|Html Form|Html iFrame|IPs in URLs|Javascript|Phishy|URLs| id|
+---------+-----------+---+--------+------------------+-------------+------------+---------+-----------+-----------+----------+------+----+---+
|      0.0|          0|  0|    7bit|  

In [6]:
df2 = df2.join(cm, "id")

In [7]:
df2 = df2.drop("id")

In [8]:
dt = DecisionTreeClassifier(maxDepth=7)
dt_model = trainandevaluate(df2, dt, 'Phishy')

Test_SET (Area Under ROC): 0.9690993788819875
Accuracy: 0.9647577092511013


In [9]:
gbt_stacked = GBTClassifier(maxDepth=4)
gbt_stacked_model = trainandevaluate(df2, gbt, 'Phishy')

Test_SET (Area Under ROC): 0.9900621118012422
Accuracy: 0.9647577092511013
