#  Wprowadzenie

W notebook'ach zdefiniowana jest zmienna `spark` typu `SparkSession`, która będzie punktem startowym naszej pracy ze Sparkiem. Tego typu konwencja nazewnicza jest bardzo częsta.

In [0]:
spark

# Jak tworzymy Dataframe?
Pierwszym sposobem jest użycie Spark SQL:

In [0]:
uam_offers = spark.sql("select * from uam_offers")

W powyższym przypadku prościej będzie jednak skorzystać z metody `table`:

In [0]:
uam_offers = spark.table("uam_offers")

Metoda `sql` przyjmuje dowolne zapytanie Spark SQL. Np.:

In [0]:
spark.sql("show create table uam_offers").collect()

Możemy również odczytać dane bezpośrednio z plików:

In [0]:
location = "dbfs:/user/hive/warehouse/uam_offers"
uam_offers_from_file = spark.read.format("delta").option("path", location).load()

Drugi sposób wymaga od nas znajomości formatu pliku (w typ przypadku `delta`) oraz ścieżki.


Kolejnym sposobem jest wykorzystanie metody `createDataFrame`:

In [0]:
# Wersja 1: 
# Pierwszy argument: lista tupli
# Drugi (opcjonalny) argument: lista string (nazwy kolumn)
spark.createDataFrame([(1, 2), (11, 22)], ['col1', 'col2'])

In [0]:
# Wersja 2:
# Wykorzystujemy klasę Row, która pozwala nam nazwać odpowiednio kolumny:
from pyspark.sql import Row
spark.createDataFrame([Row(col1 = 1, col2 = 2), Row(col1 = 11, col2 = 22)])

Można korzystać również z źródeł JDBC lub innych dedykowanych bibliotek (MongoDB, Elastic, Cassandra i wiele więcej).


Warto wspomnieć, że metoda `createDataFrame` umożliwia również tworzenie `DataFrame` na bazie `RDD`:

In [0]:
from pyspark.sql import Row
my_rdd = spark.sparkContext.parallelize([Row(col1 = 1, col2 = 2), Row(col1 = 11, col2 = 22)])
df = spark.createDataFrame(my_rdd)

Można oczywiście dokonać konwersji w drugą stronę:

In [0]:
df.rdd

Ćwiczenie


Utwórzmy DataFrame'y *uam_orders* i *uam_categories* analogicznie jak *uam_offers*

Rozwiązanie

In [0]:
uam_orders = spark.table("uam_orders")
uam_categories = spark.table("uam_categories")

Ćwiczenie

Jak utworzyć DF za pomocą metody `createDataFrame` tak, aby zostały nadane domyślne nazwy atrybutów? W jakiej są postaci? Czy nazwy tych atrybutów są czytelne?

Podpowiedź: dokumentację funkcji można sprawdzić dodając `?` na jej końcu (np. `spark.createDataFrame?`).

In [0]:
spark.createDataFrame?

Rozwiązanie

In [0]:
spark.createDataFrame([(1, 2), (11, 22)])

#Odczyt metadanych

DF posiada następujące metody i atrybuty umożliwiające eksplorację metadanych:
1. `printSchema()` - wyświetla schemat danych 
2. `describe(*cols)` - zwraca DF ze statystykami kolumn
3. `dtypes` - zwraca listę par nazwa kolumny, typ kolumny

Klasa `SparkSession` (w naszym przypadku zmienna `spark`) zawiera atrybut `catalog`:

spark.catalog - zwraca obiekt pozwalającym eksplorować metadane schematów i tabel

In [0]:
uam_offers.printSchema()

In [0]:
uam_offers.describe('offer_id', 'seller_id').show(truncate=False)

In [0]:
uam_offers.dtypes

In [0]:
spark.catalog.listDatabases()

In [0]:
spark.catalog.listTables()

In [0]:
spark.catalog.listColumns('uam_categories', 'default')

W notebookach Databricks możemy również łatwo zmienić interpreter na SQL:

In [0]:
%sql
show tables

# SELECT

Należy zwrócić uwagę, która funkcja zwraca kolumny z przekształcanego DataFrame'a oraz nowe, a która zwraca tylko modyfikowane lub wylistowane kolumny.

1. `uam_categories.select('*')` - wybór wszystkich kolumn (pytanie: czy ma to sens?)
2. `uam_categories.select('category_id', 'category_level1')`  - wybór podzbioru kolumn
3. `uam_categories.select(uam_categories.category_id)` - jak wyżej
4. `uam_categories.select(uam_categories.category_id.alias('id'))` - aliasowanie kolumn
5. `uam_categories.selectExpr('category_id as id', '2*1 as const')` - wyrażenie SQL’owe w klauzuli SELECT
6. `from pyspark.sql.functions import lit; uam_categories.select(lit(2).alias('const'))` - przykład funkcji lit - generowanie stałych
7. `uam_categories.withColumn('const', lit(2))` - dodawanie nowych kolumn
8. `uam_categories.withColumnRenamed('category_id', 'id')` - aliasowanie kolumn - sposób nr 2
9. `uam_categories.drop('category_level3')` - usuwanie kolumn

