# Distributed computing - Spark

Spark is currently the first framework people reach for when they want to do distributed in-memory data processing.

There are, however, many other options, tools, and technologies:
* **Hadoop (MapReduce + HDFS)** - what people reach for when they have so much data that it does not fit into memory even on a large cluster and/or they want to work on disk
* Storm
* Flink

<img src="http://mattturck.com/wp-content/uploads/2018/07/Matt_Turck_FirstMark_Big_Data_Landscape_2018_Final.png" alt="BigData landscape 2018"/>
source: http://mattturck.com/bigdata2018/

# Spark - a few properties

* In-memory data processing
* A uniform way of accessing data and compute resources (I write the same code whether I work on my laptop or on a whole cluster)
* Built on Scala and the JVM, but with very good APIs for Python and R
* Essentially MapReduce, but in memory and with support for micro-batches for processing data streams
* The foundation is the RDD (Resilient Distributed Dataset), a collection of data distributed across the individual nodes. Work is built around transformations over data represented as RDDs, with simple support for basic operations such as map, filter, and collect
* Transformations are lazy: they are not executed until their results are needed.
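
The map-then-reduce style described in the bullets above can be sketched on a single machine in plain Python. This is only a loose analogy, not how Spark is implemented: the `partitions` list is a made-up stand-in for data spread over cluster nodes, and `square` is just an illustrative function.

```python
from functools import reduce

# Hypothetical stand-in for an RDD: the data split into chunks ("partitions").
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

def square(x):
    return x * x

# "map" phase: each partition is processed independently,
# so on a cluster this step could run on different nodes.
mapped = [[square(x) for x in part] for part in partitions]

# "reduce" phase: combine within each partition first,
# then merge the (much smaller) partial results.
partial_sums = [sum(part) for part in mapped]
total = reduce(lambda a, b: a + b, partial_sums)

print(partial_sums)  # [14, 41, 230]
print(total)         # 285
```

The point of the two-step reduce is that only one number per partition has to travel between nodes, not the whole data.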

In [1]:
sc

In [2]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData  # this is an example of an RDD

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

In [3]:
f = distData.map(lambda x: x % 2 == 0)
f.take(3)

[False, True, False]

In [4]:
f = distData.filter(lambda x: x % 2 == 0)
f.take(5)

[2, 4]

# Toy example: finding primes

I have a function that checks whether a number is prime, and I want to distribute it over a lot of data.

In [5]:
# taken from https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

def isprime(n):
    n = abs(int(n))
    if n < 2:
        return False
    if n == 2:
        return True
    if not n & 1:
        return False
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True

In [6]:
import time

In [7]:
nums = sc.parallelize(range(10**6))

In [8]:
start = time.time()
print(nums.filter(isprime).count())
end = time.time()
print("Elapsed time: {} s".format(end - start))

78498
Elapsed time: 2.577939033508301 s


# Lazy evaluation

In [9]:
start = time.time()
print(nums.filter(isprime))
end = time.time()
print("Elapsed time: {} s".format(end - start))

PythonRDD[7] at RDD at PythonRDD.scala:48
Elapsed time: 0.019066810607910156 s


I did not call any function that returns results, so nothing has been executed yet. Only an RDD with the transformations was prepared; they will run when they are needed.

In [10]:
start = time.time()
print(nums.filter(isprime).take(5))
end = time.time()
print("Elapsed time: {} s".format(end - start))

[2, 3, 5, 7, 11]
Elapsed time: 0.07054328918457031 s


Only now did I call a function that requires something to actually be computed. I only needed the first few numbers, though, so only part of the computation ran. Other functions that return data include collect, count, ...

Be careful with these functions, especially collect. It returns all the data produced by the computation. If there is a lot of it, Spark will still try to return it all, and the machine from which you access Spark may not cope (assuming you are connecting to some larger machine).
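
The same "compute only what is asked for" behaviour can be imitated with plain Python iterators. This is a single-machine analogy only (Spark works per partition and may evaluate more than strictly necessary); `is_prime`, `counted`, and `seen` are hypothetical helper names introduced just for this sketch.

```python
from itertools import islice

def is_prime(n):
    """Simple trial-division primality test."""
    if n < 2:
        return False
    return all(n % d for d in range(2, int(n ** 0.5) + 1))

def counted(iterable, log):
    """Pass items through unchanged while recording which ones were pulled."""
    for item in iterable:
        log.append(item)
        yield item

seen = []

# Building the pipeline does no work yet; filter() returns a lazy iterator.
lazy_primes = filter(is_prime, counted(range(10 ** 6), seen))
print(len(seen))  # 0

# Asking for the first five results pulls only as many inputs as needed.
first_five = list(islice(lazy_primes, 5))
print(first_five)  # [2, 3, 5, 7, 11]
print(len(seen))   # 12 (the numbers 0 through 11)
```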

In [11]:
start = time.time()
print(nums.filter(isprime).takeOrdered(5, key = lambda x: -x))
end = time.time()
print("Elapsed time: {} s".format(end - start))

[999983, 999979, 999961, 999959, 999953]
Elapsed time: 2.7241902351379395 s


Here I needed to compute everything in order to sort it, so the whole computation had to run and it took a while.
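
Why a "top k" query has to look at everything, and yet does not have to move everything, can be sketched in plain Python. This is an illustrative approximation of the idea (local top k per partition, then a merge), not a copy of Spark's code; the `partitions` values are made up.

```python
import heapq

# Hypothetical partitions; in Spark these would live on different executors.
partitions = [[7, 2, 19, 3], [11, 23, 5], [13, 17, 29, 31]]
k = 3

# Every element of every partition must be inspected to find the local top k...
local_tops = [heapq.nlargest(k, part) for part in partitions]

# ...but only k candidates per partition are passed on to the final merge.
top_k = heapq.nlargest(k, (x for top in local_tops for x in top))

print(local_tops)  # [[19, 7, 3], [23, 11, 5], [31, 29, 17]]
print(top_k)       # [31, 29, 23]
```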

# Let's try a slightly more realistic example

sources:
* https://github.com/jadianes/spark-py-notebooks
* https://www.codementor.io/jadianes/python-spark-sql-dataframes-du107w74i

## Downloading data about attacks on a computer network

The data describe connections in a network.

In [20]:
import urllib.request
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "data/kddcup.data.gz")
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "data/kddcup.data_10_percent.gz")
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "data/corrected.gz")

In [21]:
%%bash
ls -lh data

total 21M
-rw-r--r-- 1 jakub.sevcech anomaly 1.4M Dec 11 23:39 corrected.gz
-rw-r--r-- 1 jakub.sevcech anomaly 2.1M Dec 11 23:32 kddcup.data_10_percent.gz
-rw-r--r-- 1 jakub.sevcech anomaly  18M Dec 11 23:32 kddcup.data.gz


# Creating an RDD

In [14]:
data_file = "data/kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()

In [15]:
from pyspark.sql import Row

# load the data and assign a schema to it
csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)

## Creating a DataFrame very similar to the one in Pandas

In [16]:
interactions_df = sqlContext.createDataFrame(row_data)
type(interactions_df)

pyspark.sql.dataframe.DataFrame

## I can do similar operations as in Pandas, only distributed

In [17]:
interactions_df.groupBy("protocol_type").count().show()

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+
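
What "the same operation, only distributed" means for a group-and-count can be sketched in plain Python: each chunk of data is counted on its own and the partial counts are merged. The protocol values and the split into `partitions` below are made up for illustration and are not taken from the dataset.

```python
from collections import Counter

# Made-up protocol values split into hypothetical partitions.
partitions = [["tcp", "udp", "tcp"], ["icmp", "icmp", "tcp"], ["udp", "icmp"]]

# Each partition is counted independently...
partial_counts = [Counter(part) for part in partitions]

# ...and the partial counts are merged into the final result.
total_counts = sum(partial_counts, Counter())

for protocol, count in sorted(total_counts.items()):
    print(protocol, count)
# icmp 3
# tcp 3
# udp 2
```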



# What if I wanted to access the data through SQL?

In [24]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [18]:
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
interactions_df.registerTempTable("interactions")

In [27]:
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows



## So we could use Spark, for example, for exploratory analysis of fairly large volumes of data

## There are also libraries for training models

# Let's take the training and test data

In [23]:
data_file = "data/kddcup.data.gz"
raw_data = sc.textFile(data_file)

print("Train data size is {}".format(raw_data.count()))

Train data size is 4898431


In [24]:
test_data_file = "data/corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print("Test data size is {}".format(test_raw_data.count()))

Test data size is 311029


## Let's load them as lists of rows = observations with attributes

In [25]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()

## An observation looks something like this

In [30]:
csv_data.take(1)

[['0',
  'tcp',
  'http',
  'SF',
  '215',
  '45076',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '1',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '0',
  '0',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

## A description of the data is available here
http://kdd.ics.uci.edu/databases/kddcup99/task.html

## These are the values some of the attributes take

In [26]:
protocols

['icmp', 'udp', 'tcp']

In [27]:
services

['finger',
 'netbios_dgm',
 'name',
 'X11',
 'hostnames',
 'vmnet',
 'systat',
 'shell',
 'netstat',
 'netbios_ssn',
 'urh_i',
 'pop_3',
 'ldap',
 'domain',
 'mtp',
 'remote_job',
 'exec',
 'supdup',
 'courier',
 'urp_i',
 'pop_2',
 'csnet_ns',
 'smtp',
 'whois',
 'daytime',
 'bgp',
 'imap4',
 'nntp',
 'http_443',
 'klogin',
 'rje',
 'IRC',
 'link',
 'http_8001',
 'uucp',
 'tftp_u',
 'iso_tsap',
 'uucp_path',
 'auth',
 'ecr_i',
 'other',
 'domain_u',
 'ssh',
 'discard',
 'ctf',
 'red_i',
 'tim_i',
 'time',
 'login',
 'Z39_50',
 'ftp',
 'telnet',
 'ntp_u',
 'sql_net',
 'aol',
 'private',
 'gopher',
 'efs',
 'http_2784',
 'ftp_data',
 'nnsp',
 'http',
 'sunrpc',
 'eco_i',
 'harvest',
 'kshell',
 'echo',
 'netbios_ns',
 'pm_dump',
 'printer']

In [28]:
flags

['S0', 'RSTR', 'SH', 'S1', 'S2', 'RSTOS0', 'REJ', 'OTH', 'SF', 'S3', 'RSTO']

# Let's encode the categorical data

For simplicity, we can encode them as if they were ordinal variables. In the model we state explicitly that they are categorical variables, so that it does not treat them as numbers.

In [31]:
def create_labeled_point(line_split):
    # leave_out = [41] (label)
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    # (list.index raises ValueError for values not seen in the training data)
    try:
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except ValueError:
        clean_line_split[1] = len(protocols)

    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except ValueError:
        clean_line_split[2] = len(services)

    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except ValueError:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)
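
The encoding step used in `create_labeled_point` can be tried in isolation. A minimal sketch, assuming the `protocols` list printed earlier in this notebook; `encode_category` is a hypothetical helper name and `'sctp'` is just a made-up example of a value that was not seen in the training data.

```python
# Values copied from the `protocols` output earlier in this notebook.
protocols = ['icmp', 'udp', 'tcp']

def encode_category(value, known_values):
    """Return the index of value in known_values, or len(known_values) if unseen."""
    try:
        return known_values.index(value)
    except ValueError:
        return len(known_values)

print(encode_category('tcp', protocols))   # 2
print(encode_category('sctp', protocols))  # 3 (shared bucket for unseen values)
```

Note that the fallback index equals `len(known_values)`, so it lies just outside the range 0 to `len(known_values) - 1` that the known categories occupy.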

## And let's train a decision tree

We use the **mllib** library

In [34]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

t0 = time.time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)

print("Classifier trained in {} seconds".format(round(time.time() - t0,3)))

Classifier trained in 272.709 seconds
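
The `impurity='gini'` argument above selects Gini impurity as the split criterion. As a reminder of what that quantity measures, here is a minimal plain-Python sketch; the helper name `gini_impurity` and the label lists are made up for illustration, and this is not MLlib's implementation.

```python
def gini_impurity(labels):
    """Gini impurity: 1 - sum(p_c ** 2) over the class proportions p_c."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((labels.count(c) / n) ** 2 for c in set(labels))

print(gini_impurity([0, 0, 0, 0]))  # 0.0 (a pure node)
print(gini_impurity([0, 1, 0, 1]))  # 0.5 (an evenly mixed binary node)
```

A split is considered good when the child nodes it produces have lower impurity than the parent.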


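## Predictions can be computed like this

Note that calling `predict` on an RDD is itself a lazy transformation, so the time measured here covers only setting up the resulting RDD, not the actual prediction work.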

In [35]:
t0 = time.time()
predictions = tree_model.predict(test_data.map(lambda p: p.features))

print("Predictions generated in {} seconds".format(round(time.time() - t0,3)))

Predictions generated in 0.049 seconds


## And the accuracy can be evaluated like this

In [38]:
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

t0 = time.time()
test_accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())

print("Prediction made in {} seconds. Test accuracy is {}".format(round(time.time() - t0,3), round(test_accuracy,4)))

Prediction made in 23.226 seconds. Test accuracy is 0.916
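
The accuracy computed above is simply the share of pairs where the label equals the prediction. A tiny plain-Python sketch of the same calculation, with made-up pairs standing in for the contents of `labels_and_preds`:

```python
# Made-up (label, prediction) pairs standing in for labels_and_preds.
pairs = [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (1.0, 1.0)]

# Count the pairs where the prediction matches the label.
correct = sum(1 for label, pred in pairs if label == pred)
accuracy = correct / len(pairs)

print(correct)   # 4
print(accuracy)  # 0.8
```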


# The rules from the tree can be printed easily

In [39]:
print("Learned classification tree model:")
print(tree_model.toDebugString())

Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 27 nodes
  If (feature 22 <= 33.0)
   If (feature 25 <= 0.5)
    If (feature 36 <= 0.48)
     If (feature 34 <= 0.91)
      Predict: 0.0
     Else (feature 34 > 0.91)
      Predict: 1.0
    Else (feature 36 > 0.48)
     If (feature 2 in {0.0,56.0,42.0,52.0,14.0,61.0,38.0,13.0,41.0,2.0,32.0,22.0,44.0,50.0,11.0,23.0,30.0,51.0,19.0,47.0,15.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,56.0,42.0,52.0,14.0,61.0,38.0,13.0,41.0,2.0,32.0,22.0,44.0,50.0,11.0,23.0,30.0,51.0,19.0,47.0,15.0})
      Predict: 1.0
   Else (feature 25 > 0.5)
    If (feature 3 in {5.0,6.0,9.0,3.0,8.0,4.0})
     If (feature 2 in {0.0,61.0,38.0,22.0,59.0,7.0,3.0,50.0,31.0,11.0,40.0,51.0,47.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,61.0,38.0,22.0,59.0,7.0,3.0,50.0,31.0,11.0,40.0,51.0,47.0})
      Predict: 1.0
    Else (feature 3 not in {5.0,6.0,9.0,3.0,8.0,4.0})
     If (feature 38 <= 0.07)
      Predict: 0.0
     Else 

Now I could interpret the whole tree. I just need to look at what is in the individual columns. A description of the data is available here: http://kdd.ics.uci.edu/databases/kddcup99/task.html
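
For the categorical features, the numbers in the rules are indices into the lists built earlier, so they can be decoded back into names. A small sketch for the rule on feature 3 (the flag column in `create_labeled_point`), assuming the `flags` list is in the same order as printed earlier in this notebook; the order returned by `distinct().collect()` may differ between runs.

```python
# Copied from the `flags` output earlier in this notebook.
flags = ['S0', 'RSTR', 'SH', 'S1', 'S2', 'RSTOS0', 'REJ', 'OTH', 'SF', 'S3', 'RSTO']

# Category indices from the rule "feature 3 in {5.0,6.0,9.0,3.0,8.0,4.0}".
rule_indices = [5.0, 6.0, 9.0, 3.0, 8.0, 4.0]

# Map each index in the rule back to the flag name it stands for.
decoded = [flags[int(i)] for i in rule_indices]
print(decoded)  # ['RSTOS0', 'REJ', 'S3', 'S1', 'SF', 'S2']
```

The same lookup works for feature 2 with the `services` list.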