In [1]:
from pyspark import SparkContext, SparkConf, StorageLevel
from pyspark.sql import SQLContext, Row, SparkSession
from __future__ import print_function

In [4]:
spark = SparkSession \
    .builder \
    .appName("KDD") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

input_data=sc.textFile("/home/dharamendra/Downloads/kddcup.data_10_percent_corrected", minPartitions=2).map(lambda line: line.encode("utf-8"))
print (input_data.count())

494021


In [14]:
from collections import OrderedDict
from time import time
labels = input_data.map(lambda line: line.strip().split(",")[-1])
t0 = time()
label_counts = labels.countByValue()
tt = time()-t0

sorted_labels = OrderedDict(sorted(label_counts.items(), key=lambda t: t[1], reverse=True))
for label, count in sorted_labels.items():
    print (label, count)
print ("Counted in {} seconds".format(round(tt,3)))

smurf. 280790
neptune. 107201
normal. 97278
back. 2203
satan. 1589
ipsweep. 1247
portsweep. 1040
warezclient. 1020
teardrop. 979
pod. 264
nmap. 231
guess_passwd. 53
buffer_overflow. 30
land. 21
warezmaster. 20
imap. 12
rootkit. 10
loadmodule. 9
ftp_write. 8
multihop. 7
phf. 4
perl. 3
spy. 2
Counted in 5.305 seconds


In [15]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType,DoubleType
def replacetab(line):
    data = line.split(',')
    return data


process_data = input_data.map(lambda line: replacetab(line))


fields = [StructField("duration_r", StringType(), True),
              StructField("protocol_type_r", StringType(), True), StructField("service_r", StringType(), True),
              StructField("flag_r", StringType(), True),
              StructField("src_bytes_r", StringType(), True), StructField("dst_bytes_r", StringType(), True),
              StructField("land_r", StringType(), True),
              StructField("wrong_fragment_r", StringType(), True), StructField("urgent_r", StringType(), True),
              StructField("hot_r", StringType(), True),
              StructField("num_failed_logins_r", StringType(), True),StructField("logged_in_r", StringType(), True), 
              StructField("num_compromised_r", StringType(), True),
              StructField("root_shell_r", StringType(), True),StructField("su_attempted_r", StringType(), True),
              StructField("num_root_r", StringType(), True),
              StructField("num_file_creations_r", StringType(), True),
              StructField("num_shells_r", StringType(), True),
              StructField("num_access_files_r", StringType(), True), StructField("num_outbound_cmds_r", StringType(), True),
              StructField("is_host_login_r", StringType(), True),
              StructField("is_guest_login_r", StringType(), True), StructField("count_r", StringType(), True),
              StructField("srv_count_r", StringType(), True),
              StructField("serror_rate_r", StringType(), True), StructField("srv_serror_rate_r", StringType(), True),
              StructField("rerror_rate_r", StringType(), True),
              StructField("srv_rerror_rate_r", StringType(), True), StructField("same_srv_rate_r", StringType(), True),
              StructField("diff_srv_rate_r", StringType(), True),
              StructField("srv_diff_host_rate_r", StringType(), True), StructField("dst_host_count_r", StringType(), True),
              StructField("dst_host_srv_count_r", StringType(), True),
              StructField("dst_host_same_srv_rate_r", StringType(), True), StructField("dst_host_diff_srv_rate_r", StringType(), True),
              StructField("dst_host_same_src_port_rate_r", StringType(), True),
              StructField("dst_host_srv_diff_host_rate_r", StringType(), True), StructField("dst_host_serror_rate_r", StringType(), True),
              StructField("dst_host_srv_serror_rate_r", StringType(), True),
              StructField("dst_host_rerror_rate_r", StringType(), True), StructField("dst_host_srv_rerror_rate_r", StringType(), True),
              StructField("label", StringType(), True)]
schema = StructType(fields)
input_dataframe = spark.createDataFrame(process_data, schema)
input_dataframe.printSchema()

#input_dataframe.describe('src_bytes','wrong_fragment').show()