Niektóre funkcje mogą wymagać przekazania argumentu jako kolumny. Może to być problematyczne np. po operacji złączenia. Rozwiązaniem jest funkcja `col`:

```
from pyspark.sql.functions import col
uam_categories.select(col('category_level1'))
```
Funkcja `col` przyda się w późniejszych przykładach

Zauważmy, że powyższe metody transformują `DataFrame` na inny `DataFrame` bez zwracania wyników (tzw. transformacja). 

Poniższe metody natomiast powodują zwrócenie wyników do `driver`'a (tzw. akcja):
1. `show(n=20, truncate=True, vertical=False)` - wyświetla wyniki na ekran (`n` - liczba rekordów, `truncate` - czy zawijać wiersze, `vertical` - jeśli `True`, to każda kolumna jest wyświetlana w osobny wierszu)
2. `collect()` - zwraca listę `Row`
3. `display(input)` - działa podobnie jak `show`, ale wyniki są wyświetlane w sformatowanej tabeli 

Istnieje oczywiście szereg akcji powodujących zapis danych - umówimy je później

Samodzielne ćwiczenia

Przetestujmy różne polecenia związane z odczytem danych i ich wyświetlaniem poniżej

In [0]:
# Przykładowo:
uam_categories.select('*').limit(5).show()
uam_categories.select(uam_categories.category_id).limit(5).show()

In [0]:
uam_categories.select('*').limit(5).collect()

In [0]:
display(
  uam_categories.select('*').limit(5)
)

## Funkcje wbudowane

Funkcje są podobne (tj. nazwy, argumenty, działanie) jak w przypadku Spark SQL 

Przykłady znajdują się poniżej:

In [0]:
# tworzymy DF z jedną kolumną i wierszem
from pyspark.sql.functions import *
from pyspark import Row
df = spark.createDataFrame([Row(col='X')])


In [0]:
# funkcje operujące na datach
display(
    df.select(
        current_timestamp(),
        current_date(),
        add_months(lit("2021-01-01"), 1),
        date_add(current_date(), 12),
        datediff(current_date(), lit("2021-01-01")),
    )
)

In [0]:
display(
  df.select(lit('1').cast('int'))
)

Należy zwracać uwagę, kiedy funkcja wymaga podania kolumny albo argumentu innego typu. 

Funkcja `expr` jest podobna do funkcji `selectExpr(col*)`. Obydwie umożliwiają do korzystania z funkcji w sposób taki jak korzysta się z nich w Spark SQL. Dobrym przykładem jest `reflect`, którego brakuje w module `pyspark.sql.functions`

In [0]:
display(df.select(expr("reflect('java.util.Currency', 'getAvailableCurrencies')")))

Przejdźmy do bardziej skomplikowanych zastosowań - spróbujemy sparsować dane w formacie JSON, które znajdują się w kolumnie typu String:

Utwórzmy DF z jednym rekordem  i kolumną o nazwie *json*. Korzystając z funkcji *from_json* utworzymy 
z kolumny zawierającej tekst kolumnę sparsowaną (tj. zgodną z podanym schematem):

In [0]:
from pyspark import Row
from pyspark.sql.functions import from_json
from pyspark.sql.types import *

simple_json = """
{
  "items": [
    {
      "firstItemPrice": {
        "amount": "6.00",
        "currency": "PLN"
      }
    },
    {
      "firstItemPrice": {
        "amount": "8.00",
        "currency": "PLN"
      }
    }
  ]
}
"""
simple_json_df = spark.createDataFrame([Row(json=simple_json)])

In [0]:
# tworzymy schemat dla ceny:
price_schema = StructType(
    [StructField("amount", DecimalType(12, 2)), StructField("currency", StringType())]
)

# schemat dla ceny pojedynczej pozycji
single_item_schema = StructType([StructField("firstItemPrice", price_schema)])

# schemat dla zbioru różnych pozycji
schema = StructType([StructField("items", ArrayType(single_item_schema))])

# gdy mamy gotowy schemat możemy sparsować nasz rekord:
simple_json_df.select(from_json(simple_json_df.json, schema).alias("parsed_json"))

Możemy również wygenerować automatycznie schemat dla danych w formacie JSON. W tym celu musimy:

In [0]:
# przekształcić dane do postaci RDD[string]
as_rdd = simple_json_df.rdd.map( lambda r: r[0])

# skrzystać z metody json
spark.read.json(as_rdd)

Schemat możemy również wykorzystać tworząc DF przy wykorzystaniu metody `SparkSession.createDataFrame(...)`

In [0]:
schema = StructType(
  [StructField("StringField", StringType()), 
   StructField("IntField", IntegerType())])
spark.createDataFrame([("val1", 1), ("val2", 2)], schema).printSchema()

Ćwiczenia

Dana jest zmienna tekstowa w formacie JSON (zob. poniżej). Utwórz DF z jednym rekordem i kolumną o nazwie json. Z cennika dostaw pobierz cenę dostawy jednej sztuki z dowolnej metody dostawy.

