# PySpark - wprowadzenie

Notatnik ten ma na celu zapoznanie Cię z podstawami pracy z Apache Spark z wykorzystaniem języka Python. Zawiera wszystkie podstawowe operacje, czyli transformacje, filtrowanie, łączenie, grupowanie i agregację. Służy także jako wprowadzenie do późniejszych ćwiczeń.

## Organizacja notatnika

Notatnik zawiera kilka sekcji, z których każda zawiera krótkie wprowadzenie do określonego tematu, przykładowy kod PySpark i krótkie ćwiczenia, w których możesz bezpośrednio zastosować nowo poznany materiał.

## Wymagania wstępne

Do wykonania ćwiczeń potrzebujesz środowiska Jupyter Notebook z istniejącym kontekstem Sparka (ang. Spark Context). Środowisko już powienieneś mieć przygotowane, bazowane na kontenerach Docker. Ponadto, niektóre dane testowe muszą znajdować się w określonej lokalizacji dostępnej dla środowiska Spark.

## Instalacja Sparka na platformie Colab

----------

In [1]:
#Instalacja Sparka na colabie
!git clone https://github.com/djkormo/colab-examples.git

Cloning into 'colab-examples'...
remote: Enumerating objects: 121, done.[K
remote: Counting objects: 100% (121/121), done.[K
remote: Compressing objects: 100% (85/85), done.[K
remote: Total 121 (delta 44), reused 107 (delta 30), pack-reused 0
Receiving objects: 100% (121/121), 6.84 MiB | 4.70 MiB/s, done.
Resolving deltas: 100% (44/44), done.


In [2]:
!cat colab-examples/spark/install.bash

#!/bin/bash

cd /

#mkdir content

cd content/ 

apt-get install openjdk-8-jdk-headless -qq > /dev/null

wget http://apache.crihan.fr/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

tar xf spark-2.4.4-bin-hadoop2.7.tgz

pip install -q findspark

pip install spark-nlp


export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SPARK_HOME=/content/spark-2.4.4-bin-hadoop2.7

# testing installation 
#python ./spark-test.py


In [4]:
!bash colab-examples/spark/install.bash

--2019-10-15 06:39:19--  http://apache.crihan.fr/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
Resolving apache.crihan.fr (apache.crihan.fr)... 195.221.21.36, 2001:660:7401:211::36
Connecting to apache.crihan.fr (apache.crihan.fr)|195.221.21.36|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 230091034 (219M) [application/x-gzip]
Saving to: ‘spark-2.4.4-bin-hadoop2.7.tgz.1’


2019-10-15 06:39:41 (10.2 MB/s) - ‘spark-2.4.4-bin-hadoop2.7.tgz.1’ saved [230091034/230091034]



In [6]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_ME'] = '/content/spark-2.4.4-bin-hadoop2.7'
!python colab-examples/spark/spark-test.py

19/10/15 06:40:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+



In [8]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_ME"] = "/content/spark-2.4.4-bin-hadoop2.7"
# Create a local Spark Session

import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")
from pyspark.sql import SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Test Installation 

df = spark.createDataFrame([{"Google": "Colab","Spark": "Scala"} ,{"Google": "Dataproc","Spark":"Python"}])
df.show()




+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+



In [0]:
# based on https://www.tutorialspoint.com/pyspark/pyspark_sparkcontext.htm

import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_ME'] = '/content/spark-2.4.4-bin-hadoop2.7'

import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("First App") \
    .master("local[*]")\
    .getOrCreate()



In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [11]:
print(type(sc))

<class 'pyspark.context.SparkContext'>


## Weryfikacja środowiska, wstępne ustawienia

In [12]:
sc.pythonVer

'3.6'

In [13]:
sc.master

'local[*]'

In [14]:
print(sc.version)

2.4.4


In [0]:
spark.version

'2.4.4'

In [0]:
import matplotlib

In [0]:
matplotlib.__version__

'3.0.3'

In [0]:
# Ustaw katalog bazowy zgodnie z podanymi ustawieniami

basedir = "colab-examples/spark/data/"

In [0]:
!cd colab-examples/ssh/data/


# 1. Odczyt danych

Aby pracować z danymi, musimy najpierw pozyskać do nich dostęp. Spark obsługuje różne formaty plików - w poniższym przykładzie użyjemy formatu CSV. Więcej na temat obsługiwanych formatów plików danych [tutaj](https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html)

Punktem wyjścia do tworzenia kolejnych obiektów w środowisku Spark jest obiekt o nazwie "spark", który jest dostarczany przez konfigurację środowiska dla obecnego notatnika i jest on gotowy do użycia. Wczytajmy plik CSV zawierający informacje na temat kilku osób. Wczytane dane będą stanowić podstawę dla kolejnych przykładów.

In [0]:
persons = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(basedir + "persons_header.csv")

## 1.1. Sprawdzanie wczytanych danych

Operacja odczytu zwraca obiekt Spark *DataFrame*. Obiekt ten jest podobny do tabeli, zawiera wiersze rekordów, które są zgodne ze wspólnym schematem, z nazwanymi kolumnami i określonymi typami. Na pierwszy rzut oka, koncepcja Spark DataFrame jest zapożyczona z Pandas DataFrame lub R DataFrame, chociaż składnia i wiele operacji mogą zasadniczo się różnić.

W pierwszym kroku chcemy zobaczyć zawartość DataFrame. Można to łatwo zrobić za pomocą metody `show()`.

In [0]:
persons.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 23|   156|  Alice|female|
| 21|   181|    Bob|  male|
| 27|   176|Charlie|  male|
| 24|   167|    Eve|female|
| 19|   172|Frances|female|
| 31|   191| George|female|
+---+------+-------+------+



## 1.2. Sprawdzenie schematu danych
Interesujące może być nie tylko bezpośrednie badanie danych, ale także sprawdzenie ich schematu. Schemat zawiera meta informacje o tym, które kolumny są obecne i jakie typy są używane w tych kolumnach. Możesz bezpośrednio pracować z obiektem schematu, używając zmiennej `schema` w DataFrame, lub możesz wyświetlić schemat za pomocą metody `printSchema()`w następujący sposób:

In [0]:
persons.schema[2]

StructField(name,StringType,true)

In [0]:
persons.printSchema()

root
 |-- age: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)



## 1.3. Konwersja to Pandas DataFrame
Spark zapewnia również wygodną metodę konwertowania typu Spark DataFrame do Pandas DataFrame. Jest to przydatne nie tylko przy stosowaniu algorytmów z biblioteki Pandas, ale również w przypadku notatników Jupyter, które mają wbudowaną obsługę wyświetlania obiektów Pandas DataFrame. Dlatego użycie metody `toPandas()` jest preferowane do wyświetlania zawartości DataFrame zamiast korzystania z powyższej metody `show()`.

In [0]:
persons.limit(5).toPandas()

Unnamed: 0,age,height,name,sex
0,23,156,Alice,female
1,21,181,Bob,male
2,27,176,Charlie,male
3,24,167,Eve,female
4,19,172,Frances,female


### Uwaga: uważaj na duże obiekty Spark DataFrame!
Nie zapominaj, że Apache Spark został zaprojektowany i zbudowany w celu obsługi naprawdę dużych zbiorów danych, które nie muszą pasować do pamięci komputera. Spark DataFrames może zawierać miliardy wierszy i jest przechowywany w sposób rozproszony na wielu węzłach w klastrze. W rzeczywistości zawartość nie musi nawet być fizycznie obecna, o ile dane wejściowe są dostępne.

Natomiast wywołanie metody `toPandas()` spowoduje przeniesienie wszystkich rekordów do pamięci jednego komputera (na którym działa notatnik Jupyter) - ale może się zdarzyć, że ten komputer nie ma wystarczającej ilości pamięci do przechowywania wszystkich danych. W takim przypadku ryzykujesz awarię notatnika z błędem braku pamięci (ang. Out-Of-Memory error, OOM). Dlatego powinieneś używać metody `toPandas()` tylko wtedy, gdy masz całkowitą pewność, że obiekt Spark DataFrame zawiera ilość rekordów ograniczoną do rozmiaru pamięci maszyny, na której pracuje notatnik.

## Ćwiczenie 1 

1. Załaduj plik "persons.json". Plik zawiera dokładnie takie same dane jak poprzedni plik, z tą różnicą, że są one zapisane w formacie JSON w miejsce formatu CSV.
2. Spradź schemat danych.
3. Pokaż zawartość pliku
4. Przekonwertuj obiekt Spark DataFrame na Pandas DataFrame

In [0]:
persons_json = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .json(basedir + "persons.json")

In [0]:
persons_json.printSchema()

root
 |-- age: long (nullable = true)
 |-- height: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)



In [0]:
persons_json.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 14|   156|  Alice|female|
| 21|   181|    Bob|  male|
| 27|   176|Charlie|  male|
| 24|   167|    Eve|female|
| 19|   172|Frances|female|
| 31|   191| George|  male|
+---+------+-------+------+



In [0]:
persons_json.toPandas()

Unnamed: 0,age,height,name,sex
0,14,156,Alice,female
1,21,181,Bob,male
2,27,176,Charlie,male
3,24,167,Eve,female
4,19,172,Frances,female
5,31,191,George,male


# 2. Podstawowe transformacje

## 2.1. Rzutowanie (ang. projections)

Najprostszą rzeczą do zrobienia jest utworzenie nowego obiektu DataFrame jako podzbioru dostępnych kolumn z obiektu macierzystego.

Poniższy przykład pokazuje różne sposoby wskazania, które kolumny mają stanowić zawartość nowego obiektu.

In [0]:
from pyspark.sql.functions import *

result = persons.select(persons.name, persons['sex'], col('age'))
result.toPandas()

Unnamed: 0,name,sex,age
0,Alice,female,23
1,Bob,male,21
2,Charlie,male,27
3,Eve,female,24
4,Frances,female,19
5,George,female,31


Jedną z istotnych koncepcji Sparka jest to, że każda transformacja zwróci nowy obiekt typu DataFrame. Oryginalna obiekt DataFrame pozostaje niezmieniony. Spowodowane jest to założeniem architektonicznym Sparka, które upraszcza przetwarzanie równoległe.

## 2.2. Adresowanie kolumn

Spark supports multiple different ways for *addressing* a columns. We just saw one way, but also the following methods are supported for specifying a column:

Spark obsługuje wiele różnych sposobów *adresowania* kolumn. Zostały one użyte w przykładzie powyżej, podsumujmy:

```
df.column_name
df['column_name']
col('column_name')
```

Wszystkie te metody zwracają obiekt `Column`, który jest abstrakcyjnym reprezentantem danych w kolumnie. Jak się wkrótce przekonamy, transformacje można zastosować do `Column` w celu wyprowadzenia nowych wartości.

## Ćwiczenie 2

Użyj wszystkich trzech różnych metod adresowania kolumn i wybierz następujące kolumny:
* name
* age
* height

In [0]:
persons.select(persons.name).show()

+-------+
|   name|
+-------+
|  Alice|
|    Bob|
|Charlie|
|    Eve|
|Frances|
| George|
+-------+



In [0]:
persons.select(persons['age']).show()

+---+
|age|
+---+
| 23|
| 21|
| 27|
| 24|
| 19|
| 31|
+---+



In [0]:
persons.select(col('height')).show()

+------+
|height|
+------+
|   156|
|   181|
|   176|
|   167|
|   172|
|   191|
+------+



## 2.3. Transformacje

Metoda `select()` akceptuje jako argument dowolny *obiekt* typu kolumna (ang. column). Obiekt *column* reprezentuje koncepcyjnie kolumnę w DataFrame. Kolumna może odnosić się bezpośrednio do istniejącej kolumny wejściowej DataFrame lub może reprezentować wynik obliczenia lub transformacji jednej lub wielu kolumn wejściowej DataFrame. Na przykład, jeśli chcemy po prostu zmienić imię (kolumna *name*) na wielkie litery, możemy to zrobić za pomocą funkcji `upper` udostępnianej przez PySpark.

In [0]:
from pyspark.sql.functions import *

result = persons.select(
  upper(persons.name)
)

result.toPandas()

Unnamed: 0,upper(name)
0,ALICE
1,BOB
2,CHARLIE
3,EVE
4,FRANCES
5,GEORGE


Spójrzmy na inny przykład, w którym chcemy stworzyć nowy obiekt DataFrame z odpowiednią formą grzecznościową przed imieniem. Osiągamy to za pomocą metody `select()` i odpowiedniej składni:

In [0]:
from pyspark.sql.functions import *

result = persons.select(concat(when(persons.sex == 'female', "Pani ").otherwise("Pan "), persons.name))
result.toPandas()

Unnamed: 0,"concat(CASE WHEN (sex = female) THEN Pani ELSE Pan END, name)"
0,Pani Alice
1,Pan Bob
2,Pan Charlie
3,Pani Eve
4,Pani Frances
5,Pani George


## 2.4. Pozostałe używane funkcje

Możesz znaleźć pełną listę dostępnych funkcji na [PySpark SQL Module](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions). Najcześciej używane funkcje:

* [`concat(*cols)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.concat) - Łączy wiele kolumn wejściowych w jedną kolumnę.
* [`substring(str, pos, len)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.substring) - Substring() rozpoczyna się od pozycji `pos` i ma długość `len`, gdy `str` jest typem String lub zwraca fragment tablicy bajtów, który zaczyna się od `pos` w bajcie i ma długość `len`, gdy `str` jest typu binarnego.
* [`instr(col,substr)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.instr) - Zlokalizuj pozycję pierwszego wystąpienia kolumny substr w danym ciągu znaków. Zwraca wartość null, jeśli jeden z argumentów ma wartość null.
* [`locate(substr, str, pos=1)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.locate) - Lokalizuje pozycję pierwszego wystąpienia argumentu `substr` w kolumnie str, po pozycji `pos`.
* [`length(col)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.length) - Oblicza długość znaków w ciągach danych lub liczbę bajtów danych binarnych. 
* [`upper(col)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.upper) - Konwertuje kolumnę zawierającą dane typu string na wielkie litery.
* [`lower(col)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lower) - Konwertuje kolumnę zawierającą dane typu string na małe litery.
* [`coalesce(*cols)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.coalesce) - Zwraca pierwszą kolumnę, która nie jest pusta.
* [`isnull(col)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.isnull) - Wyrażenie, które zwraca wartość true, jeśli kolumna ma wartość null.
* [`isnan(col)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.isnan) - Wyrażenie, które zwraca wartość true, jeśli kolumna ma wartość NaN.
* [`hash(cols*)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.hash) - Oblicza kod skrótu dla podanych kolumn.

Parametry:	
substr – łańcuch znaków
str – koumna typu pyspark.sql.types.StringType
pos – pozycja początkowa (rozpoczynając od zera)

Spark obsługuje również wyrażenia warunkowe, takie jak składnia SQL `CASE WHEN`
* [`when(condition, value)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when) - Ocenia listę warunków i zwraca jedno z wielu możliwych wyrażeń wynikowych.

Często są też wymagane specjalne funkcje:
* [`col(str)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.col) - Zwraca kolumnę na podstawie podanej nazwy kolumny.
* [`lit(val)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit) - Tworzy kolumnę z bezpośrednio podanych wartości.
* [`expr(str)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.expr) - Parsuje łańcuch znaków wyrażenia w kolumnie, która je reprezentuje.

## 2.5. Funkcje zdefiniowane przez użytkownika
Niestety nie można bezpośrednio użyć standardowych funkcji Pythona do transformacji kolumn obiektu Spark DataFrame. Chociaż PySpark zapewnia już wiele użytecznych funkcji, nie zawsze to wystarcza. Ale na szczęście można *przekonwertować* standardową funkcję Pythona do funkcji PySpark, definiując w ten sposób tzw. *Funkcję zdefiniowaną przez użytkownika* (ang. User Defined Function, UDF).

## 2.6. Definiowanie nowych nazw kolumn
W poprzenim przykładzie, obiekt DataFrame który powstał, ma odpowiedni schemat, ale nazwa kolumny nie jest tym czego chcemy. 
Używając metody `alias()` należącą do obiektu typu `Column`, możesz zmienić nowo-utworzoną kolumnę tak jak można zrobić w SQL `SELECT complex_operation(...) AS nice_name FROM ...`. 

Technicznie rzecz biorąc, określenie nowej nazwy dla wynikowej kolumny nie jest wymagane (jak już widzieliśmy powyżej) - jeśli nazwa nie jest określona, PySpark wygeneruje nazwę z wyrażenia. Ale ponieważ ta wygenerowana nazwa jest raczej długa i zawiera logikę zamiast określenia, zdecydowanie zaleca się jawne podawanie nazwy wynikowej kolumny za pomocą `AS`.

In [0]:
result = persons.select(concat(when(persons.sex == 'male', "Pan ").otherwise("Pani "), persons.name).alias("pelne_imie"))
result.toPandas()

Unnamed: 0,pelne_imie
0,Pani Alice
1,Pan Bob
2,Pan Charlie
3,Pani Eve
4,Pani Frances
5,Pani George


## 2.7. Dodawanie kolumn

Specjalnym wariantem instrukcji `select` jest metoda` withColumn`. Podczas gdy instrukcja `select` wymaga, aby wszystkie kolumny wynikowe były zdefiniowane jako argumenty, metoda` withColumn` zachowuje wszystkie istniejące kolumny i dodaje nową. Ta operacja jest całkiem przydatna, ponieważ w wielu przypadkach nowe kolumny są wyprowadzane z istniejących, a stare nadal powinny być zawarte w wyniku.

Rzućmy okiem na prosty przykład, który dodaje tylko zwrot grzecznościowy jako nową kolumnę:

In [0]:
result = persons.withColumn("salutation", when(persons.sex == 'female', "Pani ").otherwise("Pan "))
result.toPandas()

Unnamed: 0,age,height,name,sex,salutation
0,23,156,Alice,female,Pani
1,21,181,Bob,male,Pan
2,27,176,Charlie,male,Pan
3,24,167,Eve,female,Pani
4,19,172,Frances,female,Pani
5,31,191,George,female,Pani


Jak widać na powyższym przykładzie, `withColumn` zawsze przyjmuje dwa argumenty: pierwszy to nazwa nowej kolumny (i musi to być ciąg znaków), a drugi argument jest wyrażeniem zawierającym logikę do wyznaczenia zawartości nowej kolumny.

## 2.8. Usuwanie kolumn

PySpark obsługuje również operację odwrotną, która po prostu usuwa niektóre kolumny z obiektu DataFrame. Jest to przydatne, np. w przypadku jeśli trzeba usunąć poufne dane przed zapisaniem ich na dysku:

In [0]:
result = persons.drop("name")
result.toPandas()

Unnamed: 0,age,height,sex
0,23,156,female
1,21,181,male
2,27,176,male
3,24,167,female
4,19,172,female
5,31,191,female


## Ćwiczenie 3

Korzystając z obiektu DataFrame pod nazwą `persons`, wykonaj następujące operacje:
* Dodaj nową kolumnę `status`, która powinna zwiarać wartość `dziecko`, jeśli dana osoba jest młodsza niż 18 lat, zaś `dorosły` w innym przypadku
* Dodaj nową kolumną o nazwie `hashed_name` zawierającą wartość hash imienia z kolumny `name`  
* Usuń kolumnę "sex"

In [0]:
result = persons.withColumn("status", when(persons.age >= 18, "dorosły").otherwise("dziecko"))
result.show()

+---+------+-------+------+-------+
|age|height|   name|   sex| status|
+---+------+-------+------+-------+
| 23|   156|  Alice|female|dorosły|
| 21|   181|    Bob|  male|dorosły|
| 27|   176|Charlie|  male|dorosły|
| 24|   167|    Eve|female|dorosły|
| 19|   172|Frances|female|dorosły|
| 31|   191| George|female|dorosły|
+---+------+-------+------+-------+



In [0]:
result = persons.withColumn("hashed_name",hash(persons.name))
result.show()

+---+------+-------+------+-----------+
|age|height|   name|   sex|hashed_name|
+---+------+-------+------+-----------+
| 23|   156|  Alice|female|-1597477876|
| 21|   181|    Bob|  male|  549595493|
| 27|   176|Charlie|  male|  906014075|
| 24|   167|    Eve|female|-1915804868|
| 19|   172|Frances|female| 1222495376|
| 31|   191| George|female| -454693442|
+---+------+-------+------+-----------+



In [0]:
result = persons.drop('sex')
result.show()

+---+------+-------+
|age|height|   name|
+---+------+-------+
| 23|   156|  Alice|
| 21|   181|    Bob|
| 27|   176|Charlie|
| 24|   167|    Eve|
| 19|   172|Frances|
| 31|   191| George|
+---+------+-------+



# 3. Filtrowanie
*Filtrowanie* oznacza proces utrzymywania tylko wierszy spełniających określone kryteria filtrowania. PySpark obsługuje dwa różne podejścia. Pierwsze podejście określa wyrażenie filtrujące jako wyrażenie PySpark przy użyciu kolumn:

In [0]:
result = persons.filter(persons.age > 20)
result.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 23|   156|  Alice|female|
| 21|   181|    Bob|  male|
| 27|   176|Charlie|  male|
| 24|   167|    Eve|female|
| 31|   191| George|female|
+---+------+-------+------+



Drugie podejście używa po prostu łańcucha znaków zawierającego wyrażenie SQL:

In [0]:
result = persons.filter("age > 20")
result.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 23|   156|  Alice|female|
| 21|   181|    Bob|  male|
| 27|   176|Charlie|  male|
| 24|   167|    Eve|female|
| 31|   191| George|female|
+---+------+-------+------+



Oczywiście możesz również łączyć wiele warunków używając `&` (i) oraz `|` (lub) z pierwszym podejściem lub używając słów SQL `AND` i` OR` w drugim podejściu.

In [0]:
result = persons.filter((persons.age > 20) & (persons.sex == "male"))
result.show()

+---+------+-------+----+
|age|height|   name| sex|
+---+------+-------+----+
| 21|   181|    Bob|male|
| 27|   176|Charlie|male|
+---+------+-------+----+



## Ćwiczenie 4
Wykonaj różne operacje filtrowania:
* Wybierz wszystkie kobiety o wzroście co najmniej 160 cm
* Wybierz wszystkie osoby, które są młodsze niż 20 lat lub starsze niż 30 lat

In [0]:
result = persons.filter((persons.sex == "female") & (persons.height >= 160))
result.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 24|   167|    Eve|female|
| 19|   172|Frances|female|
| 31|   191| George|female|
+---+------+-------+------+



In [0]:
result = persons.filter((persons.age < 20) | (persons.age > 30))
result.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 19|   172|Frances|female|
| 31|   191| George|female|
+---+------+-------+------+



# 4. Grupowanie i agregacja
Ważną klasą operacji jest grupowanie i agregacja, co jest równoważne instrukcji SQL `SELECT agregacja GROUP BY grupowanie`. W PySpark, grupowanie i agregacja są zawsze wykonywane przez uprzednie tworzenie grup za pomocą `groupBy` , bezpośrednio po tym występuje wyrażenie agregacji wewnątrz metody` agg`. (W rzeczywistości istnieją również pewne predefiniowane agregacje, które mogą być użyte zamiast "agg", ale nie oferują one wymaganej elastyczności).

Zauważ, że w metodzie `agg` należy tylko określić wyrażenie agregacji, kolumny grupowania są automatycznie dodawane przez PySpark do wynikowego obiektu DataFrame.

In [0]:
result = persons.groupBy(persons.sex).\
    agg(avg(persons.age).alias('avg_age'),
        min(persons.height).alias('min_height'),
        max(persons.height).alias('max_height'))
result.toPandas()

Unnamed: 0,sex,avg_age,min_height,max_height
0,female,24.25,156,191
1,male,24.0,176,181


## 4.1. Funkcje agragujące

PySpark obsługuje wiele funkcji agregujących, które można znaleźć w dokumentacji na stronie [PySpark Function Documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions). Funkcje agregacji są oznaczone w dokumentacji, niestety nie ma jednolitego, zwięzłego ich przeglądu. Wśród typowych funkcji agregujących są na przykład:

* count
* sum
* avg
* stddev
* variance
* corr
* first
* last
* collect_set

## Ćwiczenie 5
Korzystając z DataFrame `persons`, oblicz średni wzrost i liczbę rekordów dla każdej płci.

In [0]:
result = persons.groupBy(persons.sex).\
    agg(avg(persons.height).alias('avg_height'),
        count(persons.height).alias('count'))
result.toPandas()

Unnamed: 0,sex,avg_height,count
0,female,171.5,4
1,male,178.5,2


# 5. Sortowanie

PySpark obsługuje również sortowanie danych za pomocą metody `orderBy`. Na przykład możemy posortować wszystkie osoby według ich imion w następujący sposób:

In [0]:
result = persons.orderBy(persons.name)
result.toPandas()

Unnamed: 0,age,height,name,sex
0,23,156,Alice,female
1,21,181,Bob,male
2,27,176,Charlie,male
3,24,167,Eve,female
4,19,172,Frances,female
5,31,191,George,female


Jeśli nic więcej nie zostanie określone, PySpark będzie sortował rekordy kolumny sortowania w porządku rosnącym. Jeśli potrzebny jest porządek malejący, można to określić, manipulując kolumną sortowania przy użyciu metody `desc()` w następujący sposób:

In [0]:
result = persons.orderBy(persons.age.desc())
result.toPandas()

## Ćwiczenie 6
W ćwiczeniu chcemy najpierw uporządkować wszystkie osoby według ich płci, a następnie ich wieku w kierunku malejącym. Sortowanie według wielu kolumn można łatwo osiągnąć, podając wiele kolumn jako argumenty w metodzie `orderBy`.

In [0]:
result = persons.orderBy(persons.sex, persons.age.desc())
result.toPandas()

Unnamed: 0,age,height,name,sex
0,31,191,George,female
1,24,167,Eve,female
2,23,156,Alice,female
3,19,172,Frances,female
4,27,176,Charlie,male
5,21,181,Bob,male


# 6. Łączenie danych

Każda algebra relacji zawiera również operacje łączenia, które umożliwiają łączenie wielu tabel według pasującego kryterium. PySpark obsługuje również łączenie wielu obiektów typu DataFrames. Aby rzucić na to trochę światła, potrzebujemy drugiego obiektu DataFrame, oprócz `persons`. W tym celu ładujemy dane adresowe w następujący sposób:


In [0]:
addresses = spark.read.json(basedir + "addresses.json")
addresses.toPandas()

Unnamed: 0,city,name
0,Hamburg,Alice
1,Frankfurt,Bob
2,Berlin,Henry


Teraz, gdy mamy obiekt DataFrame `addresses`, chcemy go połączyć z ramką danych `persons` w taki sposób, aby miasto każdej osoby zostało dodane jako nowa kolumna. Osiąga się to za pomocą metody `join()`, która zasadniczo przyjmuje dwa parametry: pierwszy parametr określa drugą DataFrame do połączenia, a drugi parametr określa warunek łączenia. W tym przypadku chcemy połączyć wszystkie rekordy, w których pasuje kolumna `name`.

In [0]:
result = persons.join(addresses,persons.name == addresses.name)
result.toPandas()

Unnamed: 0,age,height,name,sex,city,name.1
0,23,156,Alice,female,Hamburg,Alice
1,21,181,Bob,male,Frankfurt,Bob


Kilka uwag:
* Wynikowy DataFrame zawiera teraz dwie kolumny `name` - jedna pochodzi z DataFrame `persons`, druga `address`. Ponieważ warunek łączenia mógł być bardziej złożony, PySpark nie zakłada, że wszystkie sprzężenia używają po prostu jakiejś wartości kolumny. Na przykład, mogliśmy przekształcić kolumnę w locie, zmieniając nazwę na wielką literę bezpośrednio w warunku łączenia.
* Wynik zawiera tylko osoby, których adres został znaleziony, chociaż oryginalny DF `persons` zawierał więcej osób.
* Nie ma wpisów adresów bez żadnej osoby, mimo że DataFrame `addresses` zawiera informacje o niektórych osobach niedostępnych w DataFrame` persons`.

Możemy łatwo pozbyć się skopiowanej kolumny `name`, wykonując jawny wybór żądanych kolumn lub opuszczając zduplikowane kolumny. Zduplikowane kolumny `name` mogą być adresowane przez ich oryginalny DataFrame nawet po operacji łączenia:

In [0]:
result = persons.join(addresses,persons.name == addresses.name).select(persons.name,persons.age,addresses.city)
result.toPandas()

Unnamed: 0,name,age,city
0,Alice,23,Hamburg
1,Bob,21,Frankfurt


Teraz wyjaśnijmy dwie ostatnie obserwacje. Wynika to z zastosowanego typu łączenia, który był tak zwanym sprzężeniem wewnętrznym (ang. *inner* join). W takim przypadku w wyniku uwzględniane są tylko rekordy zawierające informacje z obu obiektów DataFrame.

Oprócz łączenia *wewnętrznego* PySpark obsługuje również kilka dodatkowych połączeń:
* *łączenie zewnętrzne* będzie zawierać rekordy dla wszystkich elementów z obu obiektów DataFrames. Jeśli lewy lub prawy obiekt DataFrames nie zawiera żadnych informacji, wynik będzie zawierał wartości `None` (= wartość `NULL`) dla odpowiednich kolumn.
* W *prawostronnym łączeniu*, drugim elementem jest DataFrame (prawy DataFrame) określony jako argument i jest elementem wiodącym. Wynik będzie zawierał rekordy dla każdego rekordu z tego obiektu DataFrame.
* W *lewostronnym łączeniu* pierwszy wyspecyfikowany obiekt DataFrame jest elementem wiodącym (lewy obiekt DataFrame). Wynik będzie zawierał rekordy dla każdego rekordu z tej DataFrame.

In [0]:
result = persons.join(addresses,persons.name == addresses.name, how="outer")
result.toPandas()

Unnamed: 0,age,height,name,sex,city,name.1
0,27.0,176.0,Charlie,male,,
1,21.0,181.0,Bob,male,Frankfurt,Bob
2,23.0,156.0,Alice,female,Hamburg,Alice
3,24.0,167.0,Eve,female,,
4,31.0,191.0,George,female,,
5,19.0,172.0,Frances,female,,
6,,,,,Berlin,Henry


In [0]:
result = persons.join(addresses,persons.name == addresses.name, how="right")
result.toPandas()

Unnamed: 0,age,height,name,sex,city,name.1
0,23.0,156.0,Alice,female,Hamburg,Alice
1,21.0,181.0,Bob,male,Frankfurt,Bob
2,,,,,Berlin,Henry


In [0]:
result = persons.join(addresses,persons.name == addresses.name, how="left")
result.toPandas()

Unnamed: 0,age,height,name,sex,city,name.1
0,23,156,Alice,female,Hamburg,Alice
1,21,181,Bob,male,Frankfurt,Bob
2,27,176,Charlie,male,,
3,24,167,Eve,female,,
4,19,172,Frances,female,,
5,31,191,George,female,,


## Ćwiczenie 7

Do ćwiczenia używamy innego DataFrame załadowanego z pliku o nazwie `lastnames.json`, który można ponownie połączyć z obiektem `persons`:

In [0]:
lastnames = spark.read.json(basedir + "lastnames.json")
lastnames.toPandas()

Unnamed: 0,last_name,name
0,Liddell,Alice
1,Baumeister,Bob
2,Gates,Bob


Następnie łączymy DataFrame `lastnames` do DataFrame `persons` dla każdego przypadku, gdy dopasowana jest kolumna `name` dla obu DataFrames. Zwróć uwagę, co się dzieje, ponieważ mamy dwa nazwiska dla imienia `Bob`

In [0]:
result = persons.join(lastnames,lastnames.name == persons.name, how="inner")
result.toPandas()

Unnamed: 0,age,height,name,sex,last_name,name.1
0,23,156,Alice,female,Liddell,Alice
1,21,181,Bob,male,Gates,Bob
2,21,181,Bob,male,Baumeister,Bob


# Co dalej?

Właśnie przejrzeliśmy najważniejsze i typowe operacje wykonywane za pomocą PySpark. Dalej prześledzimy więcej szczegółów związanych z tymi operacjami i przerobimy następujące tematy:
* Więcej na temat RDD
* Jak wygląda plan wykonania
* User Defined Functions (UDFs), Vectorized Pandas User Defined Functions
* Zapis danych - formaty plików
* Zastosowanie języka SQL
* Metody uczenia maszynowego
* Wstęp do głębokiego uczenia
* ...

In [0]:
#... przykład wywołania planu wykonania ... 
result.explain(True)

== Parsed Logical Plan ==
Join Inner, (name#484 = name#12)
:- Relation[age#10,height#11,name#12,sex#13] csv
+- Relation[last_name#483,name#484] json

== Analyzed Logical Plan ==
age: int, height: int, name: string, sex: string, last_name: string, name: string
Join Inner, (name#484 = name#12)
:- Relation[age#10,height#11,name#12,sex#13] csv
+- Relation[last_name#483,name#484] json

== Optimized Logical Plan ==
Join Inner, (name#484 = name#12)
:- Filter isnotnull(name#12)
:  +- Relation[age#10,height#11,name#12,sex#13] csv
+- Filter isnotnull(name#484)
   +- Relation[last_name#483,name#484] json

== Physical Plan ==
*(2) BroadcastHashJoin [name#12], [name#484], Inner, BuildRight
:- *(2) Project [age#10, height#11, name#12, sex#13]
:  +- *(2) Filter isnotnull(name#12)
:     +- *(2) FileScan csv [age#10,height#11,name#12,sex#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/content/colab-examples/spark/data/persons_header.csv], PartitionFilters: [], PushedFilters: [IsNotNu