root
 |-- duration_r: string (nullable = true)
 |-- protocol_type_r: string (nullable = true)
 |-- service_r: string (nullable = true)
 |-- flag_r: string (nullable = true)
 |-- src_bytes_r: string (nullable = true)
 |-- dst_bytes_r: string (nullable = true)
 |-- land_r: string (nullable = true)
 |-- wrong_fragment_r: string (nullable = true)
 |-- urgent_r: string (nullable = true)
 |-- hot_r: string (nullable = true)
 |-- num_failed_logins_r: string (nullable = true)
 |-- logged_in_r: string (nullable = true)
 |-- num_compromised_r: string (nullable = true)
 |-- root_shell_r: string (nullable = true)
 |-- su_attempted_r: string (nullable = true)
 |-- num_root_r: string (nullable = true)
 |-- num_file_creations_r: string (nullable = true)
 |-- num_shells_r: string (nullable = true)
 |-- num_access_files_r: string (nullable = true)
 |-- num_outbound_cmds_r: string (nullable = true)
 |-- is_host_login_r: string (nullable = true)
 |-- is_guest_login_r: string (nullable = true)
 |-- count_

In [16]:
changedTypedf = input_dataframe.withColumn("duration", input_dataframe["duration_r"].cast("integer")).\
        withColumn("src_bytes", input_dataframe["src_bytes_r"].cast("integer")).\
        withColumn("dst_bytes", input_dataframe["dst_bytes_r"].cast("integer")).\
        withColumn("land", input_dataframe["land_r"].cast("integer")). \
        withColumn("wrong_fragment", input_dataframe["wrong_fragment_r"].cast("integer")). \
        withColumn("urgent", input_dataframe["urgent_r"].cast("integer")). \
        withColumn("hot", input_dataframe["hot_r"].cast("integer")). \
        withColumn("num_failed_logins", input_dataframe["num_failed_logins_r"].cast("integer")). \
        withColumn("logged_in", input_dataframe["logged_in_r"].cast("integer")). \
        withColumn("num_compromised", input_dataframe["num_compromised_r"].cast("integer")). \
        withColumn("root_shell", input_dataframe["root_shell_r"].cast("integer")). \
        withColumn("su_attempted", input_dataframe["su_attempted_r"].cast("integer")). \
        withColumn("num_root", input_dataframe["num_root_r"].cast("integer")).\
        withColumn("num_file_creations", input_dataframe["num_file_creations_r"].cast("integer")).\
        withColumn("num_shells", input_dataframe["num_shells_r"].cast("integer")).\
        withColumn("num_access_files", input_dataframe["num_access_files_r"].cast("integer")).\
        withColumn("num_outbound_cmds", input_dataframe["num_outbound_cmds_r"].cast("integer")).\
        withColumn("is_host_login", input_dataframe["is_host_login_r"].cast("integer")).\
        withColumn("is_guest_login", input_dataframe["is_guest_login_r"].cast("integer")).\
        withColumn("count", input_dataframe["count_r"].cast("integer")).\
        withColumn("srv_count", input_dataframe["srv_count_r"].cast("integer")).\
        withColumn("serror_rate", input_dataframe["serror_rate_r"].cast("double")).\
        withColumn("srv_serror_rate", input_dataframe["srv_serror_rate_r"].cast("double")).\
        withColumn("rerror_rate", input_dataframe["rerror_rate_r"].cast("double")).\
        withColumn("srv_rerror_rate", input_dataframe["srv_rerror_rate_r"].cast("double")).\
        withColumn("same_srv_rate", input_dataframe["same_srv_rate_r"].cast("double")).\
        withColumn("diff_srv_rate", input_dataframe["diff_srv_rate_r"].cast("double")).\
        withColumn("srv_diff_host_rate", input_dataframe["srv_diff_host_rate_r"].cast("double")).\
        withColumn("dst_host_count", input_dataframe["dst_host_count_r"].cast("integer")).\
        withColumn("dst_host_srv_count", input_dataframe["dst_host_srv_count_r"].cast("integer")).\
        withColumn("dst_host_same_srv_rate", input_dataframe["dst_host_same_srv_rate_r"].cast("double")).\
        withColumn("dst_host_diff_srv_rate", input_dataframe["dst_host_diff_srv_rate_r"].cast("double")).\
        withColumn("dst_host_same_src_port_rate", input_dataframe["dst_host_same_src_port_rate_r"].cast("double")).\
        withColumn("dst_host_srv_diff_host_rate", input_dataframe["dst_host_srv_diff_host_rate_r"].cast("double")).\
        withColumn("dst_host_serror_rate", input_dataframe["dst_host_serror_rate_r"].cast("double")).\
        withColumn("dst_host_srv_serror_rate", input_dataframe["dst_host_srv_serror_rate_r"].cast("double")).\
        withColumn("dst_host_rerror_rate", input_dataframe["dst_host_rerror_rate_r"].cast("double")).\
        withColumn("dst_host_srv_rerror_rate", input_dataframe["dst_host_srv_rerror_rate_r"].cast("double"))

