# Warsztaty Apache Spark - 19.04.2016

In [6]:
import os
# to jest ścieżka w VM
os.getcwd()

'/home/vagrant'

In [7]:
# lista plików i folderów 
os.listdir('.') 

['.bash_logout',
 'spark_notebook.py',
 'spark_tutorial.ipynb',
 '.cache',
 'LingaroSparkSQL.ipynb',
 '.ipynb_checkpoints',
 '.bashrc',
 'spark_mooc_version',
 '.config',
 'data',
 '.ipython',
 '.ssh',
 '.profile']

## **Rozpoczęcie pracy**
Na tej wirtualnej maszynie jest już wszystko skonfigurowane i `SparkContext` jest już zaimportowany i utworzony jako `sc`. Jednak gdyby się okazało, że sami musimy skonfigurować Sparka (gdy nie działamy lokalnie, tylko chcemy się połączyć z klastrem) to robimy to następująco:

Importujemy wymagane pakiety (klasy)
```python
from pyspark import SparkContext, SparkConf
```
Określamy konfiguracje Sparka, które zawierają informacje o aplikacji. `appName` to nazwa aplikacji, która będzie widoczna w UI, `master` to url do klastra. 

```python
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
```
U nas wszystko jest już skonfigurowane.

## **Test**
`SparkContext` to klasa, w której utworzony obiekt mówi Sparkowi jak dostać się do klastra. My mamy już obiekt tej klasy automatycznie utworzony pod nazwą `sc`.

In [10]:
# sprawdzamy typ (klasę) obiektu sc
type(sc)

pyspark.context.SparkContext

In [13]:
# atrybuty obiektu sc
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__enter__',
 '__exit__',
 '__format__',
 '__getattribute__',
 '__getnewargs__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'clearFiles',
 'defaultMinPartitions',
 'defaultParallelism',
 'dump_profiles',
 'environment',
 'getLocalProperty',
 'hadoopFile',
 'hado

Nas interesować będą tylko dwie metody - `parallelize` oraz `textFile`. Służą one do tworzenia RDD odpowiednio poprzez rozproszenie istniejącego już pliku lub wczytania pliku z zewnętrznych zasobów.

## **Tworzenie prostego RDD**
Na początek utworzymy RDD wykorzystując istniejący już obiekt (listę pythonową). Do tego służy nam metoda `parallelize`. Możemy przeczytać o niej w pomocy.
Zbiór jest kopiowany i przetwarzany na postać RDD. W ten sposób możemy na nim wykonywać operacje równolegle.

In [14]:
help(sc.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(self, c, numSlices=None) method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using xrange
    is recommended if the input represents a range for performance.
    
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]



Algument `c` to liczba partycji, na jakie ma zostać podzielony zbiór. Jeżeli go nie podamy Spark zrobi to automatycznie.

In [2]:
numberList = [12, 45, 9, 8, 66, 34, 89, 56, 2,  8]

In [3]:
numberRDD = sc.parallelize(numberList)

In [24]:
# nie widać co tam jest...
print numberRDD

ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:392


In [17]:
# sprawdzamy typ
type(numberRDD)

pyspark.rdd.RDD

In [21]:
# lista atrybutów dla RDD - dużo akcji i transformacji
dir(numberRDD)

