<a href="https://colab.research.google.com/github/ankawm/Udemy-Data-Science/blob/main/Kodolamacz_Spark_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# APACHE SPARK


<div style="text-align: right">
<b>Patryk Pilarski</b><br>
1patryk.pilarski@gmail.com<br>
p.pilarski@sages.com.pl
</div>


### Dzień 1

#### Wprowadzenie + Spark SQL

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 48.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=508e205205835185c42dd1d08f218ab8b17e9cd772d0db66b5e0f5574def57bf
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


Apache Spark to silnik do obliczeń rozproszonych na licencji open-source. Pierwotnie powstał na Berkley, po czym przekazano go do Apache Software Foundation gdzie jest od tamtej pory utrzymywany i rozwijany. Spark oferuje interfejs pozwalający na programowanie obliczeń na klastrach z domyślną paralelizacją oraz odpornością na awarie.

Spark dostępny jest w Scali, Pythonie, Javie oraz R.

**Komponenty Sparka:**

![image.png](attachment:image.png)

* Spark "core" - podstawa Sparka z podstawową abstrakcją danych nazywaną RDD
* Spark SQL - komponent pozwalający na operowanie na ustrukturyzowanych danych z wykorzystaniem operacji znanych z SQL - łatwy w użyciu
* Spark MLlib - komponent zawierający algorytmy ML dostępne w Sparku - ML na skalę klastrów
* Spark Streaming - moduł pozwalający na pracę ze strumnieniami danych
* Spark GraphX - komponent do pracy z grafami

**Architektura Sparka:**

![image-2.png](attachment:image-2.png)

* driver - proces uruchamiający główną funkcję aplikacji i tworzący SparkContext
* executor(y) - proces uruchomiony dla aplikacji w węźle roboczym (worker node), który uruchamia zadania i przechowuje dane w pamięci lub na dysku. Każda aplikacja ma własne executory
* cluster manager - dostępne opcje: YARN, Mesos, Kubernetes, Standalone

**SparkContext:**
* punkt wejścia do pracy ze Sparkiem
* koordynuje procesy na klastrze
* zatrzymanie SparkContextu == zatrzymanie działania aplikacji
* zwykle nazywany `sc`
* kroki niezbędne do utworzenia SparkContextu w pySparku:

> import pyspark

> sc = pyspark.SparkContext(appName="my_app")

**SparkSession:**
* wprowadzony w Spark 2.0
* składa się ze SparkContextu, SQLContextu oraz HiveContext
* zwykle nazywany `spark`
* kroki niezbędne do utworzenia SparkSession w pySparku:

> from pyspark.sql import SparkSession

> spark = SparkSession.builder.appName('my_app').getOrCreate()


**RDD:**
* podstawowa abstrakcja danych w Sparku
* R - resilient
* D - distributed
* D - dataset
* Matei Zharia, et al. `Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing`
* immutable
* in-memory
* lazy evaluated
* parallel
* dwa typy operacji: akcje i transformacje
* przykłady użycia:

In [2]:
import pyspark
sc = pyspark.SparkContext(appName="my_app")

In [3]:
sc.parallelize(range(20))\
.map(lambda x: x * 2) \
.filter(lambda x: x != 2) \
.reduce(lambda x,y: x + y)

378

In [4]:
text_input = """Spark is a fast and general processing engine compatible with Hadoop data. 
It can run in Hadoop clusters through YARN or Spark's standalone mode, and it can process data in HDFS, 
HBase, Cassandra, Hive, and any Hadoop InputFormat. It is designed to perform both batch processing 
(similar to MapReduce) and new workloads like streaming, interactive queries, and machine learning."""

sc.parallelize(text_input.split(" ")) \
.map(lambda x: x.lower().strip(",.()")) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x,y: x + y) \
.collect()