In [0]:
json="""
{
  "eventTime": "2018-03-14T03:25:24.516Z",
  "priceList": {
    "shippingRates": null,
    "items": [
      {
        "deliveryMethod": {
          "id": "773167b1-feec-4ae9-b20f-1ed8ccb7b1ed",
          "qeppoId": 6
        },
        "firstItemPrice": {
          "amount": "6.00",
          "currency": "PLN"
        },
        "nextItemPrice": {
          "amount": "0.00",
          "currency": "PLN"
        },
        "packageSize": 5,
        "shippingTime": null
      },
      {
        "deliveryMethod": {
          "id": "758fcd59-fbfa-4453-ae07-4800d72c2ca5",
          "qeppoId": 8
        },
        "firstItemPrice": {
          "amount": "8.00",
          "currency": "PLN"
        },
        "nextItemPrice": {
          "amount": "0.00",
          "currency": "PLN"
        },
        "packageSize": 5,
        "shippingTime": null
      },
      {
        "deliveryMethod": {
          "id": "7203cb90-864c-4cda-bf08-dc883f0c78ad",
          "qeppoId": 9
        },
        "firstItemPrice": {
          "amount": "10.00",
          "currency": "PLN"
        },
        "nextItemPrice": {
          "amount": "0.00",
          "currency": "PLN"
        },
        "packageSize": 10,
        "shippingTime": null
      },
      {
        "deliveryMethod": {
          "id": "845efe05-0c96-47c3-a8cb-aa4699c158ce",
          "qeppoId": 11
        },
        "firstItemPrice": {
          "amount": "12.00",
          "currency": "PLN"
        },
        "nextItemPrice": {
          "amount": "0.00",
          "currency": "PLN"
        },
        "packageSize": 10,
        "shippingTime": null
      }
    ],
    "location": {
      "city": "Internet",
      "state": "zachodniopomorskie",
      "postcode": "00-000",
      "country": "PL"
    },
    "sendingAbroad": false,
    "estimatedShippingTime": "PT0S",
    "additionalInfo": "",
    "id": "5459a18b-4da0-45ef-a910-6472700bde06",
    "lastModified": "2018-03-14T04:25:24.515+01:00",
    "flags": {
      "freeReturn": false,
      "freeDelivery": false,
      "useCostPerWeight": false
    },
    "weight": null,
    "templateId": null,
    "shippingCost": {
      "lowest": {
        "amount": "6.00",
        "currency": "PLN"
      },
      "freeDelivery": false
    }
  },
  "updatedPaths": []
}
"""

In [0]:
#miejsce na rozwiązanie
#podpowiedź: skorzystać z funkcji  `get_json_object`
spark.sql("desc function get_json_object").show(truncate=False)

Rozwiązanie

In [0]:
from pyspark import Row
from pyspark.sql.functions import get_json_object

df = spark.createDataFrame([Row(json=json)])
display(
    df.select(get_json_object(df.json, "$.priceList.items[0].firstItemPrice.amount"))
)

Ćwiczenie

Wyświetl liczbę atrybutów dla kilku przykładowych ofert.

In [0]:
# miejsce na rozwiązanie
# podpowiedź: skorzystać z funkcji  `size`
display(spark.sql("desc function size"))

Rozwiązanie

In [0]:
uam_offers.select(uam_offers.offer_id,  size(uam_offers.attributes)).show(truncate=False)

Przykład `explode`

Znajdź wszystkie oferty (identyfikator, nazwę oraz typ). Każdy typ powinien się znaleźć w nowym wierszu

In [0]:
from pyspark.sql.functions import explode

display(
    uam_offers.select(
        explode(uam_offers.types).alias("types_"),
        uam_offers.offer_id,
        uam_offers.offer_name,
    )
)

In [0]:
TODO - tu skończyłem
TODO - popraw wszystkie `show`

## User Defined Function (UDF)

W przypadku, gdy nie ma możliwości skorzystania z funkcji wbudowanych możemy rejestrować własne. Gdy będziemy je wykorzystywać w Spark SQL możemy to zrobić na dwa sposoby:

In [0]:
# Bez określenia zwracanego typu:
def total(price, item_quantity):
    return price * item_quantity

spark.udf.register("total_udf", total)
spark.sql("select total_udf(1, 2)").show()

In [0]:
# z określeniem zwracanego typu:
from pyspark.sql.types import DecimalType

spark.udf.register("total_udf_typed", total, DecimalType(12, 2))
spark.sql("select total_udf_typed(1, 2)").show()

Przykładowe zapytania:

In [0]:
total_query = "select total_udf(unit_price, quantity) from uam_orders"
spark.sql(total_query).show()

In [0]:
total_typed_query = "select total_udf_typed( cast(unit_price as decimal(12, 2)), cast(quantity as decimal(12, 2))) from uam_orders"
spark.sql(total_typed_query).show()

Ale to nie zadziała zgodnie z oczekiwaniem:

