In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Reuse the running SparkContext when this cell is re-executed: constructing a
# second SparkContext in the same kernel raises ValueError. (If one already
# exists, getOrCreate returns it and `conf` is not re-applied.)
conf = SparkConf().setAppName("test")
sc = SparkContext.getOrCreate(conf=conf)

sqlCtx = SQLContext(sc)

# Directory holding the up-sized C1K splits (judging by the file names, one
# CSV with no-attack samples and one with attack samples).
DATA_DIR = 'file:///home/fran/Descargas/p2/SplittedWithUpSizing'


def load_csv(path):
    """Load a headered CSV file, letting Spark infer the column types.

    Args:
        path: URI of the CSV file (e.g. a ``file://`` path).

    Returns:
        A Spark DataFrame with one column per header field.
    """
    return sqlCtx.read.load(path, format='com.databricks.spark.csv',
                            header='true', inferSchema='true')


df = load_csv(DATA_DIR + '/C1K_no_attacks.csv')
df2 = load_csv(DATA_DIR + '/C1K_attacks.csv')

In [2]:

list(df2.columns)  # column names of the attack-sample DataFrame

['GyroscopeStat_x_MEAN',
 'GyroscopeStat_z_MEAN',
 'GyroscopeStat_COV_z_x',
 'GyroscopeStat_COV_z_y',
 'MagneticField_x_MEAN',
 'MagneticField_z_MEAN',
 'MagneticField_COV_z_x',
 'MagneticField_COV_z_y',
 'Pressure_MEAN',
 'LinearAcceleration_COV_z_x',
 'LinearAcceleration_COV_z_y',
 'LinearAcceleration_x_MEAN',
 'LinearAcceleration_z_MEAN',
 'attack']

In [3]:
# Print each column name with the type Spark inferred for it (inferSchema='true').
df.printSchema()


root
 |-- GyroscopeStat_x_MEAN: double (nullable = true)
 |-- GyroscopeStat_z_MEAN: double (nullable = true)
 |-- GyroscopeStat_COV_z_x: double (nullable = true)
 |-- GyroscopeStat_COV_z_y: double (nullable = true)
 |-- MagneticField_x_MEAN: double (nullable = true)
 |-- MagneticField_z_MEAN: double (nullable = true)
 |-- MagneticField_COV_z_x: double (nullable = true)
 |-- MagneticField_COV_z_y: double (nullable = true)
 |-- Pressure_MEAN: double (nullable = true)
 |-- LinearAcceleration_COV_z_x: double (nullable = true)
 |-- LinearAcceleration_COV_z_y: double (nullable = true)
 |-- LinearAcceleration_x_MEAN: double (nullable = true)
 |-- LinearAcceleration_z_MEAN: double (nullable = true)
 |-- attack: integer (nullable = true)



In [4]:
# Summary statistics, one row per column (transposed so the wide frame stays readable).
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
GyroscopeStat_x_MEAN,1000,-0.007185128809498113,0.02082398943623495,-0.1373611818115172,0.20867960320920675
GyroscopeStat_z_MEAN,1000,-0.009689064325925242,0.03731235233030993,-0.7951338334444443,0.3676629841275509
GyroscopeStat_COV_z_x,1000,0.024020996311882387,0.10888640321569676,-0.33563322168617776,1.1033720660545596
GyroscopeStat_COV_z_y,1000,0.013878322365280468,0.06982765611453817,-0.19471655981661345,1.1975237400173495
MagneticField_x_MEAN,1000,-501.6594161452575,7290.234309721676,-88245.0,38056.142857142855
MagneticField_z_MEAN,1000,-375.82058291324614,5505.1958431942985,-93726.0,36405.55555555556
MagneticField_COV_z_x,1000,7.411439405910629,88.75330330303233,-2310.789677684387,1219.1180844074822
MagneticField_COV_z_y,1000,11.87434621275917,196.83980574351267,-294.4752870386481,6142.868476277499
Pressure_MEAN,1000,1948.609518041027,30848.41113764719,921.8236228571428,976485.375