[('is', 2),
 ('engine', 1),
 ('compatible', 1),
 ('hadoop', 3),
 ('\nit', 1),
 ('run', 1),
 ('in', 2),
 ('clusters', 1),
 ('yarn', 1),
 ("spark's", 1),
 ('mode', 1),
 ('process', 1),
 ('hdfs', 1),
 ('\nhbase', 1),
 ('cassandra', 1),
 ('hive', 1),
 ('designed', 1),
 ('perform', 1),
 ('both', 1),
 ('\n(similar', 1),
 ('new', 1),
 ('like', 1),
 ('streaming', 1),
 ('machine', 1),
 ('learning', 1),
 ('spark', 1),
 ('a', 1),
 ('fast', 1),
 ('and', 5),
 ('general', 1),
 ('processing', 2),
 ('with', 1),
 ('data', 2),
 ('can', 2),
 ('through', 1),
 ('or', 1),
 ('standalone', 1),
 ('it', 2),
 ('any', 1),
 ('inputformat', 1),
 ('to', 2),
 ('batch', 1),
 ('mapreduce', 1),
 ('workloads', 1),
 ('interactive', 1),
 ('queries', 1)]

In [5]:
sc.stop()

Dokumentacja dot. RDD:

https://spark.apache.org/docs/latest/api/python/reference/pyspark.html

**DataFrame:**
* abstrakcja danych z modułu Spark SQL - u podstaw leżą RDD
* immutable
* in-memory
* resilient
* distributed
* parallel
* przechowuje dodatkowe informacje o strukturze danych (schema)
* rozproszona kolekcja wierszy z nazwanymi kolumnami
* optymalizowane przez Catalyst Optymizer
* pozwala na pracę z danymi wykorzysując zapytania znane z SQL/Hive

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as f

spark = SparkSession.builder.appName('my_app').getOrCreate()

In [7]:
spark

**DataFrame - kolekcja wierszy**

In [8]:
dummy_df = spark.createDataFrame([Row(name='Greg', age=32),
                                  Row(name='Bob', age=27),
                                  Row(name='Alice', age=30)])

In [9]:
dummy_df.show()

+-----+---+
| name|age|
+-----+---+
| Greg| 32|
|  Bob| 27|
|Alice| 30|
+-----+---+



In [10]:
dummy_df2 = spark.createDataFrame([Row(name='Bill', age=26),
                                   Row(name='Carol', age=28),
                                   Row(name='Susan', age=25), 
                                   Row(name='Mark', age=None)])

In [11]:
dummy_df2.show()

+-----+----+
| name| age|
+-----+----+
| Bill|  26|
|Carol|  28|
|Susan|  25|
| Mark|null|
+-----+----+



In [12]:
dummy_df.take(3)

[Row(name='Greg', age=32), Row(name='Bob', age=27), Row(name='Alice', age=30)]

In [14]:
dummy_df2.collect()

[Row(name='Bill', age=26),
 Row(name='Carol', age=28),
 Row(name='Susan', age=25),
 Row(name='Mark', age=None)]

**DataFrame - właściwości**

In [15]:
dummy_df.columns

['name', 'age']

In [16]:
dummy_df.dtypes

[('name', 'string'), ('age', 'bigint')]

In [17]:
dummy_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [18]:
dummy_df.describe().show()

+-------+-----+------------------+
|summary| name|               age|
+-------+-----+------------------+
|  count|    3|                 3|
|   mean| null|29.666666666666668|
| stddev| null|2.5166114784235836|
|    min|Alice|                27|
|    max| Greg|                32|
+-------+-----+------------------+



In [19]:
dummy_df.count()

3

In [21]:
len(dummy_df.columns)

2

**Sposoby odwoływania się do kolumn**

In [22]:
dummy_df.age

Column<'age'>

In [23]:
dummy_df["age"]

Column<'age'>

In [24]:
dummy_df[1]

Column<'age'>

In [25]:
f.col("age")

Column<'age'>

In [26]:
"age"

'age'

**Składnia inspirowana SQL**

In [27]:
dummy_df.select("age").show()

+---+
|age|
+---+
| 32|
| 27|
| 30|
+---+



In [28]:
dummy_df.where(dummy_df["age"] > 27).show()

+-----+---+
| name|age|
+-----+---+
| Greg| 32|
|Alice| 30|
+-----+---+