In [0]:
total_typed_query2 = "select total_udf_typed(unit_price,  quantity), price, quantity from uam_orders"
spark.sql(total_typed_query2).show()

Różnicę widać dopiero po wykonaniu `printSchema`

Dla poprawnego wywołania zadziałała niejawna konwersja double->decimal (dot. *quantity*)

W przypadku zapytania, gdzie wynik jest niezgodny z oczekiwanym, mamy mnożenie dwóch liczb typu `double`. Oczekujemy, że funkcja zwróci `decimal`, ale zwraca `double`, co jest konwertowane na wartość `null`.

Jeśli korzystamy z DataFrame API, to mamy następujące możliwości w kwestii UDF:

In [0]:
# 1 
from pyspark.sql.functions import udf
def total(price, item_quantity):
    return price * item_quantity

total_typed_udf = udf(total, DecimalType(12,2))

In [0]:
#2 dekorator udf
from pyspark.sql.functions import udf

@udf('decimal(12,2)')
def total_typed_udf(price, item_quantity):
    return price * item_quantity 

W obydwu przypadkach możemy pominąć typ zwracany przez UDF:

`total_udf = udf(total)`

Lub w wersji z dekoratorem:

```
@udf
def total_udf(price, item_quantity):
    return price * item_quantity * 1.0
```

Ostatecznie możemy skorzystać analogicznie jak z innych funkcji:

In [0]:
uam_orders.select(total_typed_udf(uam_orders.price.cast('decimal(12,2)'), uam_orders.quantity.cast('decimal(12,2)'))).show()

UDF - przykład

Napisz funkcję, która dla 3 poziomów drzewa kategorii utworzy jedną nazwę. 

W przypadku, gdy kategoria na danym poziomie nie jest wypełniona należy użyć nazwy kategorii na wyższym poziomie. Jeśli kategoria w ogóle nie jest wypełniona, to można wpisać dowolny ciąg znaków. Przykłady:

*Elektronika - RTV i AGD - RTV i AGD*

*Elektronika - Elektronika - Elektronika*

*Brak nazwy - Brak nazwy - Brak nazwy*

*Kolekcje i sztuka - Kolekcje - Pocztówki *

In [0]:
@udf
def format_category_tree(cat1,  cat2, cat3):
    cat1_or_default = cat1 or "Brak nazwy"
    cat2_or_default = cat2 or cat1_or_default
    cat3_or_default = cat3 or cat2_or_default
    return cat1_or_default + " - " + cat2_or_default + " - " + cat3_or_default

uam_categories.select(format_category_tree(uam_categories.category_level1, uam_categories.category_level2, uam_categories.category_level3)).show(truncate=False)


Zadanie

Zamień ciągi znaków w nazwach ofert tak aby były z wielkich liter:
1) Skorzystaj z funkcji wbudowanych
2) Stwórz UDF'a

In [0]:
from pyspark.sql.functions import upper
uam_offers.select(upper(uam_offers.offer_name)).show()

In [0]:
from pyspark.sql.functions import udf

@udf
def to_upper(s: str):
  return s.upper()

uam_offers.select(to_upper(uam_offers.offer_name)).show()

UDF a wydajność:

UDF w PySpark są wolniejsze niż w Scali oraz Javie ze względu na konieczność serializacji i deserializacji danych przy wykonywaniu funkcji pythonowych (dwukrotnie). 
https://blog.cloudera.com/blog/2017/02/working-with-udfs-in-apache-spark/

Od wersji 2.3 wprowadzono tzw. Pandas UDF, zaś począwszy od wersji 3.0 wprowadzone nowe uporządkowane API:

https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html

Przykład znajduje się poniżej:

In [0]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

@pandas_udf("decimal(12,2)")
def total_typed_udf(price: pd.Series, item_quantity: pd.Series) -> pd.Series:
    return price * item_quantity
  
uam_orders.select(total_typed_udf(uam_orders.price.cast('decimal(12,2)'), uam_orders.quantity.cast('decimal(12,2)'))).show()

# WHERE

Działanie metody `where` jest bardzo intuicyjne:

In [0]:
# Znajdźmy transakcje dokonane po 2018-01-01 23:00:00
uam_orders.where(uam_orders.buyingTime > '2018-01-01 23:00:00').show()

In [0]:
# Znajdźmy oferty z jedną odsłoną
uam_offers.where(uam_offers.pv == 1).show()

Intuicyjne również są operatory logiczne, chociaż warto zwrócić uwagę, na potrzebę umieszczania wyrażeń w nawiasach ze względu na kolejność wykonywania operatorów:

In [0]:
# Koniunkcja

# Znajdźmy kategorie, które na 3. poziomie mają wartość różną od NULL, a na 1. "Moda i uroda"
uam_categories.where((uam_categories.category_level3.isNotNull()) & (uam_categories.category_level1 == 'Moda i uroda')).show()


In [0]:
# Alternatywa

# Znajdźmy oferty z jedną odsłoną lub liczbą odsłon większą niż 10
uam_offers.where((uam_offers.pv == 1) | (uam_offers.pv > 10)).show()