In [5]:
# Subset of sensor statistics used as model inputs.
featureColumns = [
    'GyroscopeStat_x_MEAN',
    'Pressure_MEAN',
    'LinearAcceleration_z_MEAN',
    'GyroscopeStat_z_MEAN',
]

# Rows of the no-attack frame with no null in any column. Comparing its count
# with df.count() shows whether dropping nulls would lose any rows.
dfWN = df.na.drop()
(dfWN.count(), len(dfWN.columns), df.count(), df2.count())

(1000, 14, 1000, 798)

In [7]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

SPLIT_SEED = 13234

# VectorAssembler rejects nulls in its input columns by default, and the
# classifier needs a non-null label. Drop rows with nulls in any column used
# here, for BOTH frames (the earlier na.drop check only looked at df).
usedColumns = featureColumns + ['attack']
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(df.na.drop(subset=usedColumns))
assembled2 = assembler.transform(df2.na.drop(subset=usedColumns))

# Split each file separately so each contributes ~80% to train and ~20% to
# test. Assuming each file holds a single class, this is a stratified split.
(trainingData, testData) = assembled.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
(trainingData2, testData2) = assembled2.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
print(testData2.count(), testData.count())

# DataFrame.union matches columns by POSITION; re-select the second frame's
# columns in the first frame's order so rows are combined by column name.
trainingData = trainingData.union(trainingData2.select(*trainingData.columns))
testData = testData.union(testData2.select(*testData.columns))
testData.count()




148 195


343

In [8]:
from pyspark.ml.classification import DecisionTreeClassifier

# Gini decision tree on the assembled feature vector, predicting "attack".
# Depth is capped at 5, and every node must hold at least 20 training rows.
dt = DecisionTreeClassifier(
    labelCol="attack",
    featuresCol="features",
    impurity="gini",
    maxDepth=5,
    minInstancesPerNode=20,
)

In [9]:
from pyspark.ml import Pipeline

# Single-stage pipeline: the features were assembled in an earlier cell, so
# the decision tree is the only stage to fit.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(dataset=trainingData)

In [10]:
predictions = model.transform(dataset=testData)  # adds a "prediction" column to the test rows

In [11]:
predictions.select(["prediction", "attack"]).show(n=10)  # predicted vs. true label

+----------+------+
|prediction|attack|
+----------+------+
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       0.0|     0|
|       1.0|     0|
|       0.0|     0|
+----------+------+
only showing top 10 rows



In [12]:
# mode="overwrite" so Restart & Run All does not fail because the output
# directory already exists (Spark's default save mode is "errorifexists").
predictions.select("prediction", "attack").write.save(
    path="file:///home/fran/Descargas/p2/predictionsUp",
    format="com.databricks.spark.csv",
    header="true",
    mode="overwrite",
)

In [13]:

# Read the saved (prediction, attack) pairs back, inferring column types.
predictions = (
    sqlCtx.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('file:///home/fran/Descargas/p2/predictionsUp')
)

In [14]:
predictions.show(n=10)  # first rows of the reloaded predictions

+----------+------+
|prediction|attack|
+----------+------+
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       1.0|     0|
|       0.0|     0|
|       1.0|     0|
|       0.0|     0|
+----------+------+
only showing top 10 rows



In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy = fraction of test rows whose "prediction" equals the "attack" label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="attack", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)


Acuraccy = 0.723032


In [16]:
from pyspark.mllib.evaluation import MulticlassMetrics

# The label may be inferred as an integer when the CSV is reloaded. Cast it to
# double, as the original cell did; the reassignment keeps `predictions` in the
# same state later cells may expect.
predictions = predictions.withColumn("attack", predictions["attack"].cast("double"))

# MulticlassMetrics takes an RDD of (prediction, label) float pairs. Select the
# two columns explicitly in that order instead of relying on the saved file's
# column order.
predictionAndLabels = predictions.select(
    predictions["prediction"].cast("double"),
    predictions["attack"],
).rdd.map(tuple)
metrics = MulticlassMetrics(predictionAndLabels)

In [17]:
# Spark's confusion matrix has actual labels as rows and predicted labels as
# columns. Transposing puts predicted classes in rows and actual classes in
# columns.
metrics.confusionMatrix().toArray().T

array([[160.,  60.],
       [ 35.,  88.]])