In [29]:
dummy_df.agg(f.avg(dummy_df["age"])).show()

+------------------+
|          avg(age)|
+------------------+
|29.666666666666668|
+------------------+



In [30]:
# lub bardziej SQLowo
dummy_df.select(f.avg(dummy_df["age"])).show()

+------------------+
|          avg(age)|
+------------------+
|29.666666666666668|
+------------------+



In [31]:
dummy_df.selectExpr("avg(age)").show()

+------------------+
|          avg(age)|
+------------------+
|29.666666666666668|
+------------------+



In [32]:
dummy_df.union(dummy_df2).show()

+-----+----+
| name| age|
+-----+----+
| Greg|  32|
|  Bob|  27|
|Alice|  30|
| Bill|  26|
|Carol|  28|
|Susan|  25|
| Mark|null|
+-----+----+



**Zapisywanie wyników**

In [33]:
#dummy_df.write.parquet('path/to/location/dummy_df.parquet')
#dummy_df.write.csv('path/to/location/dummy_df.csv')

In [34]:
dummy_df.write.parquet("p", mode="overwrite")

In [35]:
dummy_df.repartition(1).write.csv('csv', mode="overwrite")

In [36]:
dummy_df.toPandas()

Unnamed: 0,name,age
0,Greg,32
1,Bob,27
2,Alice,30


**Wczytywanie danych**

In [None]:
#parquet_df = spark.read.parquet('path/to/parquet')
#csv_df = spark.read.csv('path/to/csv', header=True, inferSchema=True)

In [None]:
spark.read.parquet("p").show()

In [None]:
spark.read.parquet("p").explain()

In [None]:
spark.read.parquet("p").select("age").explain()

**Używanie zapytań SQLowych**

https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive

In [None]:
dummy_df.createOrReplaceTempView('dummy_df')

In [None]:
# query zwraca nowy DataFrame
spark.sql('select * from dummy_df').show()

In [None]:
spark.sql('select name from dummy_df where age > 27').show()

In [None]:
spark.catalog.dropTempView("dummy_df")

### Przykładowe transformacje z wykorzystaniem API Spark SQL oraz selectów SQLowych

**Tworzenie pokazowych DataFrameów**

In [None]:
import random

In [None]:
random.seed(42)

In [None]:
geo_id = [random.choice(["regA","regB","regC","regD","regE","regF"]) for x in range(400)]

In [None]:
prod_id = [random.choice(["prodA","prodB","prodC","prodD","prodE","prodF",
                          "prodG","prodH","prodI","prodJ","prodK","prodL","prodM"]) for x in range(400)]

In [None]:
value = [random.uniform(1000,10000) for x in range(400)]
value[10] = None
value[17] = None
value[123] = None

In [None]:
df = spark.createDataFrame([Row(prod=p, geo=g, val=v) for p,g,v in zip(prod_id, geo_id, value)])

In [None]:
df.show()

In [None]:
df.createOrReplaceTempView('train_df')

In [None]:
geo_df = spark.createDataFrame([Row(geo_id = "regA", geo_name = "Region A"),
                                Row(geo_id = "regB", geo_name = "Region B"),
                                Row(geo_id = "regC", geo_name = "Region C"),
                                Row(geo_id = "regD", geo_name = "Region D"),
                                Row(geo_id = "regE", geo_name = "Region_E")])

In [None]:
geo_df.show()

In [None]:
geo_df.createOrReplaceTempView("geo_df")

**Select**

In [None]:
df.select("prod").show()

In [None]:
spark.sql("select prod from train_df").show()

In [None]:
df.drop("prod").show()

In [None]:
spark.sql("select geo, val from train_df").show()

In [None]:
df.select((df["val"]* 2).alias("val2")).show()

In [None]:
spark.sql("select val * 2 as val2 from train_df").show()

In [None]:
df.selectExpr("val * 2 val2", "val").show()

**Group by**

In [None]:
df.groupBy("prod").sum().show()

In [None]:
df.select("*", df.val.alias("val2")).groupBy("prod").sum().show()