Istnieje wiele funkcji dostępnych na kolumnach, które zwracają typ `boolean`. Np.:

In [0]:
# Znajdźmy kategorie na 1. poziomie, które nie składają się ze słów rozdzielonych spójnikiem “i”
uam_categories.where(~uam_categories.category_level1.like('% i %')).show()


Kolejnym przykładem takich funkcji są `isNull()` oraz `isNotNull()`:

In [0]:
uam_orders.where(uam_orders.userAgent.isNull()).show()
uam_orders.where(uam_orders.userAgent.isNotNull()).show()

Intuicyjne są również operatory porównania:

In [0]:
uam_offers.where(uam_offers.pv > 1)
uam_offers.where(uam_offers.pv >= 1)
uam_offers.where(uam_offers.pv < 1)
uam_offers.where(uam_offers.pv <= 1)
uam_offers.where(uam_offers.pv != 1)

Przykład

Znajdźmy identyfikatory ofert wraz z dostępnymi rozmiarami. Informacje  o rozmiarach znajdują się w kolumnie *attributes*. Rozmiar jest jednym z rodzajów atrybutów.

In [0]:
from pyspark.sql.functions import explode, col
(
  uam_offers.
   select(uam_offers.offer_id, explode(uam_offers.attributes).alias('attribute')).
   where(col('attribute.name') == 'rozmiar').
   select(uam_offers.offer_id, explode(col('attribute.values')).alias('attribute_value')).
   show()
)

Zadanie 

Znajdźmy oferty "Kup teraz" (typ zawiera BUY_NOW), które są niedostępne (tj. stan magazynowy *stock_current_quantity* jest równy 0).

Rozwiązania

In [0]:
from pyspark.sql.functions import array_contains
uam_offers.where((array_contains(uam_offers.types, 'BUY_NOW')) & (uam_offers.stock_current_quantity == 0)).select('offer_id').show()

In [0]:
from pyspark.sql.functions import col, explode
uam_offers.withColumn("type", explode(uam_offers.types)).where((col("type") == 'BUY_NOW') & (uam_offers.stock_current_quantity == 0)).select('offer_id').show()

# JOIN

Do operacji złączenia służy metoda `join` zdefiniowana na DataFrame:

`join(other, on=None, how=None)`

Znaczenie parametrów:
1. `other` - DF, z którym przeprowadzamy operację złączenia (prawa strona)
2. `on` - string, lista kolumn lub wyrażenie bazujące na kolumnach
3. `how` -  *inner* (domyślnie), *cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti*

Przykład 

Połącz oferty z kategoriami. Wybierz tylko te oferty, które są droższe niż 100 zł i należą do kategorii "Kolekcje i sztuka".

In [0]:
uam_offers.join(uam_categories, uam_offers.category_leaf == uam_categories.category_id).\
where((uam_offers.buynow_price > 100) & (uam_categories.category_level1 == 'Kolekcje i sztuka')). \
select(uam_offers.offer_id, uam_offers.offer_name, uam_categories.category_level1, uam_categories.category_level2, uam_categories.category_level3).\
show()

Ćwiczenie

Sporządź zestawienie ofert sprzedanych danego dnia (identyfikator, nazwa). Wybierz tylko te oferty, które są droższe niż 1000 zł.

Rozwiązania

In [0]:
uam_offers.join(uam_orders, uam_offers.offer_id == uam_orders.offer_id).\
where(uam_offers.buynow_price > 1000).\
select(uam_offers.offer_id, uam_offers.offer_name). \
show()

uam_offers.join(uam_orders, 'offer_id').\
where(uam_offers.buynow_price > 1000).\
select(uam_offers.offer_id, uam_offers.offer_name).\
show()

uam_offers.join(uam_orders, ['offer_id']).\
where(uam_offers.buynow_price > 1000).\
select(uam_offers.offer_id, uam_offers.offer_name).\
show()


# Grupowanie i agregacja

Odpowiednikiem klauzuli `GROUP BY` jest DataFrame'owa metoda `groupBy(*cols)`, natomiast do wyliczania agregatów służy metoda `agg(*exprs)` zdefiniowana w klasie `GroupedData`. 

Dla hipotycznego DataFrame'a *df* kombinacja wywołań w/w metod będzie następująca:

`df.groupBy(*cols).agg(*exprs)`, gdzie

1. `*cols` - lista kolumn lub string (nazwy kolumn)
2. `*exprs` - lista wyrażeń grupujących kolumny lub słownik, gdzie kluczem jest nazwa kolumny a wartością funkcja grupująca

Przykład

Znajdź maksymalną i minimalną cenę ofert w każdej z kategorii na 1. poziomie

In [0]:
from pyspark.sql.functions import min, max
uam_offers.join(uam_categories, uam_categories.category_id == uam_offers.category_leaf).\
groupBy(uam_categories.category_level1). \
agg(min(uam_offers.buynow_price.cast('double')).alias('min_price'), max(uam_offers.buynow_price.cast('double')).alias('max_price')).\
show()