['__add__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__getnewargs__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_computeFractionForSampleSize',
 '_defaultReducePartitions',
 '_id',
 '_jrdd',
 '_jrdd_deserializer',
 '_pickled',
 '_reserialize',
 '_to_java_object_rdd',
 'aggregate',
 'aggregateByKey',
 'cache',
 'cartesian',
 'checkpoint',
 'coalesce',
 'cogroup',
 'collect',
 'collectAsMap',
 'combineByKey',
 'context',
 'count',
 'countApprox',
 'countApproxDistinct',
 'countByKey',
 'countByValue',
 'ctx',
 'distinct',
 'filter',
 'first',
 'flatMap',
 'flatMapValues',
 'fold',
 'foldByKey',
 'foreach',
 'foreachPartition',
 'fullOuterJoin',
 'getCheckpointFile',
 'getNumPartitions',
 'getStorageLevel',
 'glom',
 'groupBy',
 'groupByKey',
 'groupWith',
 'histogram',
 'id',
 'intersection',
 'isChe

## **Akcje i transformacje**
Akcje to operacje, które zwracają nam wynik, tzn. zwraca obiekt innego typu niż RDD. Transformacje jak sama nazwa wskazuje są operacjami, które tworzą nowe RDD przekształcając już istniejące. Transformacje nie są wykonywane dopóki nie muszą, to znaczy gdy nie następuje po nich akcja.

Żeby zobaczyć wynik transformacji musimy wykonać akcję. Może się to wydawać nieintuicyjne, ale to ma sens. Na przykład wczytujemy plik, przefiltrujemy linijmy wg danego kryterium i chcemy zwrócić pierwszą linijkę wyniku. Spark nie wczyta więc nawet całego pliku, nie będzie filtrował wszystkich linijek, zamiast tego będzie wczytywał po linijce i filtrował dopóki nie znajdzie pierwszej linii, która odpowiada warunkowi i ją zwróci po czym skończy obliczenia.

## Akcje

### collect()
Zwraca *wszystkie* elementy zbioru jako tablicę. 

In [27]:
numberRDD.collect()

[12, 45, 9, 8, 66, 34, 89, 56, 2, 8]

In [29]:
# lista pythonowa!
type(numberRDD.collect())

list

Czyli wykonujemy sobie na RDD różne operacje, szybko i równolegle. Gdy chcemy spowrotem mieć zbiór jako "zwyczajny" obiekt żeby do zapisać na dysk itd. to wtedy robimy akcję `collect`.

### take(num), first()
Analogicznie jak `collect` z tym że teraz nie zwracamy wszystkiego ale odpwiednio kilka pierwszych wierszy lub pojedynczy pierwszy lub ostatni.

In [34]:
numberRDD.take(3)

[12, 45, 9]

In [36]:
# jeżeli podamy za dużą liczbę to zwróci wszystkie
numberRDD.take(100)

[12, 45, 9, 8, 66, 34, 89, 56, 2, 8]

In [40]:
numberRDD.first()

12

### takeSample(withReplacement, num, [seed]), takeOrdered(n, [ordering)
`takeSample()` zwraca losową próbę z kolekcji. Pierwszy argument to indykator czy losować ze zwracaniem czy bez, drugi to wielkośc próby, trzeci (nie obligatoryjny) to ziarno.

`takeOrdered()` wzraca $n$ (gdzie $n$ to argument) pierwszych elementów uporządkowanych wg naturalnego porządku. Jako drugi argument możemy podać funkcję porządkową (jeżeli chcemy mieć inny porządek).

In [69]:
numberRDD.takeSample(False, 3)

[2, 34, 56]

In [68]:
# zwykłe sortowanie
numberRDD.takeOrdered(4)

[2, 8, 8, 9]

In [74]:
# porządek malejący - trochę nieintuicyjnie
numberRDD.takeOrdered(5, lambda x: -x)

[89, 66, 56, 45, 34]

### count()
Zwraca ilość elementów, które są w kolekcji.

In [58]:
numberRDD.count()

10

### reduce(func)
Agreguje elementy kolekcji względem funkcji danej jako argument (funkcja przyjmuje dwa argumenty a zwraca jedną wartość). Mamy raczej bardzo ograniczoną liczbę funkcji - muszą to być funkcje agregujące.

In [76]:
# suma elementów
numberRDD.reduce(lambda a, b: a + b)

329

Sprawdzamy poprawność operując na pythonowej liście.

In [62]:
sum(numberList)

329

## Transformacje

### map(func)
Zwraca nowe RDD utworzone poprzez wykonanie na każdym elemencie kolekcji funkcji danej w argumencie.

In [42]:
def add_two(x):
    return x + 2

In [44]:
numberRDD_addtwo = numberRDD.map(add_two)

In [45]:
numberRDD_addtwo.collect()

[14, 47, 11, 10, 68, 36, 91, 58, 4, 10]

Możemy też zastosować funkcje lambda.

**Wskazówka** - akcje i transformacje możemy wykonywać łańcuchowo.

In [48]:
numberRDD.map(lambda x: x * 2).collect()

[24, 90, 18, 16, 132, 68, 178, 112, 4, 16]

### flatMap(func)
Działa podobnie jak `map()`, ale funkcja dane w argumencie może zwracać np. listę zamiast pojedynczej wartości. Wtedy wyniki są "spłaszczane", tzn. scalane w jedną liste.

In [91]:
def one_two_three(x):
    return [x, x * 2, x * 3]

In [94]:
numberRDD.flatMap(one_two_three).take(6)

[12, 24, 36, 45, 90, 135]

### union(otherDataset), intersection(otherDataset)
Funkcje zwracają odpowiednio sumę i przekrój RDD i innego RDD podanego jako argument.

In [100]:
numberRDD.union(sc.parallelize(["ala", "ma", "kota"])).collect()

[12, 45, 9, 8, 66, 34, 89, 56, 2, 8, 'ala', 'ma', 'kota']

In [101]:
numberRDD.intersection(sc.parallelize([12,8,33])).collect()

[8, 12]

### filter(func)
Zwraca przefiltowany zbiór danych, tzn. tylko elementy, dla których wykonana na nich funkcja w argumencie zwraca wartość `True`.

In [51]:
# tylko nieparzyste liczby
numberRDD.filter(lambda x: x%2 != 0).collect()

[45, 9, 89]

**Zadanie** Zwróć tylko liczby dwucyfrowe

In [56]:
numberRDD.filter(lambda x: len(str(x)) == 2).collect()

[12, 45, 66, 34, 89, 56]

In [57]:
numberRDD.count()

10

### distinct()
Zwraca RDD bez duplikatów.

In [102]:
# 8 powtarza się dwa razy
numberRDD.collect()

[12, 45, 9, 8, 66, 34, 89, 56, 2, 8]

In [103]:
numberRDD.distinct().collect()

[66, 8, 9, 2, 12, 34, 45, 56, 89]

## **Caching (persisting)**
Wcześniej wspomnieliśmy, że RDD są leniwie transformowane, tzn. transformacja nie wykona się dopóki nie wykonamy po niej akcji. Może sę jednak zdarzyć, że RDD utworzone po transformacji chcemy użyć kilkukrotnie (raz od razu wykonać np. akcję `collect()` a za drugim dokonać po tej transformacji jeszcze kilka i dopiero potem wykonać akcję).

Spark zrobi więc tak: za każdym razem gdy wywołamy akcję będzie wykonywał obliczenia związane z daną transformacją. To może być szczególnie niewydajne przy iteratywnych algorytmach. Co możemy zrobić? Żeby uniknąć wielokrotnych obliczeń tego samego, wykonujemy operaję `cache()` lub `persist()`.

In [5]:
# zostanie wykonana tylko raz i gdy trzeba będzie jej użyć ponownie będzie już "policzona"
numberRDD_distinct = numberRDD.distinct().cache()

## **Pary klucz-wartość**

Transformacje powyżej są zupełnie podstawowe. Teraz zajmiemy się RDD, które zawierają pary (klucz, wartość), na których możemy wykonywać szereg dodatkowych transformacji (sortowanie i grupowanie po kluczu lub wartość itd.)

Popularna jest np. ekstrakcja z RDD części określającej klucz, np. customerID, czas zdarzenia...) i użycie jej jako klucz. W ten sposób możemy wykonywać operacje np. na zgrupowanych wg klucza danych.