In [None]:
df.select("*", df.val.alias("val2")).groupBy("prod").sum("val2").show()

In [None]:
spark.sql("select prod, sum(val) from train_df group by prod").show()

In [None]:
df.groupBy("prod","geo").sum().show()

In [None]:
spark.sql("select prod, geo, sum(val) from train_df group by 1, 2").show()

**Where (filter)**

In [None]:
df.where(df["prod"] != "prodA").show()

In [None]:
df.where(df["prod"] != "prodA").where(f.col("val") > 7000).show()

In [None]:
df.where((df["prod"] != "prodA") | (f.col("val") > 7000)).show()

In [None]:
spark.sql("select * from train_df where prod != 'prodA'").show()

In [None]:
df.where("prod != 'prodA'").show()

In [None]:
# like
df.where(df.prod.like('%A')).show()

In [None]:
spark.sql("select * from train_df where prod like '%A'").show()

> **ZADANIE**:   
>(punkty rozwiąż na dwa sposoby)
1. Oblicz średnią wartość dla produktów A i B w regionach C i D
2. Oblicz maksymalne wartości dla produktu A w podziale na regiony

**Order by**

In [None]:
df.groupBy("prod","geo").sum().orderBy("geo").show()

In [None]:
df.groupBy("prod","geo").sum().orderBy(f.desc("geo")).show()

In [None]:
spark.sql("select prod, geo, sum(val) from train_df group by 1, 2 order by 2").show()

**Wiele agregacji + aliasy**

In [None]:
df.groupBy("prod","geo").agg(f.sum("val").alias("val_sum"), f.avg("val").alias("val_avg"), 
                             f.count("*"), f.count("val")).show()

In [None]:
q = ("select prod, geo, sum(val) val_sum, avg(val) val_avg, count(*), count(val) " 
     "from train_df group by prod, geo")
spark.sql(q).show()

> **ZADANIE**:   
>(rozwiąż na dwa sposoby)
1. Oblicz różnicę pomiędzy maksymalną i minimalną wartością dla poszczególnych produktów w regionie F, wynik posortuj po nazwach produktów

**Joiny**

In [None]:
# dostępne: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, i left_anti
df.join(geo_df, df.geo == geo_df.geo_id, "inner").show()

In [None]:
df.join(geo_df, df.geo == geo_df.geo_id, "inner").count()

In [None]:
q = """select train_df.prod, train_df.geo, train_df.val, geo_df.geo_name 
from train_df inner join geo_df on train_df.geo == geo_df.geo_id"""
spark.sql(q).show()

In [None]:
df.join(geo_df, df.geo == geo_df.geo_id, "outer").show()

In [None]:
df.join(geo_df, df.geo == geo_df.geo_id, "outer").count()

In [None]:
q = """select train_df.prod, train_df.geo, train_df.val, geo_df.geo_name 
from train_df full outer join geo_df on train_df.geo == geo_df.geo_id"""
spark.sql(q).show()

In [None]:
q = """select train_df.prod, train_df.geo, train_df.val, geo_df.geo_name 
from train_df full outer join geo_df on train_df.geo == geo_df.geo_id"""
spark.sql(q).count()

In [None]:
df_tmp = df.withColumnRenamed("geo", "geo_id")
df_tmp.join(geo_df, "geo_id", "inner").show()

In [None]:
# tak nie robić!
df_tmp.join(geo_df, df_tmp.geo_id == geo_df.geo_id, "inner").show()

**Distinct**

In [None]:
df.select("prod", "geo").distinct().show()

In [None]:
spark.sql("select distinct prod, geo from train_df").show()

In [None]:
# alternatywnie
df.dropDuplicates(["prod", "geo"]).show()

**Usuwanie NULLi**

In [None]:
# "any" or "all"
df.dropna("any").show()

In [None]:
df.dropna("any").count()

In [None]:
q = "select * from train_df where val is not null"
spark.sql(q).show()

In [None]:
q = "select * from train_df where val is not null"
spark.sql(q).count()