In [0]:
#inne rozwiązania:

# lista kolumn w groupBy
uam_offers.join(uam_categories, uam_categories.category_id == uam_offers.category_leaf).\
groupBy([uam_categories.category_level1]).\
agg(min(uam_offers.buynow_price.cast('double')), max(uam_offers.buynow_price.cast('double'))).\
show()

# string w groupBy
uam_offers.join(uam_categories, uam_categories.category_id == uam_offers.category_leaf).\
groupBy('uam_categories.category_level1').\
agg(min(uam_offers.buynow_price.cast('double')), max(uam_offers.buynow_price.cast('double'))).\
show()

 # lista string'ów w groupBy 
uam_offers.join(uam_categories, uam_categories.category_id == uam_offers.category_leaf).\
groupBy(['uam_categories.category_level1']).\
agg(min(uam_offers.buynow_price.cast('double')), max(uam_offers.buynow_price.cast('double'))).\
show()

# słownik w agg
uam_offers.join(uam_categories, uam_categories.category_id == uam_offers.category_leaf).\
withColumn('buynow_price_as_double1', uam_offers.buynow_price.cast('double')). \
withColumn('buynow_price_as_double2', uam_offers.buynow_price.cast('double')). \
groupBy('uam_categories.category_level1').\
agg({'buynow_price_as_double1': 'min', 'buynow_price_as_double2': 'max'}).\
show()


Funkcje grupujące nazywają się podobnie jak w Spark SQL (różniące się zostały pogrubione):

*avg*

*collect_list*

*collect_set*

*corr*

*first*

*kurtosis*

*last*

*max*

*mean*

*min*

*skewness*

*stddev_pop*

*stddev_samp*

*stddev*

*var_pop*

*var_samp*

*variance*

**approx_count_distinct**

*count*

**countDistinct**

*sum*

**sumDistinct**

Niektóre funkcje grupujące są również metodami `pyspark.sql.group.GroupedData` (tj. pomijamy wywołanie `agg(...)`), np.:

In [0]:
gdf = uam_offers.groupBy(uam_offers.duration)
gdf.max('pv', 'stock_initial_quantity')
# Pozostałe funkcje:
gdf.avg('pv', 'stock_initial_quantity')
gdf.count()
gdf.mean('pv', 'stock_initial_quantity')
gdf.min('pv', 'stock_initial_quantity')
gdf.sum('pv', 'stock_initial_quantity')

Funkcja `pivot` umożliwia zamianę wartości w wierszach wybranej kolumny na nazwę nowych kolumn oraz przeprowadzenie odpowiedniej agregacji. Przykładowo, jeśli chcemy pogrupować sumę odsłon oferty w przekroju kategorii (1. poziom) oraz czasu trwania, tak aby czas trwania był nowych kolumnach, to musimy:

In [0]:
df.groupBy(df.category_level1, df.duration).sum('pv')

In [0]:
df = uam_offers.join(uam_categories, uam_categories.category_id == uam_offers.category_leaf)
df.groupBy(df.category_level1).pivot('duration', values=['PT168H', 'PT240H']).sum('pv')

Uwaga: funkcja pivot przyjmuje opcjonalny argument `values`. Jeśli jest wypełniony, to zostaną utworzone te kolumny, które znajdują się na liście. 
Pytanie: która wersja pivot jest wydajniejsza? Dlaczego?

Grupowanie (`cube/rollup`) oraz sortowanie - przykład

Znajdźmy sumę odsłon oraz liczbę ofert w podziale na kategorię (1. poziom) oraz czas trwania oferty. Posortujmy wyniki ze względu na poziom grupowania:

In [0]:
df = uam_offers.join(uam_categories, uam_categories.category_id == uam_offers.category_leaf)

from pyspark.sql.functions import grouping_id, grouping, sum, count, col

df.cube('category_level1', 'duration').\
agg(grouping_id(), grouping('category_level1'), grouping('duration'), \
sum(df.pv), count('*')).\
orderBy(col('grouping_id()')).show(n=100)

W odróżnieniu od Spark SQL nie ma odpowiednika funkcji `grouping sets`

Dodatkowe funkcje `grouping_id()` oraz `grouping(col)` służą odpowiednio do określania poziomu agregacji oraz określenia, czy wartość w kolumnie jest wynikiem agregacji, czy jest wartością występującą w zbiorze wynikowym (jest to szczególnie ważne w przypadku wartości *NULL*)

Funkcja `rollup` w odróżnieniu od `cube` nie generuje wszystkich możliwych podsumowań, ale takie które wynikają z kolejności podanych kolumn. Np. `rollup(col1, col2)` zwróci całościowe podsumowanie oraz na poziomie kolumny *col1* oraz pary *(col1, col2)*. Nie zostanie wygenerowane podsumowanie na poziomie *col2*.

In [0]:
df = uam_offers.join(uam_orders, uam_orders.offer_id == uam_offers.offer_id, "left_anti")

display(
  df.groupBy().agg(max(df.buynow_price.cast("double").alias("max_price")))
)