In [7]:
wordList = ["ala", "ma", "kota", "kot", "ma", "ale"]

In [8]:
wordRDD = sc.parallelize(wordList)

Utworzymy sobie pair RDD, tzn. RDD, w których każdy element jest kropką postaci $(klucz, wartość)$. W poniższym przykładzie utworzymy RDD, które dla każdego słowa zwróci krotkę $(słowo, 1)$.


** Zadanie ** Utwórz wordPairsRDD, które jest przekształceniem wordRDD, w taki sposób, ze każdemu słowu przypisuje krotkę $(słowo, 1)$.

In [9]:
wordPairsRDD = wordRDD.map(lambda x: (x, 1))

In [10]:
wordPairsRDD.collect()

[('ala', 1), ('ma', 1), ('kota', 1), ('kot', 1), ('ma', 1), ('ale', 1)]

Teraz możemy policzyć ile razy dane słowo pojawiło się w RDD. Mamy kilka sposobów żeby to zrobić.

### groupByKey()
Dla pary $(K, V)$ zwraca $(K, iterable<V>)$, gdzie $iterable<V>$ to obiekt, po którym można iterować utworzony z wartości dla danego klucza.

In [11]:
wordGroupedRDD = wordPairsRDD.groupByKey()

In [12]:
wordGroupedRDD.collect()

[('ala', <pyspark.resultiterable.ResultIterable at 0xb0f41aec>),
 ('ale', <pyspark.resultiterable.ResultIterable at 0xb0f417ec>),
 ('ma', <pyspark.resultiterable.ResultIterable at 0xb0f41f2c>),
 ('kot', <pyspark.resultiterable.ResultIterable at 0xb0f418cc>),
 ('kota', <pyspark.resultiterable.ResultIterable at 0xb0f417cc>)]

Powyżej mamy na razie tylko definicję, że to jest obiekt po którym można iterować. Jak to wyświetlić?

In [110]:
for key, value in wordGroupedRDD.collect():
    print key, list(value) # z obiektu iterującego roimy listę

ala [1]
ale [1]
ma [1, 1]
kot [1]
kota [1]


** Zadanie ** Korzystając z `wordGroupedRDD` utworzyć RDD w postaci kolekcji krotek (słowo, suma wystąpień).

In [111]:
wordCountRDD = wordGroupedRDD.map(lambda (k, v): (k, sum(v)))

In [112]:
wordCountRDD.collect()

[('ala', 1), ('ale', 1), ('ma', 2), ('kot', 1), ('kota', 1)]