In [None]:
df.where(df.val.isNull()).show()

In [None]:
df.where(f.isnull("val")).show()

**Zastępowanie NULLi**

In [None]:
df.fillna(1).show()

In [None]:
df.fillna({"val": 1}).show()

In [None]:
q = "select prod, geo, if(val is null, 1, val) as val from train_df"
spark.sql(q).show()

**Podmiana wartości**

In [None]:
df.replace("prodA", "Product A").show()

In [None]:
df.replace({"prodA": "Product A", "prodB": "Product B"}).show()

In [None]:
spark.sql("select geo, regexp_replace(prod, 'prodA', 'Product A') as prod, val from train_df").show()

> **ZADANIE**:   
>(rozwiąż na dwa sposoby)
1. W dowolny sposób przygotuj df w którym kolumny geo i prod zawierają jedynie litery identyfikujące produkty i regiony

**Zmiana nazw kolumn**

In [None]:
df.withColumnRenamed("val", "volume").show()

In [None]:
df.select(df.val.alias("volume")).show()

In [None]:
spark.sql("select prod, geo, val as volume from train_df").show()

**Tworzenie nowej kolumny**

In [None]:
df.withColumn("val2", df["val"] / 100).show()

In [None]:
df.withColumn("val2", f.lit(100)).show()

In [None]:
df.select("*", (df.val/100).alias("val2")).show()

In [None]:
spark.sql("select *, val/100 as val2 from train_df").show()

**When**

In [None]:
df.select(df.prod, f.when(df.val > 7500, 1).when(df.val < 2500, 3).otherwise(2).alias("out")).show()

In [None]:
df.select(df.prod, df.val, f.when(df.val >= 7500, 1).when(df.val >= 2500, 2)\
          .when(df.val < 2500, 3).alias("out")).show()

In [None]:
q = "select prod, case when val > 7500 then 1 when val < 2500 then 3 else 2 end as out from train_df"
spark.sql(q).show()

**Substring**

In [None]:
df.select(df.prod.substr(2, 4).alias("out")).show()

In [None]:
df.select(f.substring("prod", 2, 4).alias("out")).show()

In [None]:
spark.sql("select substring(prod, 2, 4) as val2 from train_df").show()

**Funkcje analityczne (window functions)**

In [None]:
from pyspark.sql.window import Window

In [None]:
windowSpec = Window.partitionBy('prod')

df.select("prod", "val", f.sum("val").over(windowSpec).alias("prod_val")).show(50)

In [None]:
spark.sql("select prod, val, sum(val) over (partition by prod) as prod_val from train_df").show(50)

In [None]:
spark.stop()

### Ćwiczenia

**Importy i przygotowanie danych**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as f
from pyspark.sql.window import Window
import random

In [None]:
spark = SparkSession.builder.appName('my_app').master("local[*]").getOrCreate()

https://www.fordgobike.com/system-data

In [None]:
goBike = spark.read.csv("./2017-fordgobike-tripdata.csv", header=True, inferSchema=True)

In [None]:
goBike.printSchema()

In [None]:
goBike.show(3, vertical=True)

> **ZADANIE 1**: Zapoznaj się z danymi:
1. Pozbądź się wierszy zawierających NULLe
1. Sprawdź rozkład zmiennej "member_gender"
2. Oblicz minimalny, maksymalny i średni wiek osób wypożyczających rowery
3. Oblicz liczbę unikalnych rowerów
4. Oblicz liczbę unikalnych stacji
5. Sprawdź który rower był wypożyczony najdłużej a który najkrócej w ciągu analizowanego okresu (oraz jak długo)
6. Oblicz średni czas pojedynczego wypożyczenia
7. Sprawdź pomiędzy którymi stacjami występował największy ruch (hint: A -> B == B -> A)
8. Sprawdź o której godzinie w ciągu dnia wypożyczano najwięcej rowerów
9. Sprawdź *średnią liczbę wypożyczeń* dla poszczególnych dni tygodnia (hint: java.text.SimpleDateFormat)
10. **⋆** Oblicz średni dystans (w km) pomiędzy stacją początkową a końcową dla wszystkich wypożyczeń

