# SPARK RDD

In [1]:
import pyspark
sc = pyspark.SparkContext(appName="RDDstart")

### Tworzenie RDD

#### Lokalna kolekcja

In [2]:
RDDlist = sc.parallelize(list(range(10)))

In [3]:
RDDlist

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

In [4]:
RDDlist.toDebugString()

b'(4) ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []'

In [5]:
RDDlist.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [6]:
RDDlist.take(5)

[0, 1, 2, 3, 4]

**_UWAGA: `collect` i `take` to niebezpieczne operacje - mogą doprowadzić do zapchania drivera i przerwania działania aplikacji, zachowaj ostrożność_**

#### Plik

In [7]:
RDDfile = sc.textFile("RDD.txt")

In [8]:
RDDfile.collect()

['0,1,2,3,4,5,6,7,8,9']

In [9]:
RDDfile.take(5)

['0,1,2,3,4,5,6,7,8,9']

#### Inne RDD

In [10]:
RDDother = RDDfile.flatMap(lambda x: x.split(","))

In [11]:
RDDother.collect()

['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

In [12]:
RDDother.take(5)

['0', '1', '2', '3', '4']

### Transformacje

##### map(fun)

Zwraca nowe RDD po zastosowaniu podanej funkcji na każdym elemencie oryginalnego RDD

In [13]:
RDDmap1 = RDDother.map(lambda x: int(x))
RDDmap1.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [14]:
RDDmap1.map(lambda x: x+1).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [15]:
RDDmap1.map(lambda x: (x,1)).collect()

[(0, 1),
 (1, 1),
 (2, 1),
 (3, 1),
 (4, 1),
 (5, 1),
 (6, 1),
 (7, 1),
 (8, 1),
 (9, 1)]

In [16]:
RDDlist2 = sc.parallelize([list(range(5)),list(range(5,10))])

In [17]:
RDDlist2.collect()

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

In [18]:
RDDlist2.map(sum).collect()

[10, 35]

In [19]:
RDDlist2.map(lambda x: [y + 1 for y in x]).collect()

[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]

In [20]:
RDDfile.collect()

['0,1,2,3,4,5,6,7,8,9']

In [21]:
RDDfile.map(lambda x: x.split(",")).collect()

[['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']]

> **TODO**: W RDDlist2 zastąp liczby parzyste literą *P* a nieparzyste *N*

In [38]:
#RDDlist2.map(lambda x: [y + 1 for y in x]).collect()

RDDlist2.map(lambda x: ['P' if y % 2 == 0 else 'N' for y in x]).collect()

[['P', 'N', 'P', 'N', 'P'], ['N', 'P', 'N', 'P', 'N']]

In [39]:
def np(x):
    lst = []
    for y in x:
        if y % 2 == 0:
            lst.append("P")
        else:
            lst.append("N")
    return lst

In [40]:
RDDlist2.map(np).collect()

[['P', 'N', 'P', 'N', 'P'], ['N', 'P', 'N', 'P', 'N']]

##### flatMap(func)

Zwraca nowe RDD po zastosowaniu podanej funkcji na każdym elemencie oryginalnego RDD oraz spłaszczeniu rezultatu

In [41]:
RDDlist2.flatMap(lambda x: x).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [42]:
RDDlist2.flatMap(lambda x: x*2).collect()

[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9]

In [43]:
RDDlist2.flatMap(lambda x: [y*2 for y in x]).collect()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

In [44]:
RDDlist3 = sc.parallelize([[[[0],[1]],[[2],[3]]],[[[4],[5]],[[6],[7]]]])
RDDlist3.collect()

[[[[0], [1]], [[2], [3]]], [[[4], [5]], [[6], [7]]]]

In [45]:
RDDlist3.flatMap(lambda x: x).collect()

[[[0], [1]], [[2], [3]], [[4], [5]], [[6], [7]]]

In [46]:
RDDlist3.flatMap(lambda x: x).flatMap(lambda x: x).collect()

[[0], [1], [2], [3], [4], [5], [6], [7]]

In [47]:
RDDlist3.flatMap(lambda x: x).flatMap(lambda x: x).flatMap(lambda x: x).collect()

[0, 1, 2, 3, 4, 5, 6, 7]

> **TODO**: Osiągnij taki sam wynik jak w komórce powyżej używając na RDDlist3: 1 x map i 2 x flatMap

In [52]:
RDDlist3.flatMap(lambda x: x).flatMap(lambda x: x).map(sum).collect()

[0, 1, 2, 3, 4, 5, 6, 7]

In [53]:
RDDlist3.flatMap(lambda x: x).flatMap(lambda x: x).map(lambda x: x[0]).collect()

[0, 1, 2, 3, 4, 5, 6, 7]

##### mapValues(func)

Zwraca nowe RDD po zastosowaniu podanej funkcji na każdej wartości oryginalnego RDD zawierającego pary klucz-wartość

In [54]:
RDDpair = sc.parallelize([("A",["Adam","Ada","Adrian"]),("B",["Bonifacy","Barnaba"])])
RDDpair.collect()

[('A', ['Adam', 'Ada', 'Adrian']), ('B', ['Bonifacy', 'Barnaba'])]

In [55]:
RDDpair.mapValues(lambda val: len(val)).collect()

[('A', 3), ('B', 2)]

In [56]:
RDDpair.mapValues(lambda val: len(val)).mapValues(lambda val: val*2).collect()

[('A', 6), ('B', 4)]

Te same rezultaty można osiągnąć używając `map`, jednak jest to mniej wygodne i generalnie odradzane

In [57]:
RDDpair.map(lambda x: (x[0],len(x[1]))).collect()

[('A', 3), ('B', 2)]

In [58]:
RDDpair.map(lambda x: (x[0],len(x[1]))).map(lambda x: (x[0],x[1]*2)).collect()

[('A', 6), ('B', 4)]

> **TODO**: Zmodyfikuj wartości w RDDpair tak aby zawierały nie imiona a liczby liter w imionach, następnie zsumuj liczby liter

In [62]:
RDDpair.mapValues(lambda x: [len(y) for y in x]).collect()

[('A', [4, 3, 6]), ('B', [8, 7])]

In [65]:
RDDpair.mapValues(lambda x: [len(y) for y in x]).mapValues(sum).collect()

[('A', 13), ('B', 15)]

##### flatMapValues(func)

Zwraca nowe RDD po zastosowaniu podanej funkcji na każdej wartości oryginalnego RDD zawierającego pary klucz-wartość oraz spłaszczeniu rezultatu

In [66]:
RDDpair.flatMapValues(lambda x: x).collect()

[('A', 'Adam'),
 ('A', 'Ada'),
 ('A', 'Adrian'),
 ('B', 'Bonifacy'),
 ('B', 'Barnaba')]

In [67]:
RDDpair.flatMapValues(lambda x: [y.lower() for y in x]).collect()

[('A', 'adam'),
 ('A', 'ada'),
 ('A', 'adrian'),
 ('B', 'bonifacy'),
 ('B', 'barnaba')]

> **TODO**: Na podstawie RDDpair stwórz RDD o następującej strukturze: (litera, (imię, l. liter))

In [68]:
RDDpair.collect()

[('A', ['Adam', 'Ada', 'Adrian']), ('B', ['Bonifacy', 'Barnaba'])]

In [78]:
RDDpair.flatMapValues(lambda x: [(y, len(y)) for y in x]).collect()

[('A', ('Adam', 4)),
 ('A', ('Ada', 3)),
 ('A', ('Adrian', 6)),
 ('B', ('Bonifacy', 8)),
 ('B', ('Barnaba', 7))]

##### keys(), values()

Metody te tworzą nowe RDD odpowiednio z kluczy i wartości oryginalnego RDD (klucz, wartość)

In [79]:
RDDpair.values().collect()

[['Adam', 'Ada', 'Adrian'], ['Bonifacy', 'Barnaba']]

In [80]:
RDDpair.keys().collect()

['A', 'B']

##### filter(func)

Zwraca nowe RDD zawierające jedynie elementy które spełniają predykat

In [81]:
RDDmap1.filter(lambda x: x > 3).collect()

[4, 5, 6, 7, 8, 9]

In [82]:
RDDpair.filter(lambda x: x[0] == "A").collect()

[('A', ['Adam', 'Ada', 'Adrian'])]

Dobrą praktyką optymalizującą działanie programu jest filtrowanie RDD możliwie jak najwcześniej. Z poniższych dwóch komórek z kodem to druga jest lepsza.

In [83]:
RDDpair.flatMapValues(lambda x: x).filter(lambda x: x[0] != "B").collect()

[('A', 'Adam'), ('A', 'Ada'), ('A', 'Adrian')]

In [84]:
RDDpair.filter(lambda x: x[0] != "B").flatMapValues(lambda x: x).collect()

[('A', 'Adam'), ('A', 'Ada'), ('A', 'Adrian')]

> **TODO**: Odfiltruj parzyste liczby z RDDmap1

In [93]:
RDDmap1.filter(lambda x: x % 2 != 0).collect()

[1, 3, 5, 7, 9]

##### join(RDD)

Zwraca RDD zawierające pary elementów z identycznymi kluczami w łączonych RDD.

In [96]:
RDDpair.collect()

[('A', ['Adam', 'Ada', 'Adrian']), ('B', ['Bonifacy', 'Barnaba'])]

In [94]:
RDDpair.join(RDDpair.flatMapValues(lambda x: x)).collect()

[('B', (['Bonifacy', 'Barnaba'], 'Bonifacy')),
 ('B', (['Bonifacy', 'Barnaba'], 'Barnaba')),
 ('A', (['Adam', 'Ada', 'Adrian'], 'Adam')),
 ('A', (['Adam', 'Ada', 'Adrian'], 'Ada')),
 ('A', (['Adam', 'Ada', 'Adrian'], 'Adrian'))]

##### union(RDD)

Zwraca RDD wynikłe z połączenia dwóch RDD

In [99]:
RDDmap1.union(RDDlist).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [100]:
RDDlist.union(RDDother).collect()

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 '0',
 '1',
 '2',
 '3',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9']

##### distinct()

Zwraca RDD zawierające jedynie unikalne wartości z oryginalnego RDD

In [101]:
RDDmap1.union(RDDlist).distinct().collect()

[0, 6, 1, 7, 2, 8, 3, 9, 4, 5]

##### groupBy(func)

Zwraca RDD z pogrupowanymi elementami

In [107]:
RDDlist.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [102]:
RDDlist.groupBy(lambda x: x % 2).collect()

[(0, <pyspark.resultiterable.ResultIterable at 0x7fa41a4897f0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7fa41a489710>)]

In [103]:
RDDlist.groupBy(lambda x: x % 2).mapValues(lambda x: list(x)).collect()

[(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]

In [104]:
RDDlist.groupBy(lambda x: x % 2).mapValues(lambda x: [y*3 for y in x]).collect()

[(0, [0, 6, 12, 18, 24]), (1, [3, 9, 15, 21, 27])]

In [109]:
RDDpair2 = sc.parallelize([('Adam', 4), ('Ada', 3), ('Adrian', 6), ('Bonifacy', 8), ('Barnaba', 7)])

In [110]:
RDDpair2.groupBy(lambda x: "l.l.>4" if x[1] > 4 else "l.l.<=4").mapValues(lambda x: list(x)).collect()

[('l.l.<=4', [('Adam', 4), ('Ada', 3)]),
 ('l.l.>4', [('Adrian', 6), ('Bonifacy', 8), ('Barnaba', 7)])]

> **TODO**: Pogrupuj RDDpair2 ze względu na pierwszą literę imienia

In [108]:
RDDpair2.collect()

[('Adam', 4), ('Ada', 3), ('Adrian', 6), ('Bonifacy', 8), ('Barnaba', 7)]

In [115]:
RDDpair2.groupBy(lambda x: x[0][0]).mapValues(list).collect()


[('A', [('Adam', 4), ('Ada', 3), ('Adrian', 6)]),
 ('B', [('Bonifacy', 8), ('Barnaba', 7)])]

In [117]:
RDDpair2.sortByKey().collect()

[('Ada', 3), ('Adam', 4), ('Adrian', 6), ('Barnaba', 7), ('Bonifacy', 8)]

In [118]:
RDDpair2.sortByKey(lambda x: x[0][0]).collect()

[('Ada', 3), ('Adam', 4), ('Adrian', 6), ('Barnaba', 7), ('Bonifacy', 8)]

##### groupByKey()

Zwraca RDD z wartościami pogrupowanymi w pojedynczą sekwencję dla każdego klucza. (_Jeśli grupowanie wykonywane jest w celu przeprowadzenia agregacji dla każdego klucza, optymalniejsze będzie wykorzystanie `reduceByKey` lub `aggregateByKey`._)

In [122]:
RDDpair3 = sc.parallelize([('A', 'Adam'),('A', 'Ada'),('A', 'Adrian'),('B', 'Bonifacy'),('B', 'Barnaba')])

In [123]:
RDDpair3.groupByKey().collect()

[('A', <pyspark.resultiterable.ResultIterable at 0x7fa41a41f828>),
 ('B', <pyspark.resultiterable.ResultIterable at 0x7fa41a41f860>)]

In [124]:
RDDpair3.groupByKey().mapValues(list).collect()

[('A', ['Adam', 'Ada', 'Adrian']), ('B', ['Bonifacy', 'Barnaba'])]

> **TODO**: Na podstawie RDDpair stwórz RDD o następującej strukturze: (litera, (imię, l. liter)), następnie pogrupuj je po literze (zamień nowe wartości na listę tak aby były czytelne po użyciu `collect`)

In [125]:
RDDpair.collect()

[('A', ['Adam', 'Ada', 'Adrian']), ('B', ['Bonifacy', 'Barnaba'])]

In [133]:
RDDpair.flatMapValues(lambda x: [(y, len(y)) for y in x]).groupByKey().mapValues(list).collect()

[('A', [('Adam', 4), ('Ada', 3), ('Adrian', 6)]),
 ('B', [('Bonifacy', 8), ('Barnaba', 7)])]

##### reduceByKey(func)

Zwraca RDD z połączonymi wartościami dla każdego klucza. Funkcja redukująca musi być asocjacyjna _[(a x b) x c = a x (b x c)]_ i przemienna _[a x b = b x a]_. 

In [134]:
RDDpair4 = RDDpair3.mapValues(len)
RDDpair4.collect()

[('A', 4), ('A', 3), ('A', 6), ('B', 8), ('B', 7)]

In [135]:
RDDpair4.reduceByKey(lambda x,y: x+y).collect()

[('A', 13), ('B', 15)]

> **TODO**: Uzyskaj iloczyn dla każdego klucza w RDDpair4

In [136]:
RDDpair4.reduceByKey(lambda x,y: x*y).collect()

[('A', 72), ('B', 56)]

In [137]:
RDDpair3.collect()

[('A', 'Adam'),
 ('A', 'Ada'),
 ('A', 'Adrian'),
 ('B', 'Bonifacy'),
 ('B', 'Barnaba')]

##### aggregateByKey(zeroValue, seqFunc, combFunc)

Rozbudowana wersja `reduceByKey` pozwalająca na zwrócenie wartości o innym typie niż oryginalne. W związku z tym konieczne jest podanie trzech parametrów:
- zeroValue - domyślna wartość neutralna dla agregacji (dodawanie: 0, mnożenie: 1, tworzenie zbioru unikatowych wartości: pusty zbiór, itd.),
- seqFunc - funkcja agregująca wartości w oryginalnym RDD, przyjmuje dwa parametry, gdzie drugi jest włączany (dodawany itp.) do pierwszego
- combFunc - funkcja łącząca wartości uzyskane z pierwszej funkcji dla kluczy

In [138]:
RDDpair3.aggregateByKey(0, (lambda acc,x: acc+len(x)), (lambda acc1,acc2: acc1+acc2)).collect()

[('A', 13), ('B', 15)]

Co się wydarzyło powyżej?

|RDDpair3 | seqFunc | seqFunc out. | combFunc | combFunc out. |
|:--------|:--------|:-------------|:---------|:------------- |
|('A', 'Adam') => | ('A', 0 + len('Adam')) => | ('A', 4) |  |  |
|('A', 'Ada') => | ('A', 0 + len('Ada')) => | ('A', 3) |  |  |
|('A', 'Adrian') => | ('A', 0 + len('Adrian')) => | ('A', 6) => | ('A', 4 + 3 + 6) => | ('A', 13) |
|('B', 'Bonifacy') => | ('B', 0 + len('Bonifacy')) => | ('B', 8) => | ('B', 8 + 7) => | ('B', 15) |
|('B', 'Barnaba') => | ('B', 0 + len('Barnaba')) => | ('B', 7) |  |  |

In [139]:
RDDpair3.aggregateByKey((0.,0.), (lambda acc,x: (acc[0]+len(x),acc[1]+1)), \
                        (lambda acc1,acc2: (acc1[0]+acc2[0],acc1[1]+acc2[1]))).collect()

[('A', (13.0, 3.0)), ('B', (15.0, 2.0))]

In [140]:
RDDpair3.aggregateByKey((0.,0.), (lambda acc,x: (acc[0]+len(x),acc[1]+1)), \
                        (lambda acc1,acc2: (acc1[0]+acc2[0],acc1[1]+acc2[1]))).mapValues(lambda x: x[0]/x[1]).collect()

[('A', 4.333333333333333), ('B', 7.5)]

> **TODO**: Z RDDpair3 uzyskaj RDD (klucz, iloczyn długości imion)

In [147]:
RDDpair3.aggregateByKey(1, (lambda acc,x: acc+len(x)), (lambda acc1,acc2: acc1*acc2)).collect()

[('A', 140), ('B', 16)]

### Akcje

**_UWAGA: `collect`, `take` i ich wariacje to niebezpieczne akcje - mogą doprowadzić do zapchania drivera i przerwania działania aplikacji, zachowaj ostrożność_**

##### collect()

Zwraca elementy zbioru na driver.

In [148]:
RDDlist.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

##### collectAsMap()

Zwraca RDD (K, V) jako słownik.

In [149]:
RDDpair2.collectAsMap()

{'Ada': 3, 'Adam': 4, 'Adrian': 6, 'Barnaba': 7, 'Bonifacy': 8}

##### take(n)

Zwraca `n` pierwszych elementów zbioru na driver.

In [150]:
RDDlist.take(2)

[0, 1]

##### takeSample(withReplacement, num)

Zwraca losową próbę `num` elementów ze zwracaniem lub bez.

In [151]:
RDDlist.takeSample(True,12)

[2, 6, 5, 2, 6, 3, 5, 1, 4, 0, 7, 5]

In [152]:
RDDlist.takeSample(False, 5)

[6, 5, 1, 7, 2]

##### takeOrdered(n, [key])

Zwraca `n` pierwszych elementów zbioru stosując naturalny porządek lub inny wskazany.

In [153]:
RDDlist.takeOrdered(5)

[0, 1, 2, 3, 4]

In [154]:
RDDlist.takeOrdered(5, (lambda x: -x))

[9, 8, 7, 6, 5]

##### first()

Zwraca pierwszy element zbioru. Podobne do `take(1)`.

In [155]:
RDDlist.first()

0

##### count()

Zwraca liczbę elementów w zbiorze.

In [156]:
RDDlist.count()

10

##### sum()

Zwraca sumę elementów w RDD

In [157]:
RDDlist.sum()

45

##### countByKey()

Dla RDD (K, V) zwraca słownik (hashmap) typu (K, Int) z liczbą wystąpień kluczy.

In [158]:
RDDpair4.countByKey()

defaultdict(int, {'A': 3, 'B': 2})

In [159]:
RDDpair4.countByKey()["A"]

3

##### saveAsTextFile(path)

Zapisuje elementy zbioru do pliku (plików) tekstowych w podanym katalogu. Spark wywoła metodę `toString` na każdym elemencie RDD aby przekształcić je na linijkę teksu w pliku.

In [None]:
#RDDlist.saveAsTextFile("...")

##### reduce(func)

Agreguje elementy zbioru wykorzystując podaną funkcję. Funkcja redukująca musi być asocjacyjna [(a x b) x c = a x (b x c)] i przemienna [a x b = b x a].

In [160]:
RDDlist.reduce(lambda x,y: x+y)

45

In [161]:
RDDlist2.reduce(lambda x,y: x+y)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

##### aggregate(zeroValue, seqOp, combOp)

Działa podobnie do `aggregateByKey`

In [165]:
RDDother.aggregate(0, (lambda acc,x: acc + int(x)), (lambda acc1,acc2: acc1 + acc2))

TypeError: aggregate() takes 4 positional arguments but 5 were given

> **TODO**: Po odfiltrowaniu "0" oblicz iloczyn wartości RDDother

In [172]:
RDDother.filter(lambda x: x != "0").aggregate(1, (lambda acc,x: acc * int(x)), (lambda acc1,acc2: acc1 * acc2))

362880

##### max(), mean(), min(), stdev(), variance(), stats()

In [173]:
RDDlist.max()

9

In [174]:
RDDlist.mean()

4.5

In [175]:
RDDlist.min()

0

In [176]:
RDDlist.stdev()

2.8722813232690143

In [177]:
RDDlist.variance()

8.25

In [178]:
RDDlist.stats()

(count: 10, mean: 4.5, stdev: 2.87228132327, max: 9.0, min: 0.0)

##### foreach(func)

 Wykonuje funkcję `func` na każdym elemencie zbioru. Najczęściej wykorzystywane do aktualizowania akumulatorów.

In [179]:
RDDlist.foreach(print)

In [180]:
accum = sc.accumulator(0)

In [181]:
RDDlist.foreach(lambda x: accum.add(x))

In [182]:
accum.value

45

##############################

#### Akumulatory
Zmienne które podlegają aktualizacji poprzez przemienne _[a x b = b x a]_ i asocjacyjne _[(a x b) x c = a x (b x c)]_ funkcje. Wykorzystywane do zliczania i sumowania wartości w celu monitorowania działania aplikacji.

#### Broadcast variable
Pozwalają na umieszczanie na każdej maszynie "skaszowanych" zmiennych tylko do odczytu zamiast wysyłania kopii zmiennych wraz z każdym taskiem. Służą optymalizacji działania programów poprzez zmniejszanie kosztów komunikacji.

In [183]:
bList = sc.broadcast([78,42,13])