input_filter = changedTypedf.drop("duration_r").drop("src_bytes_r").drop("dst_bytes_r").drop("land_r").drop("wrong_fragment_r").drop("urgent_r"). \
        drop("logged_in_r").drop("num_compromised_r").drop("rerror_rate_r").drop("srv_rerror_rate_r").\
        drop("dst_host_rerror_rate_r").drop("dst_host_srv_rerror_rate_r").\
        drop("hot_r").drop("num_failed_logins_r").drop("root_shell_r").drop("su_attempted_r").drop("num_root_r").drop("num_file_creations_r"). \
        drop("num_shells_r").drop("num_access_files_r").drop("num_outbound_cmds_r").drop("is_host_login_r").drop("is_guest_login_r").\
        drop("count_r").drop("srv_count_r").drop("serror_rate_r").drop("srv_serror_rate_r").drop("same_srv_rate_r").\
        drop("diff_srv_rate_r").drop("srv_diff_host_rate_r").drop("dst_host_count_r").drop("dst_host_srv_count_r").drop("dst_host_same_srv_rate_r").\
        drop("dst_host_diff_srv_rate_r").drop("dst_host_same_src_port_rate_r").drop("dst_host_srv_diff_host_rate_r").drop("dst_host_serror_rate_r").drop("dst_host_srv_serror_rate_r")
        
input_filter.show(2)
        

+---------------+---------+------+-------+--------+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+
|protocol_type_r|service_r|flag_r|  label|duration|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_di

In [17]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline



    
categorical_col=["protocol_type_r","service_r","flag_r"]
                                                                                                                                                                                
stringIndexer_cat = [StringIndexer(inputCol=column, outputCol=column + "_i").fit(input_filter) for column in categorical_col]
pipeline = Pipeline(stages=stringIndexer_cat)
cat_model = pipeline.fit(input_filter)
cat_indexed = cat_model.transform(input_filter).drop("protocol_type_r").drop("service_r").drop("flag_r")



# scaler = MinMaxScaler(inputCol="duration", outputCol="duration_n")
# scalerModel = scaler.fit(input_filter)
# scaledData = scalerModel.transform(input_filter)
# input_filter.describe('duration_n').show()



In [18]:
from pyspark.sql.functions import udf
def replacedot(label_i):
    return label_i.replace(".","")

def custom_udf(x):
    y = udf(replacedot, StringType())
    return y(x)