> **ZADANIE 2**: Utwórz DataFrame `dataDaily` zawierający dane zagregowane do poziomu dnia. Zbiór ma zawierać następujące informacje (kolumny): 
- 'date' : data 
- 'avg_duration_sec' : średni czas wypożyczeń danego dnia
- 'n_trips' : liczba wypożyczeń danego dnia
- 'n_bikes' : liczba unikatowych rowerów użytych danego dnia
- 'n_routes' : liczba unikatowych kombinacji stacji (x -> y == y -> x) danego dnia
- 'n_subscriber' : liczba wypożyczeń dokonanych przez subskrybentów danego dnia

**Przygotowanie danych**

In [None]:
random.seed(42)
geo_id = [random.choice(["regA","regB","regC","regD","regE","regF"]) for x in range(400)]
prod_id = [random.choice(["prodA","prodB","prodC","prodD","prodE","prodF",
                          "prodG","prodH","prodI","prodJ","prodK","prodL","prodM"]) for x in range(400)]
date = [random.choice(["2015-","2016-","2017-"]) + 
        random.choice(["01-","02-","03-","04-","05-","06-","07-","08-","09-","10-","11-","12-"]) + "01"
        for x in range(400)]
value = [random.uniform(10000,100000) for x in range(400)]
volume = [random.uniform(1000,10000) for x in range(400)]

In [None]:
df = spark.createDataFrame([Row(prod=p, geo=g, val=v, vol=vl, dt=d) 
                            for p,g,v,vl,d in zip(prod_id, geo_id, value, volume, date)])

In [None]:
df.printSchema()

In [None]:
df = df.withColumn("dt", f.to_date(df["dt"]))

In [None]:
df.printSchema()

In [None]:
df.createOrReplaceTempView("df")

In [None]:
df.show()

> **ZADANIE 3:**
1. Oblicz (globalną) średnią cenę produktów
2. Oblicz wartość całkowitą dla regionów per miesiąc
3. Oblicz udział wolumenu kombinacji region-produkt w całkowitym wolumenie produktu (jaka część całkowitego wolumenu danego produktu generowana jest w danym regionie)
5. Stwórz kolumnę `flag` zwierającą wartość `True` gdy nazwy regionu i produktu kończą się na tą samą literę - wartość `False` w każdym innym przypadku (nadpisz df aby zawierał nową kolumnę)
6. Oblicz iloczyn wartości i wolumenu gdy kolumna `flag` ma wartość `True`, w przeciwnym przypadku zwróć wartość 0 
7. Stwórz kolumnę z rokiem wyciągniętym z daty

### Zaawansowane operacje na oknach

**Ranking**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt")

df.withColumn("ranked", f.rank().over(windowSpec))\
.withColumn("ranked_d", f.dense_rank().over(windowSpec))\
.withColumn("row_num", f.row_number().over(windowSpec)).show(50)

In [None]:
q = "select *, rank() over (partition by prod order by dt) as ranked from df"
spark.sql(q).show()

**Różnica od pierwszej wartości**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt")

df.withColumn("diff_from_first", df.val - f.first(df.val).over(windowSpec)).show(50)

In [None]:
q = "select *, val - first(val) over (partition by prod order by dt) as diff_from_first from df"
spark.sql(q).show()

**Średnia ruchoma**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt").rowsBetween(-1,1)

df.withColumn("moving_avg", f.avg(df.val).over(windowSpec)).show()

In [None]:
q = """select *, 
avg(val) over (partition by prod order by dt rows between 1 preceding and 1 following) as moving_avg from df"""
spark.sql(q).show()

**Suma od pierwszego do bierzącego rekordu**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt")

df.withColumn("sum_from_start", f.sum(df.val).over(windowSpec)).show()

In [None]:
q = "select *, sum(val) over (partition by prod order by dt) as sum_from_start from df"
spark.sql(q).show()

