In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparky").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

In [2]:
from urllib import request
dataDir = '../data/%s'
irisDataFile = dataDir % 'iris.data'
request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data', irisDataFile)

('../data/iris.data', <http.client.HTTPMessage at 0x7f9d382adcd0>)

In [3]:
data = spark.read.csv(irisDataFile)

In [4]:
data.show(n=4)

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|        _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
+---+---+---+---+-----------+
only showing top 4 rows



In [5]:
kddcupDataFile = dataDir % 'kddcup.data_10_percent.gz'
request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", kddcupDataFile)

('../data/kddcup.data_10_percent.gz',
 <http.client.HTTPMessage at 0x7f9d382ad790>)

In [6]:
# Creates a DataFrame type. - https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
kddcup_data = spark.read.csv(kddcupDataFile)

In [7]:
kddcup_data.count()

494021

In [8]:
kddcup_data.take(2)

[Row(_c0='0', _c1='tcp', _c2='http', _c3='SF', _c4='181', _c5='5450', _c6='0', _c7='0', _c8='0', _c9='0', _c10='0', _c11='1', _c12='0', _c13='0', _c14='0', _c15='0', _c16='0', _c17='0', _c18='0', _c19='0', _c20='0', _c21='0', _c22='8', _c23='8', _c24='0.00', _c25='0.00', _c26='0.00', _c27='0.00', _c28='1.00', _c29='0.00', _c30='0.00', _c31='9', _c32='9', _c33='1.00', _c34='0.00', _c35='0.11', _c36='0.00', _c37='0.00', _c38='0.00', _c39='0.00', _c40='0.00', _c41='normal.'),
 Row(_c0='0', _c1='tcp', _c2='http', _c3='SF', _c4='239', _c5='486', _c6='0', _c7='0', _c8='0', _c9='0', _c10='0', _c11='1', _c12='0', _c13='0', _c14='0', _c15='0', _c16='0', _c17='0', _c18='0', _c19='0', _c20='0', _c21='0', _c22='8', _c23='8', _c24='0.00', _c25='0.00', _c26='0.00', _c27='0.00', _c28='1.00', _c29='0.00', _c30='0.00', _c31='19', _c32='19', _c33='1.00', _c34='0.00', _c35='0.05', _c36='0.00', _c37='0.00', _c38='0.00', _c39='0.00', _c40='0.00', _c41='normal.')]

In [9]:
from pyspark.sql.functions import col
from time import time

normal_kddcup_data = kddcup_data.filter(col('_c41') == 'normal.')
countStart = time()
normalCount = normal_kddcup_data.count()
countSpan = time() - countStart

print("There are %s 'normal' interactions" % normalCount)
print('Count completed in %s seconds.' % round(countSpan,3))
# Resilient Distributed Datasets (RDD) - https://spark.apache.org/docs/latest/rdd-programming-guide.html
c31_data = normal_kddcup_data.rdd.map(lambda x: int(x['_c31']))
# Collect will take everything from the rdd and put it in a list.
# c31_data.collect()
c31DataFiltered = c31_data.filter(lambda x: x < 10)
print('Filtered count: %s' % c31DataFiltered.count())
print('First five: %s' % c31DataFiltered.take(5))

There are 97278 'normal' interactions
Count completed in 1.275 seconds.
Filtered count: 10090
First five: [9, 1, 8, 8, 4]