### reduceByKey(func)
Dla pary $(K, V)$ zwraca zbiór par $(K, V')$, gdzie $V'$ to zbiór powstały poprzez transformacje $V$ funkcją agregacji daną w argumencie.

In [113]:
wordPairsRDD.reduceByKey(lambda x, y: x + y).collect()

[('ala', 1), ('ale', 1), ('ma', 2), ('kot', 1), ('kota', 1)]

### sortByKey([ascending])
Zwraca RDD, które jest posortowane wg klucza.

In [114]:
wordCountRDD.sortByKey().collect()

[('ala', 1), ('ale', 1), ('kot', 1), ('kota', 1), ('ma', 2)]

In [115]:
# w porządku malejącym
wordCountRDD.sortByKey(ascending = False).collect()

[('ma', 2), ('kota', 1), ('kot', 1), ('ale', 1), ('ala', 1)]

** Zadanie ** Policz liczbę unikalnych słów w wordRDD. Można korzystać z innych już utworzonych RDD i wcześniej poznanych transformacji.

In [116]:
# najlepiej skorzystać z wordCountRDD, bo mamy zliczoną ilośc wystąpień danego słowa i zastosować filter
wordCountRDD.filter(lambda (x, y): y == 1).count()

4

## Analiza pliku tekstowego
Za pomoca poznanych funkcji będziemy analizować Sonety Szekspira znajdującego się w pliku `shakespeare.txt`. Najpierw musimy utworzyć z pliku tekstowego RDD. Do tego służy nam  metoda SparkCntext `textFile()`.

In [42]:
import os
fileDir = os.path.join('data', 'cs100', 'lab1', 'shakespeare.txt')
print fileDir

data/cs100/lab1/shakespeare.txt


In [43]:
shakeRDD = sc.textFile(fileDir)

In [44]:
type(shakeRDD)

pyspark.rdd.RDD

In [45]:
# sprawdzamy w jaki sposób RDD zostało utworzone
shakeRDD.take(15)

[u'1609',
 u'',
 u'THE SONNETS',
 u'',
 u'by William Shakespeare',
 u'',
 u'',
 u'',
 u'                     1',
 u'  From fairest creatures we desire increase,',
 u"  That thereby beauty's rose might never die,",
 u'  But as the riper should by time decease,',
 u'  His tender heir might bear his memory:',
 u'  But thou contracted to thine own bright eyes,',
 u"  Feed'st thy light's flame with self-substantial fuel,"]

** Zadanie ** Policz ile linii ma utwór.

In [46]:
shakeRDD.count()

122395

** Zadanie ** Policz ile niepustych linii ma utwór i nadpisz tak uwtorzone RDD do `shakeRDD`.

In [47]:
shakeRDD.filter(lambda x: x != '').count()

112902

In [48]:
shakeRDD = shakeRDD.filter(lambda x: x != '')

In [49]:
# sprawdzamy
shakeRDD.take(15)

[u'1609',
 u'THE SONNETS',
 u'by William Shakespeare',
 u'                     1',
 u'  From fairest creatures we desire increase,',
 u"  That thereby beauty's rose might never die,",
 u'  But as the riper should by time decease,',
 u'  His tender heir might bear his memory:',
 u'  But thou contracted to thine own bright eyes,',
 u"  Feed'st thy light's flame with self-substantial fuel,",
 u'  Making a famine where abundance lies,',
 u'  Thy self thy foe, to thy sweet self too cruel:',
 u"  Thou that art now the world's fresh ornament,",
 u'  And only herald to the gaudy spring,',
 u'  Within thine own bud buriest thy content,']

** Zadanie ** Usuń znaki interpunkcyjne, sprowadź tekst do małych liter i podziel zbiór na słowa.

In [50]:
import string
print string.punctuation # wszystkie znaki interpunkcyjnev

!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~


In [51]:
def remove_punct(text):
    for char in string.punctuation:
        text = text.replace(char, "")
    return text

In [52]:
shakeWordsRDD = shakeRDD.map(remove_punct)\
                        .map(lambda x: x.lower())\
                        .map(lambda x : x.split())\
                        .flatMap(lambda x: x)# lista list ze slowami -> jedna duża lista ze słowami

In [53]:
shakeWordsRDD.take(20)

[u'1609',
 u'the',
 u'sonnets',
 u'by',
 u'william',
 u'shakespeare',
 u'1',
 u'from',
 u'fairest',
 u'creatures',
 u'we',
 u'desire',
 u'increase',
 u'that',
 u'thereby',
 u'beautys',
 u'rose',
 u'might',
 u'never',
 u'die']

** Zadanie ** Stwórz RDD, które będzie zawierało krotki $(słowo, liczba\_wystąpień)$.

In [54]:
shakePairsRDD = shakeWordsRDD.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

** Zadanie ** Uporządkuj słowa wg ilości wystąpień, wyświetl pierwsze 20.

In [55]:
shakePairsRDD.takeOrdered(20, lambda x: -x[1])

[(u'the', 27361),
 (u'and', 26028),
 (u'i', 20681),
 (u'to', 19150),
 (u'of', 17463),
 (u'a', 14593),
 (u'you', 13615),
 (u'my', 12481),
 (u'in', 10956),
 (u'that', 10890),
 (u'is', 9134),
 (u'not', 8497),
 (u'with', 7771),
 (u'me', 7769),
 (u'it', 7678),
 (u'for', 7558),
 (u'be', 6857),
 (u'his', 6857),
 (u'your', 6655),
 (u'this', 6602)]

In [56]:
# drugi sposób, jednak brzydki, bo co gdy nowe klucze (czyli liczba wystąpień) nie jest unikalna?
# musimy tymczasowo zamienić miejscami klucze i wartości 
shakeSortedRDD = shakePairsRDD.map(lambda (x, y): (y, x)).sortByKey(ascending = False).map(lambda (x, y): (y, x))

In [57]:
shakeSortedRDD.take(20)

[(u'the', 27361),
 (u'and', 26028),
 (u'i', 20681),
 (u'to', 19150),
 (u'of', 17463),
 (u'a', 14593),
 (u'you', 13615),
 (u'my', 12481),
 (u'in', 10956),
 (u'that', 10890),
 (u'is', 9134),
 (u'not', 8497),
 (u'with', 7771),
 (u'me', 7769),
 (u'it', 7678),
 (u'for', 7558),
 (u'be', 6857),
 (u'his', 6857),
 (u'your', 6655),
 (u'this', 6602)]

** Zadanie ** Do krotki dołącz trzecią współrzędną, ktora jest równa `True` jeżeli słowo jest stopwordem i `False` w przeciwnym przypadku.

In [58]:
# wczytajmy najpierw plik zawierający stopwordsy
import os
fileDir = os.path.join('data', 'cs100', 'lab3', 'stopwords.txt')
print fileDir

data/cs100/lab3/stopwords.txt


In [59]:
stopwords = sc.textFile(fileDir).collect()

In [60]:
shakeStopRDD = shakeSortedRDD.map(lambda (x, y): (x, y, True) if x in stopwords else (x, y, False))

In [61]:
shakeStopRDD.take(15)

[(u'the', 27361, True),
 (u'and', 26028, True),
 (u'i', 20681, True),
 (u'to', 19150, True),
 (u'of', 17463, True),
 (u'a', 14593, True),
 (u'you', 13615, True),
 (u'my', 12481, True),
 (u'in', 10956, True),
 (u'that', 10890, True),
 (u'is', 9134, True),
 (u'not', 8497, True),
 (u'with', 7771, True),
 (u'me', 7769, True),
 (u'it', 7678, True)]

In [62]:
# bez funkcji lambda
def third_coord(x):
    if x[0] in stopwords:
        return (x[0], x[1], True)
    else:
        return (x[0], x[1], False)

In [63]:
shakeSortedRDD.map(third_coord).take(15)

[(u'the', 27361, True),
 (u'and', 26028, True),
 (u'i', 20681, True),
 (u'to', 19150, True),
 (u'of', 17463, True),
 (u'a', 14593, True),
 (u'you', 13615, True),
 (u'my', 12481, True),
 (u'in', 10956, True),
 (u'that', 10890, True),
 (u'is', 9134, True),
 (u'not', 8497, True),
 (u'with', 7771, True),
 (u'me', 7769, True),
 (u'it', 7678, True)]

** Zadanie ** Policz ile procent wszystkich słów to stopwordsy:
- procent uniklanych stopwordsów wśród uniklanych słów
- procent jako suma wszystkich stopwordsow/suma wszyskich slow

In [64]:
shakeStopRDD.filter(lambda (x, y, z): z == True).count()/shakeStopRDD.count() # python2

0

In [65]:
float(shakeStopRDD.filter(lambda (x, y, z): z == True).count())/shakeStopRDD.count() 

0.004512026148435002

In [66]:
#drugie
ileStopwords = shakeStopRDD.filter(lambda (x, y, z): z == True).map(lambda (x, y, z): y).reduce(lambda x, y: x + y)

In [67]:
ileAll = shakeStopRDD.map(lambda (x, y, z): y).reduce(lambda x, y: x + y)

In [68]:
float(ileStopwords)/ileAll

0.46270198279493907

** Zadanie ** Usuń stopwordsy i wyświetl jeszcze raz 20 najpopularniejszych i najrzadszych słów.

In [69]:
shakeWithoutStopRDD = shakeStopRDD.filter(lambda (x, y, z): z == False).map(lambda (x, y, z): (x, y))

In [70]:
shakeWithoutStopRDD.take(20)

[(u'thou', 5485),
 (u'thy', 4032),
 (u'shall', 3591),
 (u'thee', 3178),
 (u'lord', 3059),
 (u'king', 2861),
 (u'good', 2812),
 (u'sir', 2754),
 (u'o', 2607),
 (u'come', 2507),
 (u'well', 2462),
 (u'would', 2293),
 (u'let', 2099),
 (u'enter', 2098),
 (u'love', 2053),
 (u'ill', 1972),
 (u'hath', 1941),
 (u'man', 1835),
 (u'one', 1779),
 (u'go', 1733)]

In [71]:
shakeWithoutStopRDD.takeOrdered(20, key = lambda x: x[1])

[(u'redeemst', 1),
 (u'chameleons', 1),
 (u'offendeth', 1),
 (u'beadsmen', 1),
 (u'opener', 1),
 (u'swoopstake', 1),
 (u'slothful', 1),
 (u'appropriation', 1),
 (u'selfreproving', 1),
 (u'sooty', 1),
 (u'roundwombd', 1),
 (u'maythat', 1),
 (u'paphos', 1),
 (u'razeth', 1),
 (u'committst', 1),
 (u'sunbeams', 1),
 (u'china', 1),
 (u'climbed', 1),
 (u'stringless', 1),
 (u'ycleped', 1)]

## ** Spark SQL **
Spark SQL zapewnia dwie podstawowe funkcjonalności:
1. Wprowadza pojęcie `DataFrame`, który to obiekt ułatwia pracowanie z ustrukturyzowanymi zbiorami. `DataFrame` możemy utożsamiać z tabelą w bazie danych.
2. Ma możliwosć czytania danych z różnych źródeł i formatów (JSON, Hive Tables, przekształcenie obiektu pythonowego)

Żeby rozpocząć pracę z SQL w Sparku trzeba utworzyć `SQLContext`. `SQLContext` opakowuje `SparkContext`, dzięki temu mamy dodatkowe funkcje, które pozwalają nam pracować z ustrukturyzowanymi danymi. Możemy utworzyć też `HQLContext`, którego funkcjonalność jest rozszerzona (możemy korzystać z HiveSQ i czytać tabelki Hive).

Za pomocą `SQLContext` możemy utworzyć DataFrame, czyli obiekt na którym dalej będzie wykonywać operacje SQL.

In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

### Tworzenie DataFrame
`DataFrame` zawiera obiekty RDD typu `Row`, każdy z nich reprezentuje rekord. `DataFrames` przechowują dane efektywnie niż "zwykłe RDD", zatem jeżeli znamy schemat danych to warto ich używać. Ponadto, ramki zapewniają nowe operacje niedostępne pod zwykłym RDD, takie jak np. zapytania SQL.

#### moviesDF

In [72]:
import os
moviesPath = os.path.join("data", "cs100", "lab4", "small", "movies.dat")

In [73]:
# najpierw tworzymy sobie zwyczajnie RDD
moviesRDD = sc.textFile(moviesPath)
# każdy wiersz to string, komórki oddzielone ciągiem znaków "::"
moviesRDD.take(5)

[u"1::Toy Story (1995)::Animation|Children's|Comedy",
 u"2::Jumanji (1995)::Adventure|Children's|Fantasy",
 u'3::Grumpier Old Men (1995)::Comedy|Romance',
 u'4::Waiting to Exhale (1995)::Comedy|Drama',
 u'5::Father of the Bride Part II (1995)::Comedy']

In [74]:
# podzielmy najpierw każdy wiersz na 3 części względem "::"
moviesRDD = moviesRDD.map(lambda x: x.split("::"))

In [75]:
moviesRDD.take(5)

[[u'1', u'Toy Story (1995)', u"Animation|Children's|Comedy"],
 [u'2', u'Jumanji (1995)', u"Adventure|Children's|Fantasy"],
 [u'3', u'Grumpier Old Men (1995)', u'Comedy|Romance'],
 [u'4', u'Waiting to Exhale (1995)', u'Comedy|Drama'],
 [u'5', u'Father of the Bride Part II (1995)', u'Comedy']]

In [76]:
# importujemy Row i przekształcamy każdy wiersz na obiket typu Row - przy okazji zmieniamy typ danych na odpowiedni
from pyspark.sql import Row
moviesRows = moviesRDD.map(lambda x: Row(movieid = int(x[0]), title = x[1], genre = x[2]))

In [77]:
# to jest wciąż RDD
moviesRows.take(5)

[Row(genre=u"Animation|Children's|Comedy", movieid=1, title=u'Toy Story (1995)'),
 Row(genre=u"Adventure|Children's|Fantasy", movieid=2, title=u'Jumanji (1995)'),
 Row(genre=u'Comedy|Romance', movieid=3, title=u'Grumpier Old Men (1995)'),
 Row(genre=u'Comedy|Drama', movieid=4, title=u'Waiting to Exhale (1995)'),
 Row(genre=u'Comedy', movieid=5, title=u'Father of the Bride Part II (1995)')]

In [78]:
# wreszcie tworzymy DataFrame
moviesDF = sqlContext.createDataFrame(moviesRows)

In [79]:
moviesDF.show(5)

genre                movieid title               
Animation|Childre... 1       Toy Story (1995)    
Adventure|Childre... 2       Jumanji (1995)      
Comedy|Romance       3       Grumpier Old Men ...
Comedy|Drama         4       Waiting to Exhale...
Comedy               5       Father of the Bri...


In [80]:
type(moviesDF)

pyspark.sql.dataframe.DataFrame

#### ratingsDF

** Zadanie ** Stwórz DataFrame z pliku ratings.

In [81]:
ratingsPath = os.path.join("data", "cs100", "lab4", "small", "ratings.dat.gz")

In [82]:
# plik ratings.dat jest spakowany, więc musimy go wczytać następująco
import gzip
gzf = gzip.GzipFile(ratingsPath);
ratingsList = gzf.readlines();
gzf.close();

In [83]:
# podpowiedz - nazwy kolumn to userid, movieid, rating, timestamp
ratingsList[0:3]

['1::1193::5::978300760\n', '1::661::3::978302109\n', '1::914::3::978301968\n']

In [84]:
# ratingsList jest listą (istniejący już obiekt więc korzystamy z parallelize
ratingsRDD = sc.parallelize(ratingsList)

In [85]:
ratingsRDD.take(5)

['1::1193::5::978300760\n',
 '1::661::3::978302109\n',
 '1::914::3::978301968\n',
 '1::3408::4::978300275\n',
 '1::2355::5::978824291\n']

In [86]:
ratingsRDD = ratingsRDD.map(lambda x: x.split("::"))

In [87]:
ratingsRDD.take(5)

[['1', '1193', '5', '978300760\n'],
 ['1', '661', '3', '978302109\n'],
 ['1', '914', '3', '978301968\n'],
 ['1', '3408', '4', '978300275\n'],
 ['1', '2355', '5', '978824291\n']]

In [88]:
# odrzucamy ostatnią kolumnę, gdyż nie jest nam potrzebna
ratingsRows = ratingsRDD.map(lambda x: Row(userid = int(x[0]), movieid = int(x[1]), rating = float(x[2])))

In [89]:
ratingsRows.take(5)

[Row(movieid=1193, rating=5.0, userid=1),
 Row(movieid=661, rating=3.0, userid=1),
 Row(movieid=914, rating=3.0, userid=1),
 Row(movieid=3408, rating=4.0, userid=1),
 Row(movieid=2355, rating=5.0, userid=1)]

In [90]:
ratingsDF = sqlContext.createDataFrame(ratingsRows)

In [91]:
ratingsDF.show(5)

movieid rating userid
1193    5.0    1     
661     3.0    1     
914     3.0    1     
3408    4.0    1     
2355    5.0    1     


### Zapytania SQL
Operacje na DataFrame możemy wykonywać na dwa sposoby (analogicznie jak uczyliśmy się na zajęciach z Pythona), tzn. korzystając z zapytania SQL podanego jako string lub z metod, które mają ramki danych.

Żeby korzystać z języka SQL musimy "zarejestrować" tabelkę. Natomiat jeżeli chcemy korzystać z metod Data Frame nie musimy tego robić.

In [92]:
moviesDF.registerTempTable("movies")
ratingsDF.registerTempTable("ratings")

In [93]:
# test
sqlContext.sql('SELECT COUNT(*) FROM ratings').show()

_c0   
487650


In [94]:
sqlContext.sql('SELECT COUNT(*) FROM movies').show()

_c0 
3883


### select()

In [171]:
moviesDF.select("movieid").show(5)

movieid
1      
2      
3      
4      
5      


In [172]:
moviesDF.select("movieid", "title").show(5)

movieid title               
1       Toy Story (1995)    
2       Jumanji (1995)      
3       Grumpier Old Men ...
4       Waiting to Exhale...
5       Father of the Bri...


In [173]:
moviesDF.select("*").show(5)

genre                movieid title               
Animation|Childre... 1       Toy Story (1995)    
Adventure|Childre... 2       Jumanji (1995)      
Comedy|Romance       3       Grumpier Old Men ...
Comedy|Drama         4       Waiting to Exhale...
Comedy               5       Father of the Bri...


In [174]:
# jeżeli chcemy wybrać np. movieid ale jakos go zmodyfikować, wtedy piszemy następująco
moviesDF.select(moviesDF["movieid"] + 1, "title").show(5)

(movieid + 1) title               
2             Toy Story (1995)    
3             Jumanji (1995)      
4             Grumpier Old Men ...
5             Waiting to Exhale...
6             Father of the Bri...


### filter()

In [175]:
ratingsDF.filter(ratingsDF["rating"] >= 4.5).show(5)

movieid rating userid
1193    5.0    1     
2355    5.0    1     
1287    5.0    1     
2804    5.0    1     
595     5.0    1     


In [176]:
# oczywiście operacje możemy wkonywać łańcuchowo
ratingsDF.filter(ratingsDF["rating"] >= 4.5).select("movieid").show(5)

movieid
1193   
2355   
1287   
2804   
595    


### groupBy()
Po tym musi nastąpić agregacja np. `min()`, `max()`, `mean()` czy `count()`.

In [177]:
# po groupBy musimy użyć jakiejś metody agregującej
ratingsDF.groupBy("rating").count().show()

rating count 
1.0    27472 
3.0    127216
5.0    108545
4.0    170579
2.0    53838 


### distinct()

In [185]:
moviesDF.select("genre").distinct().show()

genre               
Action|Adventure|...
Action|Adventure|...
Action|Drama|Thri...
Animation|Childre...
Action|Adventure|...
Adventure|Animati...
Action|Drama        
Comedy|Horror|Mus...
Comedy|Documentary  
Action|Adventure|...
Comedy|Drama        
Children's|Comedy...
Children's|Fantas...
Comedy|Crime|Fantasy
Crime|Film-Noir     
Action|Comedy|Drama 
Adventure|Comedy    
Adventure|Fantasy...
Fantasy             
Adventure|Comedy|...


### orderBy()

In [187]:
moviesDF.orderBy("title").limit(10).show()

genre                movieid title               
Children's|Comedy    2031    $1,000,000 Duck (...
Drama                3112    'Night Mother (1986)
Drama|Romance        779     'Til There Was Yo...
Comedy               2072    'burbs, The (1989)  
Drama|Thriller       3420    ...And Justice fo...
Romance              889     1-900 (1994)        
Comedy|Romance       2572    10 Things I Hate ...
Animation|Children's 2085    101 Dalmatians (1...
Children's|Comedy    1367    101 Dalmatians (1...
Drama                1203    12 Angry Men (1957) 


### map()
Zwraca RDD poprzez zastosowanie do każdego wiersza funkcji dane w argumencie.

In [102]:
# tylko pierwszy gatunek filmu
# to jest znów zwykłe RDD, więc musielibyśmy utworzyć z tego nową ramkę danych
moviesDF.map(lambda x: (x.movieid, x.genre.split("|")[0])).take(5)

[(1, u'Animation'),
 (2, u'Adventure'),
 (3, u'Comedy'),
 (4, u'Comedy'),
 (5, u'Comedy')]

**Zadanie** Wykonaj te same operacje co w SQL za pomocą metod dla `DataFrame`.

In [107]:
# liczba filmów oznaczone jako dramat (tylko jeden gatunek)
sqlContext.sql("select COUNT(*) from movies where genre=='Drama'").show()

_c0
843


In [109]:
moviesDF.filter("genre = 'Drama'").count()

843L

In [110]:
moviesDF.filter(moviesDF.genre == 'Drama').count()

843L

In [14]:
# policz ile jest filmów, które są oznaczone jako dramat (moze być jeszcze jakiś inny gatunek)
sqlContext.sql("select COUNT(*) from movies where genre like '%Drama%' ").show()

c0  
1603


In [105]:
moviesDF.filter("genre like '%Drama%'").count()

1603L

**Zadanie** Korzystając z tabeli ratingsDF pogrupuj filmy wg moviesID i policz srednia ocen

In [120]:
resultSQL = sqlContext.sql("SELECT movieID, avg(rating) AS avg_rating FROM ratings GROUP BY movieID")

In [121]:
resultSQL.show(5)

movieID avg_rating        
3031    2.0869565217391304
831     3.7               
631     2.0               
1031    3.4615384615384617
2631    1.0               


In [None]:
# za pomocą metod DataFrame

In [126]:
avg_rating = ratingsDF.groupBy("movieid").mean("rating")

In [127]:
avg_rating.show(5)

movieid AVG(rating)       
3031    2.0869565217391304
831     3.7               
631     2.0               
1031    3.4615384615384617
2631    1.0               


## join()

**Zadanie** Połącz tabelkę ze średnimi ocenami z tytułami filmów.

In [129]:
joinedDF = avg_rating.join(moviesDF, moviesDF.movieid == ratingsDF.movieid)

In [130]:
joinedDF.show(10)

movieid AVG(rating)        genre                movieid title               
3031    2.0869565217391304 Comedy               3031    Repossessed (1990)  
831     3.7                Drama                831     Stonewall (1995)    
631     2.0                Animation|Childre... 631     All Dogs Go to He...
1031    3.4615384615384617 Adventure|Childre... 1031    Bedknobs and Broo...
2631    1.0                Comedy|Film-Noir|... 2631    Frogs for Snakes ...
31      3.2419354838709675 Drama                31      Dangerous Minds (...
2031    3.15               Children's|Comedy    2031    $1,000,000 Duck (...
3431    2.3333333333333335 Action|Drama         3431    Death Wish II (1982)
231     3.1651090342679127 Comedy               231     Dumb & Dumber (1994)
2431    3.123222748815166  Comedy|Drama         2431    Patch Adams (1998)  


In [135]:
joinedDF.select("title", "AVG(rating)", "genre").show(10)

title                AVG(rating)        genre               
Repossessed (1990)   2.0869565217391304 Comedy              
Stonewall (1995)     3.7                Drama               
All Dogs Go to He... 2.0                Animation|Childre...
Bedknobs and Broo... 3.4615384615384617 Adventure|Childre...
Frogs for Snakes ... 1.0                Comedy|Film-Noir|...
Dangerous Minds (... 3.2419354838709675 Drama               
$1,000,000 Duck (... 3.15               Children's|Comedy   
Death Wish II (1982) 2.3333333333333335 Action|Drama        
Dumb & Dumber (1994) 3.1651090342679127 Comedy              
Patch Adams (1998)   3.123222748815166  Comedy|Drama        