In [None]:
# inne podejście
windowSpec = Window.partitionBy("prod").orderBy("dt").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("sum_from_start", f.sum(df.val).over(windowSpec)).show()

In [None]:
q = """select *, sum(val) over (partition by prod order by dt rows between 
unbounded preceding and current row) as sum_from_start from df"""
spark.sql(q).show()

> **ZADANIE 4:**
1. Pogrupuj dane po dacie i produkcie po czym porównaj wolumen produktów ze średnim wolumenem z trzech wcześniejszych okresów
2. Stwórz kolumnę z rankingiem opartm na dacie dla kombinacji produkt-region
3. Oblicz różnicę w wolumenie pomiędzy następującymi po sobie datami dla regionów
4. Oblicz różnicę rok do roku w wartości i wolumenie dla produktów
5. Oblicz udział w całkowitej wartości poszczególnych produktów w danym roku dla każdego regionu

**Dodatek:**

#### User Defined Functions (UDFs)

(Używaj tylko w ostateczności)

In [None]:
from pyspark.sql.types import IntegerType, StringType, FloatType

In [None]:
def udfPower3(value):
    return value**3

In [None]:
udfPower3(2)

In [None]:
power3 = f.udf(udfPower3, FloatType())

In [None]:
df.select(power3(df.val)).show()

In [None]:
spark.udf.register("power3", udfPower3, FloatType())

In [None]:
spark.sql("select power3(val) from df").show()

In [None]:
def udfTwo_col(x,y):
    return x / y

In [None]:
udfTwo_col(10,2)

In [None]:
two_col = f.udf(udfTwo_col, FloatType())

In [None]:
df.select(two_col(df.vol,df.val)).show()

In [None]:
spark.udf.register("two_col", udfTwo_col, FloatType())

In [None]:
spark.sql("select two_col(vol,val) from df").show()

In [None]:
spark.stop()

#### Pandas UDFs

(lepsza alternatywa)

https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html

- wymagają zainstalowania pyarrow (`conda install -c conda-forge pyarrow` lub `pip install pyarrow`)
- wymagają ustawienia parametru `spark.sql.execution.arrow.pyspark.enabled` na `true`
- wymagają napisania funkcji które działają na seriach z biblioteki pandas

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, IntegerType
import pandas as pd
import numpy as np

In [None]:
spark = SparkSession.builder \
    .appName('app') \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

In [None]:
df_pd = spark.createDataFrame([("x", "one exemplary string", 4, 4), 
                               ("x", "and here too a string", 3, 5), 
                               ("y", "some more also here", 5, 3), 
                               ("y", "and another one", 1, 7)], ["f", "s", "a", "b"])
df_pd = df_pd.withColumn("s", f.split("s", " "))
df_pd.show()

----

##### Funkcje działające na jednym wierszu
- **pandas_udf(f, returnType)** - tworzy Pandas UDF zwracający kolumnę o wartościach typu `returnType`

In [None]:
def array_elem_length(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: [len(y) for y in x])

In [None]:
s = pd.Series([["one", "exemplary", "string"], ["and", "here", "too", "a", "string"]])
s

In [None]:
array_elem_length(s)

In [None]:
lenUDF = pandas_udf(array_elem_length, returnType=ArrayType(IntegerType()))

In [None]:
df_pd.withColumn("s_len", lenUDF(f.col("s"))).show()

In [None]:
@pandas_udf("float")
def pythagoras(a: pd.Series, b: pd.Series) -> pd.Series:
    return pd.concat([a, b], axis=1).apply(lambda x: np.sqrt(np.square(x[0]) + np.square(x[1])), axis=1)

In [None]:
# a = pd.Series([4, 3, 5, 1])
# b = pd.Series([4, 5, 3, 7])
# pythagoras(a, b)

In [None]:
df_pd.withColumn("c", pythagoras(df_pd.a, df_pd.b)).show()

##### Funkcje agregujące

In [None]:
@pandas_udf("float")
def median(v: pd.Series) -> float:
    return v.median()

In [None]:
df_pd.groupby("f").agg(median('a').alias("a_med"), median('b').alias("b_med")).show()