# Funkcje analityczne 

Przeanalizujmy działanie funkcji analitycznych na przykładzie:

Znajdźmy kategorię (1. poziom), w której użytkownik dokonał 1. transakcji (jako kupujący)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import rank, col

df = uam_orders.\
join(uam_offers, 'offer_id').\
join(uam_categories, uam_categories.category_id == uam_offers.category_leaf)

window = Window.partitionBy(df.buyer_id).orderBy(df.buyingTime)

df.withColumn('rank_', rank().over(window)).\
where(col('rank_') == 1).\
select(df.buyer_id, df.category_level1).show()

A teraz trochę teorii dla uporządkowania:

Funkcje analityczne używamy analogicznie jak inne funkcje. Jedyną różnicą jest to, że po nazwie funkcji analitycznej wywołujemy metodę `over(window_spec)`, gdzie `window_spec` jest definicją okna.

`pyspark.sql.Window` posiada następujące metody:
1. `partitionBy(cols*)` - określenie kolumn, które wchodzą w skład definicji partycji
2. `orderBy(cols*)` - określenie sortowania wewnątrz partycji
3. `rangeBetween(start, end)` - definicja okna względem wartości danego wiersza
4. `rowsBetween(start, end)` - definicja okna względem pozycji danego wiersza

Stałe użyte w metodach `rangeBetween`, `rowsBetween` oznaczają: 
1. `Window.currentRow` -  bieżący wiersz
2. `Window.unboundedPreceding` - wszystkie wiersze poprzedzające bieżący wiersz
3. `Window.unboundedFollowing` - wszystkie wiersze następujące po bieżącym wierszu

Wybrane funkcje analityczne:

```
cume_dist()
dense_rank()
lag(col, count=1, default=None)
last(col, ignorenulls=False)
lead(col, count=1, default=None)
ntile(n)
percent_rank()
row_number()
rank()
```

Funkcje agregujące z definicją okna (np. `sum`, `max`, `count` itd.)

# Operacje na zbiorach 

Działania na zbiorach najlepiej zobrazują poniższe przykłady:

In [0]:
# Tworzymy następujące DF’y:
df1 = spark.range(0, 10, 2) # liczby parzyste
df2 = spark.range(10) # liczby naturalne

print("Suma zbiorów (może zawierać duplikaty)")
df1.union(df2).show()
print("Suma zbiorów (bez duplikatów)" )
df1.union(df2).distinct().show()
print("Część wspólna zbiorów")
df2.intersect(df1).show()
print("Różnica zbiorów")
df2.subtract(df1).show()

Ćwiczenie

Znajdź oferty, które nie znalazły nabywcy (identyfikatory) korzystając z operacji na zbiorach

Rozwiązanie

In [0]:
uam_offers.select(uam_offers.offer_id).subtract(uam_orders.select(uam_orders.offer_id)).show()

# Operacja `cache`

Metoda `cache`/ `persist` służy do przechowywania danych w pamięci operacyjnej. Dzięki temu możemy uniknąć wykonywania tych samych akcji wielokrotnie. W PySparku możemy używać `cache` na poziomie dowolnego DataFrame'a w odróżnieniu do Spark SQL, gdzie działaliśmy na poziomie tabeli. Przejdźmy do przykładów.

Aby wyczyścić wszystkie dane z pamięci podręcznej możemy wykonać:

In [0]:
spark.catalog.clearCache()

Utwórzmy DataFrame'a, który zawiera oferty będące aukcjami:

In [0]:
from pyspark.sql.functions import array_contains, col
auctions_stats_df = uam_offers.where(array_contains(col("types"), "AUCTION")).groupBy(col("duration")).count()

In [0]:
display(auctions_stats_df)

In [0]:
Przeanalizujemy plan wykonania przed `cache`:

In [0]:
auctions_stats_df.explain()

Następnie po operacji cache:

In [0]:
cached_df = auctions_stats_df.cache()
display(cached_df)

Zauważmy, że optimalizator Sparka jest w stanie stwierdzić, które dane są w cache'u na poziomie analizy zapytania. W tym przypadku użyliśmy DataFrame'a, na którym nie wykonano operacji `cache`, ale mimo wszystko optymalizator znalazł odpowiednie dane w pamięci podręcznej i je wykorzystał:

In [0]:
display(auctions_stats_df)

Po usunięciu danych z pamięci podręcznej plan zapytania wygląda tak jak wcześniej:

In [0]:
uncached_df = cached_df.unpersist()
display(uncached_df)

W związku z tym, że Spark korzysta z zaawansowanych mechanizmów takich jak:
  * CBO - Cost Based Optimizer (optymalizator kosztowy)
  * AQE - Adaptive Query Execution (adaptacyjne wykonanie zapytania)
  * Delta cache - pamięć podręczna dla plików w formacie Delta

plan zapytania może się różnić w zależności od wolumenów danych i parametryzacji Spark'a.

# DataFrame jako widok tymczasowy