input_df=cat_indexed.withColumn("label_r",custom_udf(cat_indexed.label)).drop("label")
input_df.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- srv_count: integer (nullable = true)
 |-- serror_rate: double (nullable = true)
 |-- srv_serror_rate: double (nul

In [19]:
from pyspark.ml.feature import VectorAssembler
 
col_names = ["duration","src_bytes",
    "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
    "logged_in","num_compromised","root_shell","su_attempted","num_root",
    "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
    "is_host_login","is_guest_login","count","srv_count","serror_rate",
    "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
    "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
    "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
    "dst_host_rerror_rate","dst_host_srv_rerror_rate","protocol_type_r_i","service_r_i","flag_r_i"]
assembler = VectorAssembler(inputCols=col_names, outputCol="features")
output = assembler.transform(input_df)
assembledData = output.select([column for column in output.columns if column not in col_names])
assembledData.show(2)





+-------+--------------------+
|label_r|            features|
+-------+--------------------+
| normal|(41,[1,2,8,19,20,...|
| normal|(41,[1,2,8,19,20,...|
+-------+--------------------+
only showing top 2 rows



In [23]:
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(assembledData)

scaledData = scalerModel.transform(assembledData).drop("features")
labelIndexer = StringIndexer(inputCol="label_r", outputCol="label").fit(scaledData)

train_df=labelIndexer.transform(scaledData)
#train_df.show(100)

train=train_df.selectExpr("scaledFeatures as features" ,"label")
#train.persist(StorageLevel(True, True, False, False, 1))
#train.write.parquet("final_data.parquet")


+-------+--------------------+-----+
|label_r|      scaledFeatures|label|
+-------+--------------------+-----+
| normal|[0.0,2.6104176374...|  2.0|
| normal|[0.0,3.4469050571...|  2.0|
| normal|[0.0,3.3892162695...|  2.0|
| normal|[0.0,3.1584611192...|  2.0|
| normal|[0.0,3.1296167255...|  2.0|
| normal|[0.0,3.1296167255...|  2.0|
| normal|[0.0,3.0575057410...|  2.0|
| normal|[0.0,2.2931293057...|  2.0|
| normal|[0.0,3.0286613472...|  2.0|
| normal|[0.0,3.0575057410...|  2.0|
| normal|[0.0,3.0286613472...|  2.0|
| normal|[0.0,2.5527288498...|  2.0|
| normal|[0.0,3.2017277099...|  2.0|
| normal|[0.0,3.6920824042...|  2.0|
| normal|[0.0,3.4757494509...|  2.0|
| normal|[0.0,3.7497711918...|  2.0|
| normal|[0.0,3.4757494509...|  2.0|
| normal|[0.0,3.7065046011...|  2.0|
| normal|[0.0,3.3603718757...|  2.0|
| normal|[0.0,3.3603718757...|  2.0|
| normal|[0.0,3.6920824042...|  2.0|
| normal|[0.0,3.3747940726...|  2.0|
| normal|[0.0,3.4757494509...|  2.0|
| normal|[0.0,3.4469050571...|  2.0|
|

In [18]:
train_df = spark.read.parquet("final_data.parquet")
train_df.show(2)
#train_df.persist(StorageLevel(True, True, False, False, 1))
from pyspark.ml.classification import RandomForestClassifier
df_test, df_train = train_df.randomSplit([0.3, 0.7])
rfc = RandomForestClassifier(maxDepth=8, maxBins=2400000, numTrees=128,impurity="gini")
rfc_model = rfc.fit(train_df)
rfc_model.save("rfc_model")

                                                                                                                                                                                                                                                                                                                

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,2.6104176374...|  2.0|
|[0.0,3.4469050571...|  2.0|
+--------------------+-----+
only showing top 2 rows



In [27]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

train_df = spark.read.parquet("final_data.parquet")
df_test, df_train = train_df.randomSplit([0.3, 0.7])
rfc = RandomForestClassifier(maxDepth=8, maxBins=12000, numTrees=32,impurity="gini")
rfc_model = rfc.fit(df_train)
predictions = rfc_model.transform(df_test)
predictions.registerTempTable('Predictions')
predictions.show(2)

evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator_acc.evaluate(predictions)
print (accuracy)
evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedPrecision")
precison=evaluator_acc.evaluate(predictions)
print (precison)
evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedRecall")
recall=evaluator_acc.evaluate(predictions)
print (recall)



+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,0.0,0.0,0.0,...|  6.0|[0.0,2.2195269323...|[0.0,0.0693602166...|       6.0|
|[0.0,0.0,0.0,0.0,...|  2.0|[0.15643338919068...|[0.00488854341220...|       2.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 2 rows

0.998682414627
0.998569922522
0.998682414627


In [41]:
predictions.createOrReplaceTempView("predictionTable")
test=spark.sql("SELECT count(*) as TruePositive from predictionTable a1 where a1.label!=2.0")
test.show(2)

+------------+
|TruePositive|
+------------+
|      118832|
+------------+

