# Distributed computing - Spark

Spark is currently the first framework people consider when they want to do in-memory distributed data processing.

However, there are many other possibilities, tools and technologies:
* **Hadoop (MapReduce + HDFS)** - a tool to consider when you have very large amounts of data that would not fit into memory even on a large cluster, and/or when you want to process the data on disk
* Storm
* Flink


<img src="http://mattturck.com/wp-content/uploads/2019/07/2019_Matt_Turck_Big_Data_Landscape_Final_Fullsize.png" alt="BigData landscape 2019"/>
source: https://mattturck.com/data2019/

# Spark - a few properties
I will be using some keywords you have probably already met in other courses. If we stumble upon one you don't understand, stop me.

* In-memory data processing
* Uniform access to the data and computational resources (I write the same code regardless of whether it runs on one computer or on a whole cluster)
* Scala and the JVM in the background, but very good APIs accessible from Python and R
* Basically MapReduce, but in memory and with the possibility to process data in micro-batches and as a stream
* The basis is the RDD (Resilient Distributed Dataset), a collection of data distributed across various computational nodes. Work is expressed as transformations on data represented as RDDs. Basic operations such as map and filter (transformations) and collect (an action that returns the results) are simple to use
* Transformations are lazy. They are not executed until their results are needed by an action (e.g. collect, count, take).
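To make the RDD idea concrete without a cluster, here is a minimal pure-Python sketch, assuming a toy model where a dataset is just a list of partitions and transformations are only recorded until an action runs them. ToyRDD and its methods are hypothetical illustrations, not the Spark API; in Spark the partitions would live on different nodes.

```python
class ToyRDD:
    """Hypothetical toy model of an RDD (not the Spark API): a list of
    partitions plus a tuple of pending map/filter transformations."""

    def __init__(self, partitions, ops=()):
        self.partitions = partitions
        self.ops = tuple(ops)  # recorded transformations, not yet executed

    @classmethod
    def parallelize(cls, data, num_partitions=2):
        items = list(data)
        size = max(1, -(-len(items) // num_partitions))  # ceiling division
        return cls([items[i:i + size] for i in range(0, len(items), size)])

    # transformations: return a new ToyRDD and do no work yet
    def map(self, f):
        return ToyRDD(self.partitions, self.ops + (("map", f),))

    def filter(self, pred):
        return ToyRDD(self.partitions, self.ops + (("filter", pred),))

    def _run_partition(self, part):
        # apply the recorded transformations to the elements of one partition
        for x in part:
            keep = True
            for kind, f in self.ops:
                if kind == "map":
                    x = f(x)
                elif not f(x):
                    keep = False
                    break
            if keep:
                yield x

    # action: process each partition independently, gather results in one list
    def collect(self):
        return [x for part in self.partitions for x in self._run_partition(part)]


dist_data = ToyRDD.parallelize([1, 2, 3, 4, 5])
print(dist_data.partitions)                              # [[1, 2, 3], [4, 5]]
print(dist_data.map(lambda x: x % 2 == 0).collect())     # [False, True, False, True, False]
print(dist_data.filter(lambda x: x % 2 == 0).collect())  # [2, 4]
```

Each partition is processed on its own, which is what lets Spark run the same code on one machine or on many.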


In [1]:
sc

In [2]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData # this is an example of an RDD

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

In [3]:
f = distData.map(lambda x: x % 2 == 0)  # map: one result (True/False) per element
f.take(3)

[False, True, False]

In [4]:
f = distData.filter(lambda x: x % 2 == 0)  # filter: keep only the elements where the condition holds
f.take(5)

[2, 4]

# Toy example with searching primes

I have a function that tests whether a number is prime, and I want to apply it to a lot of data in a distributed way.


In [5]:
# taken from https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

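def isprime(n):
    """Return True if n is a prime number (trial division)."""
    n = abs(int(n))
    if n < 2:
        return False
    if n == 2:
        return True
    if not n & 1:  # even numbers greater than 2 are not prime
        return False
    # try odd divisors up to sqrt(n)
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True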

In [6]:
nums = sc.parallelize(range(10**6))

In [7]:
%%time
nums.filter(isprime).count()

CPU times: user 24 ms, sys: 4 ms, total: 28 ms
Wall time: 3.07 s


78498
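The count can be cross-checked on a single machine. This sketch swaps in a different technique, a sieve of Eratosthenes, instead of calling isprime on every number; it should report the same 78498 primes below 10**6.

```python
def primes_below(limit):
    """Sieve of Eratosthenes: return all primes p with p < limit."""
    if limit < 2:
        return []
    is_prime = bytearray([1]) * limit
    is_prime[0] = is_prime[1] = 0
    for p in range(2, int(limit ** 0.5) + 1):
        if is_prime[p]:
            # cross out all multiples of p, starting at p*p
            is_prime[p * p::p] = bytearray(len(range(p * p, limit, p)))
    return [n for n, flag in enumerate(is_prime) if flag]


print(len(primes_below(10**6)))  # 78498
```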

# Lazy evaluation

In [8]:
%%time
nums.filter(isprime)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 63.7 µs


PythonRDD[7] at RDD at PythonRDD.scala:48

I didn't call a function that returns results, so nothing was executed. Only an RDD describing the transformations was prepared. These transformations will be executed when needed.

In [9]:
%%time
nums.filter(isprime).take(5)

CPU times: user 12 ms, sys: 4 ms, total: 16 ms
Wall time: 79.2 ms


[2, 3, 5, 7, 11]

Now I have called a function that requires some results to be computed. A few results were enough, so only part of the computation was performed. Other functions that start the execution (actions) include, for example, collect, count, ...
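Continuing the same single-machine analogy (again plain Python, not Spark), taking the first five elements with itertools.islice evaluates the predicate only until five primes are found. Spark's take additionally limits how many partitions it scans, starting with one and adding more only if it needs more results, but the effect is similar: only part of the input is processed. counted_isprime is the same hypothetical helper as before.

```python
from itertools import islice

calls = 0

def counted_isprime(n):
    """Simple trial-division prime test that counts how often it is called."""
    global calls
    calls += 1
    if n < 2:
        return False
    return all(n % d for d in range(2, int(n ** 0.5) + 1))

# analogue of nums.filter(isprime).take(5)
first_five = list(islice(filter(counted_isprime, range(10**6)), 5))
print(first_five)  # [2, 3, 5, 7, 11]
print(calls)       # 12: only the numbers 0..11 were tested
```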

Be careful with these functions, mainly with collect. collect returns all the data resulting from the computation to the computer you use to access the Spark cluster (the driver), and it does so even when the result is very large.


In [10]:
%%time
nums.filter(isprime).takeOrdered(5, key = lambda x: -x)

CPU times: user 16 ms, sys: 8 ms, total: 24 ms
Wall time: 3.2 s


[999983, 999979, 999961, 999959, 999953]

Now I needed to compute everything to be able to sort the results. The whole computation had to be executed, and it took some time.
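takeOrdered does not need a full global sort, though. A minimal plain-Python sketch of the per-partition top-k idea, with a hypothetical take_ordered helper and partitions represented as lists: every partition keeps only its num best items, and these small lists are then merged. Every element is still examined, which is why the whole computation had to run. This is an illustration of the idea, not Spark's code.

```python
import heapq

def take_ordered(partitions, num, key=None):
    """Hypothetical helper: the num smallest items (by key) across partitions."""
    # step 1: each partition reduces itself to its local top-num list
    local_tops = [heapq.nsmallest(num, part, key=key) for part in partitions]
    # step 2: merge the small local lists one after another into the global top-num
    result = []
    for top in local_tops:
        result = heapq.nsmallest(num, result + top, key=key)
    return result


partitions = [[5, 1, 9], [7, 3], [8, 2, 6]]
print(take_ordered(partitions, 3, key=lambda x: -x))  # [9, 8, 7]
print(take_ordered(partitions, 2))                    # [1, 2]
```

Using key=lambda x: -x, as in the cell above, turns "smallest by key" into "largest by value".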

# Let's try another, more realistic example

Sources:
* https://github.com/jadianes/spark-py-notebooks
* https://www.codementor.io/jadianes/python-spark-sql-dataframes-du107w74i


## Download the data about attacks on a computer network

The data describe connections in a computer network.


In [20]:
import urllib.request
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "data/kddcup.data.gz")
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "data/kddcup.data_10_percent.gz")
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "data/corrected.gz")

In [21]:
%%bash
ls -lh data

total 21M
-rw-r--r-- 1 jakub.sevcech anomaly 1.4M Dec 11 23:39 corrected.gz
-rw-r--r-- 1 jakub.sevcech anomaly 2.1M Dec 11 23:32 kddcup.data_10_percent.gz
-rw-r--r-- 1 jakub.sevcech anomaly  18M Dec 11 23:32 kddcup.data.gz


# I will create the RDD

In [14]:
data_file = "data/kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()

Notice that I am processing the compressed data directly. The actual amount of data is approximately 10 times larger.

# We define the schema (header)

In [15]:
from pyspark.sql import Row

# I will load the data and set the schema
csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)
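For readers without a cluster at hand, the same per-line parsing can be sketched in plain Python. Interaction is a hypothetical NamedTuple standing in for pyspark.sql.Row with the same six fields, and the sample line is shortened for illustration (real lines have 42 comma-separated values: 41 features and the label).

```python
from typing import NamedTuple

class Interaction(NamedTuple):
    """Hypothetical stand-in for the Row built above (same six fields)."""
    duration: int
    protocol_type: str
    service: str
    flag: str
    src_bytes: int
    dst_bytes: int

def parse_line(line):
    p = line.split(",")  # one CSV record -> list of strings
    return Interaction(
        duration=int(p[0]),
        protocol_type=p[1],
        service=p[2],
        flag=p[3],
        src_bytes=int(p[4]),
        dst_bytes=int(p[5]),
    )

row = parse_line("0,tcp,http,SF,215,45076,0,0,0,normal.")
print(row.protocol_type, row.src_bytes, row.dst_bytes)  # tcp 215 45076
```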

## I will create a DataFrame, very similar to a DataFrame in Pandas

In [16]:
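# sqlContext may already be predefined (e.g. in the pyspark shell); otherwise create it as in the SQL section below
interactions_df = sqlContext.createDataFrame(row_data)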
type(interactions_df)

pyspark.sql.dataframe.DataFrame

## I can use operations similar to those in Pandas, but distributed

In [17]:
interactions_df.groupBy("protocol_type").count().show()

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+
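Conceptually, a distributed count like this can be computed by counting within each partition and then adding the small partial results together. A minimal plain-Python sketch with collections.Counter, assuming two toy partitions of protocol names (the data are made up for illustration):

```python
from collections import Counter

# hypothetical toy data: protocol_type values split into two partitions
partitions = [
    ["tcp", "icmp", "icmp", "udp"],
    ["icmp", "tcp", "icmp"],
]

# step 1: each partition counts its own values (the rows stay where they are)
partial_counts = [Counter(part) for part in partitions]
# step 2: only the small per-partition counts are combined
total = sum(partial_counts, Counter())
print(dict(total))  # {'tcp': 2, 'icmp': 4, 'udp': 1}
```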



# What if I want to access it through SQL?

In [24]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [18]:
interactions_df.registerTempTable("interactions")  # deprecated since Spark 2.0 in favor of createOrReplaceTempView

In [27]:
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows
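The query itself is ordinary SQL. As a small-scale illustration, the same query can run against Python's built-in sqlite3 module on a few made-up rows; only the table name and column names match the example above, the values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE interactions (duration INTEGER, protocol_type TEXT, dst_bytes INTEGER)")
# hypothetical toy rows, not taken from the KDD data
conn.executemany(
    "INSERT INTO interactions VALUES (?, ?, ?)",
    [(5057, "tcp", 0), (12, "tcp", 0), (2000, "udp", 0), (3000, "tcp", 150)],
)
rows = conn.execute("""
    SELECT duration, dst_bytes FROM interactions
    WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""").fetchall()
print(rows)  # [(5057, 0)]
conn.close()
```

The difference with Spark is not the language but where the query runs: Spark executes it in parallel over the partitions of the DataFrame.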



## So we can use Spark for exploratory analysis of large amounts of data

## There are also libraries for training models


# Get the train and test data

In [23]:
data_file = "data/kddcup.data.gz"
raw_data = sc.textFile(data_file)

print("Train data size is {}".format(raw_data.count()))

Train data size is 4898431


In [24]:
test_data_file = "data/corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print("Test data size is {}".format(test_raw_data.count()))

Test data size is 311029


## Load it as a list of rows (instances with attributes)

In [25]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

## One instance looks like this

In [30]:
csv_data.take(1)

[['0',
  'tcp',
  'http',
  'SF',
  '215',
  '45076',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '1',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '0',
  '0',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

## Data description is here
http://kdd.ics.uci.edu/databases/kddcup99/task.html

## These are the values that some (categorical) attributes take

In [26]:
protocols = csv_data.map(lambda x: x[1]).distinct().collect()
protocols

['icmp', 'udp', 'tcp']

In [27]:
services = csv_data.map(lambda x: x[2]).distinct().collect()
services

['finger',
 'netbios_dgm',
 'name',
 'X11',
 'hostnames',
 'vmnet',
 'systat',
 'shell',
 'netstat',
 'netbios_ssn',
 'urh_i',
 'pop_3',
 'ldap',
 'domain',
 'mtp',
 'remote_job',
 'exec',
 'supdup',
 'courier',
 'urp_i',
 'pop_2',
 'csnet_ns',
 'smtp',
 'whois',
 'daytime',
 'bgp',
 'imap4',
 'nntp',
 'http_443',
 'klogin',
 'rje',
 'IRC',
 'link',
 'http_8001',
 'uucp',
 'tftp_u',
 'iso_tsap',
 'uucp_path',
 'auth',
 'ecr_i',
 'other',
 'domain_u',
 'ssh',
 'discard',
 'ctf',
 'red_i',
 'tim_i',
 'time',
 'login',
 'Z39_50',
 'ftp',
 'telnet',
 'ntp_u',
 'sql_net',
 'aol',
 'private',
 'gopher',
 'efs',
 'http_2784',
 'ftp_data',
 'nnsp',
 'http',
 'sunrpc',
 'eco_i',
 'harvest',
 'kshell',
 'echo',
 'netbios_ns',
 'pm_dump',
 'printer']

In [28]:
flags = csv_data.map(lambda x: x[3]).distinct().collect()
flags

['S0', 'RSTR', 'SH', 'S1', 'S2', 'RSTOS0', 'REJ', 'OTH', 'SF', 'S3', 'RSTO']

# Let's encode categorical data

For simplicity, we can encode them as if they were ordinal, replacing each value with its index in the list of distinct values. In the model, we will explicitly state that these features are categorical and must not be treated as numbers.

In [31]:
def create_labeled_point(line_split):
    # keep the 41 features (columns 0-40); column 41 is the label
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    # (values not seen in the training data get the extra index len(protocols))
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except ValueError:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except ValueError:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except ValueError:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label: 0.0 = normal connection, 1.0 = attack
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)
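A design note on the encoding: list.index scans the whole list for every row. A minimal alternative sketch (make_encoder is a hypothetical helper) builds a dictionary once and gives the same codes, including len(values) for unseen values, with a constant-time lookup. It assumes the list has no duplicates, which distinct() guarantees.

```python
def make_encoder(values):
    """Hypothetical helper: map each value to its list index; unseen -> len(values)."""
    index = {v: i for i, v in enumerate(values)}
    unknown = len(values)
    return lambda v: index.get(v, unknown)


protocols = ["icmp", "udp", "tcp"]  # as returned by distinct() above
encode_protocol = make_encoder(protocols)
print(encode_protocol("tcp"))   # 2, same as protocols.index("tcp")
print(encode_protocol("sctp"))  # 3, an unseen value
```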

## And we will train a decision tree

We use the **mllib** library.


In [34]:
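import time
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

t0 = time.time()
# categoricalFeaturesInfo: feature index -> number of categories;
# maxBins must be at least the largest number of categories (here, services)
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)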

print("Classifier trained in {} seconds".format(round(time.time() - t0,3)))

Classifier trained in 272.709 seconds


## Predictions can be computed like this

In [35]:
t0 = time.time()
predictions = tree_model.predict(test_data.map(lambda p: p.features))

print("Predictions generated in {} seconds".format(round(time.time() - t0,3)))

Predictions generated in 0.049 seconds
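
The prediction cell finishes almost instantly because predict on an RDD only defines a new lazy transformation. The predictions are actually computed when the accuracy is evaluated, which is why that step takes much longer.
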
## And the accuracy can be evaluated like this

In [38]:
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

t0 = time.time()
test_accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())

print("Prediction made in {} seconds. Test accuracy is {}".format(round(time.time() - t0,3), round(test_accuracy,4)))

Prediction made in 23.226 seconds. Test accuracy is 0.916
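The accuracy is simply the fraction of (label, prediction) pairs that match. A plain-Python sketch on made-up pairs, which also counts the two kinds of errors, since accuracy alone hides how the mistakes are split between missed attacks and false alarms:

```python
from collections import Counter

# hypothetical (label, prediction) pairs: 0.0 = normal, 1.0 = attack
labels_and_preds = [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (1.0, 1.0)]

accuracy = sum(1 for label, pred in labels_and_preds if label == pred) / len(labels_and_preds)
print(round(accuracy, 4))  # 0.8

confusion = Counter(labels_and_preds)
print(confusion[(1.0, 0.0)])  # 1: attack predicted as normal (missed attack)
print(confusion[(0.0, 1.0)])  # 0: normal connection flagged as an attack
```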


# Rules from the tree can be extracted like this

In [39]:
print("Learned classification tree model:")
print(tree_model.toDebugString())

Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 27 nodes
  If (feature 22 <= 33.0)
   If (feature 25 <= 0.5)
    If (feature 36 <= 0.48)
     If (feature 34 <= 0.91)
      Predict: 0.0
     Else (feature 34 > 0.91)
      Predict: 1.0
    Else (feature 36 > 0.48)
     If (feature 2 in {0.0,56.0,42.0,52.0,14.0,61.0,38.0,13.0,41.0,2.0,32.0,22.0,44.0,50.0,11.0,23.0,30.0,51.0,19.0,47.0,15.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,56.0,42.0,52.0,14.0,61.0,38.0,13.0,41.0,2.0,32.0,22.0,44.0,50.0,11.0,23.0,30.0,51.0,19.0,47.0,15.0})
      Predict: 1.0
   Else (feature 25 > 0.5)
    If (feature 3 in {5.0,6.0,9.0,3.0,8.0,4.0})
     If (feature 2 in {0.0,61.0,38.0,22.0,59.0,7.0,3.0,50.0,31.0,11.0,40.0,51.0,47.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,61.0,38.0,22.0,59.0,7.0,3.0,50.0,31.0,11.0,40.0,51.0,47.0})
      Predict: 1.0
    Else (feature 3 not in {5.0,6.0,9.0,3.0,8.0,4.0})
     If (feature 38 <= 0.07)
      Predict: 0.0
     Else 

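Now I can interpret the whole tree; I only have to look up what the individual attributes mean. Features are numbered by their 0-based position in the feature vector (feature 2 is service, feature 3 is flag), and the values listed in categorical splits are indices into the services and flags lists computed above. The data description is here: http://kdd.ics.uci.edu/databases/kddcup99/task.html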
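To read the tree, the feature numbers can be translated to names. This sketch assumes the 41 feature names in the order of the dataset's column description (kddcup.names); FEATURE_NAMES and feature_name are hypothetical helpers, so check the list against the data description before relying on it.

```python
# assumed column order of the 41 KDD Cup 1999 features (label excluded)
FEATURE_NAMES = [
    "duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes",
    "land", "wrong_fragment", "urgent", "hot", "num_failed_logins", "logged_in",
    "num_compromised", "root_shell", "su_attempted", "num_root",
    "num_file_creations", "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count", "serror_rate",
    "srv_serror_rate", "rerror_rate", "srv_rerror_rate", "same_srv_rate",
    "diff_srv_rate", "srv_diff_host_rate", "dst_host_count",
    "dst_host_srv_count", "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate", "dst_host_rerror_rate",
    "dst_host_srv_rerror_rate",
]

def feature_name(i):
    """Hypothetical helper: name of feature i as numbered in toDebugString()."""
    return FEATURE_NAMES[i]

print(feature_name(22))  # count, used by the root split above
print(feature_name(25))  # srv_serror_rate
```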