A co jeśli odpowiada nam bardziej SQL API? Możemy zarejestrować DF jako widok tymczasowy:

In [0]:
unsoldOffersDf = uam_offers.select(uam_offers.offer_id).subtract(uam_orders.select(uam_orders.offer_id))
unsoldOffersDf.createOrReplaceTempView ('unsoldOffers')

In [0]:
%sql
select count(1) from unsoldOffers

In [0]:
spark.sql("select count(1) from unsoldOffers").show()

In [0]:
%sql
show tables

Może to być bardzo wygodne przy wykonywaniu zapytań z różnych źródeł (np. HDFS/DBFS i JDBC)

# Zapis danych

Do zapisu danych służy klasa `pyspark.sql.DataFrameWriter`, która jest tworzona poprzez wywołanie metody `write()` klasy `DataFrame`. Najważniejsze metody to:

`saveAsTable(name, format=None, mode=None, partitionBy=None, **options)`
1. `name` - nazwa tabeli
2. `mode` - tryb zapisu: 

 *append*: dodaje nowe wiersze
 
 *overwrite*: nadpisuje dane
 
 *ignore*: jeśli dane istnieją w tabeli nie robi nic
 
 *error* (domyślnie): zwraca wyjątek, jeśli dane istnieją
 
3. *partitionBy* - nazwy kolumn z partycjami
4. `**options` - inne opcje

`insertInto(tableName, overwrite=False)`
1. `tableName` - nazwa tabeli
2. `overwrite` - czy nadpisywać dane

Przeanalizujmy następujące przykłady

In [0]:
%fs
rm -r /user/hive/warehouse/uam_categories_sample

In [0]:
spark.sql("drop table if exists default.uam_categories_sample")

# zapis danych do nowej tabeli
uam_categories.sample(fraction=0.1).\
write.saveAsTable('default.uam_categories_sample', format='orc')
# Dodanie nowych rekordów do tabeli
uam_categories.sample(fraction=0.1).write.insertInto('default.uam_categories_sample')

# Zapis danych do tabeli partycjonowanej
uam_categories.sample(fraction=0.1).\
write.saveAsTable('default.uam_categories_sample', format='orc', mode='overwrite', partitionedBy=['category_level1'])

Dane możemy również zapisywać bezpośrednio do plików:

In [0]:
uam_offers.sample(False, 0.01).write.\
partitionBy('duration').\
mode('overwrite').\
parquet(path='dbfs:/tmp/uam_offers_sample')


Analogicznie zapis może być dokonany do formatów: *orc, csv, text, json* oraz do źródeł danych *JDBC* (metoda `jdbc`)

W przypadku, gdy mamy zdefiniowaną tabelę w metastore, ale zdecydowaliśmy się pisać bezpośrednio do plików (jak w powyższym przykładzie), to dane mogę nie być widoczne. Poniższe polecenie sprawi, że metastore odświeży swoje metadane i uwzględni nowe pliki:
```python
spark.catalog.recoverPartitions(nazwa_tabeli)
```
Powyższe polecenie jest odpowiednikiem SQL'owego `MSCK REPAIR TABLE ...`
<br><br><br>
Czasami może się zdarzyć, że odczytujemy dane z pewnej tabeli, potem ją modyfikujemy (albo robi inny program), a potem znowu próbujemy ją odczytać. W przypadku, gdy spark zwróci wyjątek należy wykonać:
```python
spark.catalog.refreshTable(nazwa_tabeli)
```
Jest to odpowiednik SQL'owego `REFRESH TABLE ...`

# Pandas & Koalas

Biblioteka pandas jest jedną z najpolularniejszych bibliotek pythonowych do analizy danych (zob.: https://pandas.pydata.org/). Aby z niech korzystać wystarczy wywołać metodą `toPandas`:

In [0]:
uam_categories.limit(10).toPandas()

Warto pamiętać że `toPandas` jest akcją. Oznacza to, że wszystkie dane są zbierane do pamięcie drivera (działa podobnie jak `collect`). Dlatego powstała biblioteka koalas, która od wersji Spark 3.2 będzie już częścią Spark'a. Zapewnia ono podobne API jak pandas, ale przetwarzania mają charakter rozproszony. Oznacza to, że nie ogranicza nas pamięć driver'a :)

Przykład:

In [0]:
# to było potrzebne przed spark 3.2
# import databricks.koalas as ks

import pyspark.pandas as ps
koalasDf = uam_categories.to_koalas()

# Porównanie różnych API

In [0]:
# 1) RDD - imperatywne API
uam_categories.rdd.filter(lambda r: r.category_level1 == 'Elektronika' ).map(lambda r: (r.category_level2, 1)).reduceByKey(lambda v1, v2: v1+v2).take(10)


In [0]:
# 2) Spark SQL - deklaratywne API
spark.sql("select category_level2,  count(1) from uam_categories where category_level1 = 'Elektronika' group by category_level2").show()

In [0]:
# 3) DataFrame - deklaratywne API
uam_categories.where(uam_categories.category_level1 == 'Elektronika').groupBy('category_level2').count().show()