![spark](https://www.dataversity.net/wp-content/uploads/2015/06/spark-logo.png)

# Krótkie wprowadzenie

## Spark

Apache Spark to silnik do przetwarzania danych. Umożliwia on przetwarzanie danych w klastrze komputerów. Obecnie jest jednym z najpopularniejszych narzędzi do Big Data. Obsługuje wiele języków programowania (Python, Java, Scala i R).

## Główne zalety

- Szybkość – potrafi być nawet 100 szybszy niż Hadoop MapReduce
- Wsparcie analizy danych – Spark zawiera narzędzia i biblioteki umożliwiające przesyłanie strumieniowe, uczenie maszynowe (MLlib), tworzenie zapytań SQL, grupowanie i łączenie danych

## RDD - Resilient Distributed Dataset

RDD jest rozproszoną kolekcją umieszczoną w pamięci na węzłach przetwarzających. RDD wspiera in-memory processing computation. Na RDD można przeprowadzać operacje transformacji takie jak map, filter czy reduce – w wyniku otrzymujemy nowy RDD zawierający przekształcone dane. Wynika również z tego, że kolekcja RDD jest niezmienna, a dodatkowo operacje tworzące RDD są “leniwe”, to znaczy wykonanie ich w zaprogramowanym łańcuchu następuje dopiero w momencie wykonania kolejnej operacji nie zwracającej RDD.

# Przykłady przetwarzania danych

## Najpierw przygotowujemy środowisko do pracy ze Sparkiem w Pythonie

In [1]:
import findspark
findspark.init()



In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext
sc

## Pierwszy przykłd - kilka operacji na liczbach

Tworzymy listę iczb od 0 do 1 000 000.

In [5]:
nums = list(range(0, 1000001))

Teraz za pomocą funkcji parallelize zapiszemy te dane jako RDD i będziemy już mogli wykonywać na nich operacje
przy użyciu Spark'a.

In [6]:
nums_rdd = sc.parallelize(nums)
nums_rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

Możemy podejrzeć kilka pierwszych pozycji na naszej liście.

In [7]:
nums_rdd.take(5) # wyświetlamy 5 pierwszych liczb

[0, 1, 2, 3, 4]

Używając funkcji ***map*** możemy wykonywać operacje na elementach listy.

In [8]:
squared_nums_rdd = nums_rdd.map(lambda x: x ** 2) # podnosimy każdy element z listy do kwadratu 
squared_nums_rdd.take(10)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [9]:
pairs = squared_nums_rdd.map(lambda x: (x, len(str(x)))) # tworzymy pary obserwacji (liczba, liczba cyfr w liczbie)
pairs.take(15)

[(0, 1),
 (1, 1),
 (4, 1),
 (9, 1),
 (16, 2),
 (25, 2),
 (36, 2),
 (49, 2),
 (64, 2),
 (81, 2),
 (100, 3),
 (121, 3),
 (144, 3),
 (169, 3),
 (196, 3)]

Używając funkcji ***filter*** możemy filtrować nasze dane.

In [10]:
even_pairs = pairs.filter(lambda x: x[1] % 2 == 0) # zostawiamy tylko pary z parzystą liczbą cyfr w liczbie
even_pairs.take(15)

[(16, 2),
 (25, 2),
 (36, 2),
 (49, 2),
 (64, 2),
 (81, 2),
 (1024, 4),
 (1089, 4),
 (1156, 4),
 (1225, 4),
 (1296, 4),
 (1369, 4),
 (1444, 4),
 (1521, 4),
 (1600, 4)]

Pary te traktowane są jako pary **(klucz, wartość)**, chcemy żeby liczba cyfr w liczbie spełniała u nas funkcje klucza,
dlatego zamienimy pozycje w tych parach.

In [11]:
flipped_pairs = even_pairs.map(lambda x: (int(x[1]), x[0])) # zamiana pozycji liczby i liczby cyfr w parze, 
                                                            # teraz liczba cyfr będzie kluczem
flipped_pairs.take(15)

[(2, 16),
 (2, 25),
 (2, 36),
 (2, 49),
 (2, 64),
 (2, 81),
 (4, 1024),
 (4, 1089),
 (4, 1156),
 (4, 1225),
 (4, 1296),
 (4, 1369),
 (4, 1444),
 (4, 1521),
 (4, 1600)]

Z pomocą Sparka możemy w prosty sposób grupować i sortować dane.

Pogrupujemy nasze dane po kluczu wykorzystując funkcję ***groupByKey***.

In [12]:
grouped_pairs = flipped_pairs.groupByKey() # grupujemy dane po naszym kluczu (liczba cyfr)
grouped_pairs.collect()

[(8, <pyspark.resultiterable.ResultIterable at 0x1cb89cbdfd0>),
 (2, <pyspark.resultiterable.ResultIterable at 0x1cb89e08730>),
 (10, <pyspark.resultiterable.ResultIterable at 0x1cb89e08940>),
 (4, <pyspark.resultiterable.ResultIterable at 0x1cb89e08910>),
 (12, <pyspark.resultiterable.ResultIterable at 0x1cb89e089d0>),
 (6, <pyspark.resultiterable.ResultIterable at 0x1cb8e79d070>)]

Jak widać dane zostały pogrupowane, jednak żeby mieć lepszy pogląd na to jak one wyglądają, przekonwertujemy wynik tego grupowania na parę **(klucz, lista liczb)**.

In [13]:
grouped_pairs = grouped_pairs.map(lambda x: (x[0], list(x[1])))

Teraz posiadamy listę par w których pierwsza wartość to liczba cyfr, a druga to lista skłądająca się z n-cyfrowych liczb.

Wyżej widać, że nasza lista nie jest posortowana, więc przed jej wyświetlenie posortujemy wyniki rosnąco używając funkcji ***sortByKey***.

In [14]:
grouped_pairs = grouped_pairs.sortByKey(ascending= True) # sortujemy rosnąco według liczby cyfr w liczbie
grouped_pairs.take(2) # wyświetlamy tylko 2 pierwsz pozycję, ponieważ zajmuje to trochę miejsca

[(2, [16, 25, 36, 49, 64, 81]),
 (4,
  [1024,
   1089,
   1156,
   1225,
   1296,
   1369,
   1444,
   1521,
   1600,
   1681,
   1764,
   1849,
   1936,
   2025,
   2116,
   2209,
   2304,
   2401,
   2500,
   2601,
   2704,
   2809,
   2916,
   3025,
   3136,
   3249,
   3364,
   3481,
   3600,
   3721,
   3844,
   3969,
   4096,
   4225,
   4356,
   4489,
   4624,
   4761,
   4900,
   5041,
   5184,
   5329,
   5476,
   5625,
   5776,
   5929,
   6084,
   6241,
   6400,
   6561,
   6724,
   6889,
   7056,
   7225,
   7396,
   7569,
   7744,
   7921,
   8100,
   8281,
   8464,
   8649,
   8836,
   9025,
   9216,
   9409,
   9604,
   9801])]

Na koniec możemy sprawdzić ile liczb występuje w każdej z naszych kategorii.

In [15]:
how_many_numbers = grouped_pairs.map(lambda x: (x[0], len(x[1]))) # lista par - (liczba cyfr, ile taich liczb po naszych operacjach)
how_many_numbers.collect()

[(2, 6), (4, 68), (6, 683), (8, 6837), (10, 68377), (12, 683772)]

Jak widać po wykonanych przez nas operacjach na naszej liście zostało 6 liczb dwucyfrowych, 68 czterocyfrowych itd.

## Drugi przykład - liczenie słów

Do tego przykładu pobrałem plik tesktowy w języku angielskim.

Wczytujemy nasz tekst.

In [16]:
text = sc.textFile("tekst.txt")

Możemy go teraz wyśietlić.

In [17]:
text.collect()

['word count from Wikipedia the free encyclopedia',
 'the word count is the number of words in a document or passage of text Word counting may be needed when a text',
 'is required to stay within certain numbers of words This may particularly be the case in academia legal',
 'proceedings journalism and advertising Word count is commonly used by translators to determine the price for',
 'the translation job Word counts may also be used to calculate measures of readability and to measure typing',
 'and reading speeds usually in words per minute When converting character counts to words a measure of five or',
 'six characters to a word is generally used Contents Details and variations of definition Software In fiction',
 'In non fiction See also References Sources External links Details and variations of definition',
 'This section does not cite any references or sources Please help improve this section by adding citations to',
 'reliable sources Unsourced material may be challenged and r

Jak widać każda linjka traktowana jest jaka osobny string.

Możemy zliczyć te linijki za pomocą funkcji ***count***.

In [18]:
text.count()

44

Używajac funkcji ***map***, a wewnątrz niej funkcji języka python ***split*** możey rozdzielić pojedyńcze wyrazy.

In [19]:
text2 = text.map(lambda x: x.split("\t"))
text2.take(2)


[['word count from Wikipedia the free encyclopedia'],
 ['the word count is the number of words in a document or passage of text Word counting may be needed when a text']]

In [20]:
words = text2.map(lambda x: x[0].split(" "))
words.take(2)

[['word', 'count', 'from', 'Wikipedia', 'the', 'free', 'encyclopedia'],
 ['the',
  'word',
  'count',
  'is',
  'the',
  'number',
  'of',
  'words',
  'in',
  'a',
  'document',
  'or',
  'passage',
  'of',
  'text',
  'Word',
  'counting',
  'may',
  'be',
  'needed',
  'when',
  'a',
  'text']]

Jak widać wyrazy zostały rozdzielone, jednak zamiast jednej listy wyrazów mamy listę składającą się z list wyrazów, które zostały utworzone dla każdej lnijki tekstu.

Możemy je jednak łatwo "rozpakować" korzystając z funkcji ***flatMap***, ja przy okazji postanowiłem, że nie chcę rozróżniać wyrazów pisanych z małej i wielkiej liter, więc skorzystałem z funkji ***lower***.

In [21]:
listOfWords = words.flatMap(lambda x: x).map(lambda x: x.lower())
listOfWords.take(30)

['word',
 'count',
 'from',
 'wikipedia',
 'the',
 'free',
 'encyclopedia',
 'the',
 'word',
 'count',
 'is',
 'the',
 'number',
 'of',
 'words',
 'in',
 'a',
 'document',
 'or',
 'passage',
 'of',
 'text',
 'word',
 'counting',
 'may',
 'be',
 'needed',
 'when',
 'a',
 'text']

Teraz mamy już jedną listę sładającą się z pojedyńczych słów.

Używając funkcji ***map*** i ***reduceByKey*** możemy policzyć ile razy w tekście wystąpiło każde słowo. Njapierw wykorzystując funkcję ***map*** tworzymy dla każdego słowa parę **(słowo, 1)**. Następnie powstałą listę par grupujmey po kluczu **(słowie)** z wykorzystaniem funkcji ***reduceByKey***, która pozwala nam określić co ma się stać z wartościami do nich przypisanymi (my dodajemy je do siebię, czyli sumujemy wcześniej dodane 1-nki, co daje nam liczbę wystąpień każdego słowa). 

In [22]:
words_frequency = listOfWords.map(lambda x: (x, 1))\
                  .reduceByKey(lambda x,y: x+y)
words_frequency.take(30)

[('count', 10),
 ('wikipedia', 1),
 ('free', 1),
 ('is', 19),
 ('of', 25),
 ('in', 15),
 ('counting', 6),
 ('may', 8),
 ('needed', 1),
 ('when', 3),
 ('certain', 2),
 ('numbers', 1),
 ('this', 4),
 ('particularly', 1),
 ('legal', 1),
 ('used', 4),
 ('price', 1),
 ('job', 1),
 ('counts', 3),
 ('calculate', 1),
 ('readability', 1),
 ('measure', 2),
 ('typing', 1),
 ('speeds', 1),
 ('converting', 1),
 ('characters', 2),
 ('contents', 1),
 ('details', 4),
 ('references', 2),
 ('external', 1)]

Na koniec możemy posortować nasze słowa od największej do najmniejszej liczy wystąpień i wyświetlić pełną listę

Moglibyśmy zamienić miejscami, słowo i liczbę wystąpień tak jak zrobliśmy w poprzednim przykładzie z liczbami, aby wykorzystać funkcję ***sortByKey***. Jednak w tym przpadku chemy, żeby układ był nastęujacy **(słowo, liczba wystąień)**. Wykorzystamy więc funkcję ***SortBy*** i ustawimy ją tak, by sortowała po liczbie wystąpień.

In [23]:
words_frequencyS = words_frequency.sortBy(lambda x: x[1], ascending = False)
words_frequencyS.collect()

[('the', 43),
 ('a', 28),
 ('word', 27),
 ('of', 25),
 ('and', 23),
 ('is', 19),
 ('words', 17),
 ('to', 17),
 ('in', 15),
 ('as', 11),
 ('or', 11),
 ('count', 10),
 ('for', 10),
 ('may', 8),
 ('text', 8),
 ('be', 8),
 ('on', 7),
 ('such', 7),
 ('counting', 6),
 ('rules', 5),
 ('length', 5),
 ('novel', 5),
 ('by', 5),
 ('also', 5),
 ('fiction', 5),
 ('can', 5),
 ('most', 5),
 ('that', 5),
 ('this', 4),
 ('used', 4),
 ('details', 4),
 ('are', 4),
 ('an', 4),
 ('rule', 4),
 ('but', 4),
 ('usually', 4),
 ('software', 4),
 ('which', 4),
 ('processing', 4),
 ('segmentation', 4),
 ('when', 3),
 ('counts', 3),
 ('however', 3),
 ('these', 3),
 ('line', 3),
 ('space', 3),
 ('at', 3),
 ('number', 3),
 ('per', 3),
 ('variations', 3),
 ('definition', 3),
 ('sources', 3),
 ('definitions', 3),
 ('how', 3),
 ('consensus', 3),
 ('were', 3),
 ('often', 3),
 ('novels', 3),
 ('while', 3),
 ('its', 3),
 ('certain', 2),
 ('measure', 2),
 ('characters', 2),
 ('references', 2),
 ('namely', 2),
 ('toward', 2)

Możemy zobaczyć, że najcześciej, czyli 43 razy, występowało w tekście słowo 'the'.

# Autor: Przemysław Robak