# SparkSession

U uvodnom tekstu smo ukratko spomenuli Spark ulaznu točku zvanu SparkContext koja predstavlja konekciju sa Spark klasterom - SparkSession je nadskup toga. Okružuje SparkContext i pruža mogućnost interakcije sa Spark SQL API-jem koji sadrži DF koji ćemo koristiti u većini naših programa. 

Prije pokretanja PySpark koda potrebno je kreirati SparkSession instancu. Dok je pokrenuta možete dohvatiti pregled vašeg Spark klastera i svih operacija na http://localhost:4040.

SparkSession objektu ćete dodijeliti određene parametre kako biste postavili i konfigurirali Spark aplikaciju - neki od parametara uključuju naziv aplikacije, URL i postavke alokacije memorije.Tako naša aplikacija postavlja sljedeće parametre:
* `builder` - objekt za kreiranje i konfiguraciju SparkSessiona
* `master("local[*]")` - lokalno izvršavanje aplikacije gdje * označava izrvšavanje aplikacije na svim dostupnim jezgrama - ovdje možete upisati i broj jezgri
* `appName("SparkVjezba")` - ime aplikacije
* `getOrCreate()` - kreira novi SparkSession ako ne postoji ili vraća postojeći ako se već izvodi te time osigurava da je samo jedan SparkSession objekt kreiran za aplikaciju što je važno za efikasnu upotrebu resursa. 

In [2]:
import findspark
from pyspark.sql import SparkSession

scala_version = '2.12'
spark_version = '3.4.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.1'
]

findspark.add_packages(','.join(packages))
findspark.init()

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SparkVjezba")
    .getOrCreate()
)

# DataFrame

Ako ste korisitli Pandas biblioteku u Pythonu vjerojatno ste upoznati s konceptom DataFramea. Ipak, važno je naglasiti da DataFrame u Pandasu i Sparku nisu isti; neke od razlika su:
| Aspekt | Pandas DataFrame | Spark DataFrame |
|:--------:|:-----------------:|:----------------:|
| Izvršavanje | Manji setovi podataka i izvršavanje na jednom stroju | Veliki setovi podataka i izvršavanje na više storjeva |
| Evaluacija | Agilna | Lijena |
| Brzina | Brži za manje skupove podataka| Brži za veće skupove podataka |
| Izvori podataka | Zahtijeva dodatne bibioloteke za čitanje i pisanje raznih izvora podataka | Ugrađena potpora za čitanje i pisanje raznih izvora podataka| |

<br/>
Dataframe u Sparku možete shvatiti kao tablicu s redovima i stupcima koja sadrži veliku količinu podataka. Obrada podataka u Dataframeu se vrši korištenjem transformacija i akcija, a rezultat se može spremiti u novi Dataframe. Dataframe je također vrlo fleksibilan i može se koristiti za razne obrade podataka kao što su spajanje, filtriranje, sortiranje, agregacija i grupiranje podataka. 

## Kreiranje i korištenje DataFramea 

In [None]:
# kreacija DataFramea
data = [(1,2), (3,4)]
schema = "neparni int, parni int"
df_list = spark.createDataFrame(data, schema)

# prikaz DataFramea
df_list.show()

# prikaz scheme DataFramea
df_list.printSchema()

In [None]:
# selekcija stupaca
df_list.select(['neparni']).show()

In [None]:
# dodavanje stupaca
from pyspark.sql.functions import col

df_list.withColumn("zbroj", col("neparni") + col("parni")).show()

Prilikom stvaranja DF-a ukoliko nema zaglavlja (ili se ona ne čitaju) zadana vrijednost ukoliko se DF stvara iz CSV datoteke je _cn - gdje n predstavlja broj stupca počevši od nule, dok ukoliko se stvara iz liste zadana vrijednost je _n - gdje je n ekvivalent prethodnom n-u.

In [None]:
# preimenovanje stupaca
df_list_n = spark.createDataFrame(data)
df_list_n.withColumnRenamed('_1', 'neparni') \
         .withColumnRenamed('_2', 'parni') \
         .show()

## Spark SQL i Window funkcije

Windowing u Sparku omogućuje obradu podataka u prozorima; odnosno, funkcije se mogu primijeniti i na više elemenata, a ne samo na pojedinačni element.

In [None]:
employee_data = (("John", "Engineering", 7000),
                 ("Jane", "Marketing", 5000),
                 ("Bob", "Engineering", 6000),
                 ("Mary", "Sales", 5500),
                 ("Alex", "Marketing", 4500),
                 ("Mike", "Engineering", 7500),
                 ("Julie", "Sales", 6000),
                 ("Sam", "Finance", 6500),
                 ("Lisa", "Marketing", 4500),
                 ("Tom", "Engineering", 8000))

columns = ["name", "department", "salary"]

df_employees = spark.createDataFrame(employee_data, columns)

### Funkcije rangiranja

Funkcija row_number koristi se za dodjelu slijednog broja retka koji počinje od 1 - svaki broj koristi samo jednom neovisno o tome je li došlo do izjenačenja.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_order = Window.partitionBy("department").orderBy("salary")

df_employees.withColumn("row_number", row_number().over(window_order)).show()

Funkcije rank i dense_rank dodjeljuju rang rezultatu unutar prozora. Dok rank ostavlja prazna mjesta ako dolazi do izjednačenja, dense_rank preskače praznine.

In [None]:
from pyspark.sql.functions import rank, dense_rank

df_employees.withColumn("rank", rank().over(window_order)) \
            .withColumn("dense_rank", dense_rank().over(window_order)) \
            .show()

Funkcija percent_rank namijenjena je za izračunavanje percentila za svaki redak unutar prozora.

In [None]:
from pyspark.sql.functions import percent_rank

df_employees.withColumn("percent_rank", percent_rank().over(window_order)).show()

Funkcija prozora ntile koristi se za podjelu rezultata unutar prozora u n jednakih dijelova.

In [None]:
from pyspark.sql.functions import ntile

df_employees.withColumn("ntile", ntile(2).over(window_order)).show()

### Analitičke funkcije 

Funkcija cume_dist koristi se za izračun kumulativne distribucije vrijednosti unutar svakog prozora.

In [None]:
from pyspark.sql.functions import cume_dist

df_employees.withColumn("cume_dist", cume_dist().over(window_order)).show()

Funkcija lag se koristi za dobivanje n-tog prethodnika reda unutar prozora, dok ćete funkciju lead iskoristiti kada želite dobit n-tog sljedbenika reda.

In [None]:
from pyspark.sql.functions import lag, lead

df_employees.withColumn("lag", lag("salary", 1).over(window_order)) \
            .withColumn("lead", lead("salary", 1).over(window_order)) \
            .show()

### Agregatne funkcije



In [None]:
from pyspark.sql.functions import avg, sum, min, max

window_aggregation  = Window.partitionBy("department")

df_employees.withColumn("row", row_number().over(window_order)) \
            .withColumn("avg", avg(col("salary")).over(window_aggregation)) \
            .withColumn("sum", sum(col("salary")).over(window_aggregation)) \
            .withColumn("min", min(col("salary")).over(window_aggregation)) \
            .withColumn("max", max(col("salary")).over(window_aggregation)) \
            .where(col("row") == 1).select("department", "avg", "sum", "min", "max") \
            .show()

## Čitanje iz datoteke
PySpark posjeduje kapacitet otkrivanja scheme - možete uključiti tu opciju postavljajući inferSchema na True. Ovaj opcionalni parametar tjera PySpark da prođe kroz podatke dvaput - prvi put kako bi postavio tip svakog stupca i drugi put da bi učitao podatke. Time učitavanje traje duže, ali nam pomaže izbjeći ručno pisanje scheme. Preuzmite skup podataka sa sljedeće poveznice https://www.kaggle.com/datasets/crawford/80-cereals te ga stavite u mapu u kojoj se nalazi vaša bilježnica.

In [None]:
# Učitavanje datoteke sa zaglavljem i otkrivanjem scheme
df_cereal = spark.read.format('csv').option('header', True).option('inferSchema', True).load('cereal.csv')

# Prikaz samo prva tri reda
df_cereal.show(3)

# Primijetite tipove 
df_cereal.printSchema()

## Agregacije nad grupiranim stupcem
Ako želite grupirati stupce možete se poslužiti transformacijom `groupby('imestupca')`. Metoda `agg()` (agg iz "aggregation") prihvaća jednu ili više agregatnih funkcija iz modula `pyspark.sql.functions`. U primjeru ispod računamo prosječni broj kalorija po tipu žitarica te novi stupac imenujemo metodom `alias('imestupca')` koji bi u protivnom nosio naziv prema funkcijama izvedenim u `agg()` metodi - u konkretnom slučaju "round(avg(calories), 2)".

In [None]:
from pyspark.sql.functions import round

df_cereal.groupBy('type') \
            .agg(round(avg('calories'), 2).alias('avg_calories')) \
            .show()

## Važno! 
Prije nastavka rješavanja obavite zadatak iz poglavlja "*Povezivanje PySparka s Kafkom*". Obratite pozornost na stupce u datoteci - ako ste čitali cijelu vrijednost cijeli redak će vam biti zapisan pod jedan stupac. Razdvojite datoteku po stupcima prije nastavka - npr. spremanjem datoteke nanovo u CSV formatu.
## Dnevni prosjek 
Vaš zadatak je prikazati kretanje prosječne dnevne cijene kroz proteklih sto dana koristeći DataFrame kao strukturu podataka. Ispišite vaše rješenje na ekran - poredajte elemente silazno po prosječnoj cijeni zaokruženoj na jednu decimalu te im dodijelite rang. Možete koristiti `from_unixtime` iz `pyspark.sql.functions` modula za izvršenje zadatka.

In [11]:
from pyspark.sql.functions import to_date, split, desc, rank, avg, round
from pyspark.sql.window import Window

kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "BTC")
    .option("startingOffsets", "earliest")
    .load()
)

kafka_df = kafka_df.withColumn("value", kafka_df["value"].cast("string"))

split_columns = split(kafka_df['value'], ',')
kafka_df = kafka_df.withColumn('date', to_date(split_columns.getItem(0)))
kafka_df = kafka_df.withColumn('timestamp', split_columns.getItem(1).cast('integer'))
kafka_df = kafka_df.withColumn('price', split_columns.getItem(2).cast('decimal(30, 15)'))
kafka_df = kafka_df.select(['date', 'price'])

kafka_df = (
    kafka_df.groupBy('date')
    .agg(round(avg('price'), 1).alias('avg_price'))
    .orderBy('avg_price', ascending=False)
)

# kafka_df = kafka_df.withColumn("rank", rank().over(Window.partitionBy('timestamp').orderBy(desc("timestamp"))))

In [12]:
kafka_df.printSchema()

root
 |-- date: date (nullable = true)
 |-- avg_price: decimal(17,1) (nullable = true)



In [13]:
kafka_query = (
    kafka_df.writeStream
    .format("console")
    # .option("path", "output/data")
    # .option("checkpointLocation", "output/checkpoint")
    .outputMode("complete")
    .start()
)

kafka_query.processAllAvailable()

# Resilient Distributed Dataset (RDD) 
Postoje dva načina kako kreirati RDD: 
* paralelizacijom postojeće kolekcije - pozivanjem metode parallelize
* referenciranjem skupa podataka u vanjsokom sustavu za pohranu - pozivanjem metode textfile koja kao argument prihvaća put do datoteke


RDD se poziva nad SparkContextom koji možete izvući iz SparkSession objekta.

In [None]:
# SparkContext objekt iz SparkSessiona
sc = spark.sparkContext

# Paralelizacija postojeće kolekcije
rdd_array = sc.parallelize([1, 2, 3])

# Učitavanje iz datoteke - zamijenite naziv CSV datoteke sa svojim
rdd_csv = sc.textFile("coincap.csv")

## Osnovne akcije nad RDD-ovima 
Metoda `collect()` dohvaća cijeli RDD (primijetite kako je svaki redak pročitan kao jedan element iako sadrži dva podatka). 

In [None]:
rdd_csv.collect()

Metoda `count()` pokazuje broj elemenata u RDD-u.

In [None]:
rdd_csv.count()

Kako biste uzeli određeni broj elemenata (po redu) potrebno je pozvati `take(n)` gdje n ukazuje koji je to broj elemenata:

In [None]:
rdd_csv.take(2)

## Transformacije nad RDD-ovima

Tranformacija filter se koristi kako bi se zadržali elementi koji ispunjavaju određeni uvjet; nad svakim elementom se poziva funkcija te ako je odgovor na tu funkciju True taj element se zadržava.

In [None]:
veci_od_jedan = rdd_array.filter(lambda x: x > 1)
veci_od_jedan.collect()

Koristeći map transformaciju možete nad svakim elementom RDD-a izvesti određenu funkciju. Primjerice, ako imate csv datoteku u kojoj imate više stupaca - te elemente možete pročitati pozivajući funkciju koja će razdvojiti vaš redak u elemente po delimiteru u vašoj datoteci.

Kao vježbu napišite naredbu s lambda funkcijom koja će vaš rdd_csv razdvojiti po stupcima te prikažite novi RDD na zaslonu.

In [5]:
sc = spark.sparkContext

cereal_rdd = sc.textFile('cereal.csv')\
    .map(lambda x: x.split(','))

cereal_rdd.collect()

[['name',
  'mfr',
  'type',
  'calories',
  'protein',
  'fat',
  'sodium',
  'fiber',
  'carbo',
  'sugars',
  'potass',
  'vitamins',
  'shelf',
  'weight',
  'cups',
  'rating'],
 ['100% Bran',
  'N',
  'C',
  '70',
  '4',
  '1',
  '130',
  '10',
  '5',
  '6',
  '280',
  '25',
  '3',
  '1',
  '0.33',
  '68.402973'],
 ['100% Natural Bran',
  'Q',
  'C',
  '120',
  '3',
  '5',
  '15',
  '2',
  '8',
  '8',
  '135',
  '0',
  '3',
  '1',
  '1',
  '33.983679'],
 ['All-Bran',
  'K',
  'C',
  '70',
  '4',
  '1',
  '260',
  '9',
  '7',
  '5',
  '320',
  '25',
  '3',
  '1',
  '0.33',
  '59.425505'],
 ['All-Bran with Extra Fiber',
  'K',
  'C',
  '50',
  '4',
  '0',
  '140',
  '14',
  '8',
  '0',
  '330',
  '25',
  '3',
  '1',
  '0.5',
  '93.704912'],
 ['Almond Delight',
  'R',
  'C',
  '110',
  '2',
  '2',
  '200',
  '1',
  '14',
  '8',
  '-1',
  '25',
  '3',
  '1',
  '0.75',
  '34.384843'],
 ['Apple Cinnamon Cheerios',
  'G',
  'C',
  '110',
  '2',
  '2',
  '180',
  '1.5',
  '10.5',
  '10',

Transformacija reduce primjenjuje danu funkciju na dva parametra istog tipa te vraća jedan rezultat. Ta funkcija se primjenjuje elementima u strukturi sličnoj stablu, gdje se na svakoj razini broj elemenata prepolavlja sve dok se ne dođe do jednog elementa.

In [None]:
sum = rdd_array.reduce(lambda x, y: x + y)
print(sum)

Transformacija groupByKey grupira zapise prema ključu i vraća RDD gdje svaki zapis ima ključ i listu vrijednosti povezanih s tim ključem.

In [None]:
rdd_employees = sc.parallelize([("Engineering", "Mike"), ("Engineering", "Tom"), ("Marketing", "Lisa")])

for key, values in rdd_employees.groupByKey().collect():
    print(f"{key}: {list(values)}")

## Najviše dnevne cijene

Vaš zadatak je pronaći najveći dnevni maksimum kroz protekla dva tjedna koristeći RDD kao strukturu podataka. Pripazite na tipove učitanih podataka. Ovisno o kodu koji unesete možete nepovratno "pokvariti" kernel kada računate s RDD-om u Jupyteru pa ga stoga iznova pokrenite s vremena na vrijeme.

In [8]:
from datetime import datetime, timedelta
from decimal import Decimal

coin_rdd = (
    sc.textFile('coin_data.csv')
    .map(lambda x: x.split(','))
    .map(lambda x: (datetime.strptime(x[0], "%Y-%m-%d"), Decimal(x[1])))
    .filter(lambda x: x[0] > datetime.now() - timedelta(days=14))
    .reduceByKey(lambda x, y: max(x, y))
)

coin_rdd.collect()

[(datetime.datetime(2023, 4, 17, 0, 0), Decimal('30361.310477781287508')),
 (datetime.datetime(2023, 4, 20, 0, 0), Decimal('28993.516167577716877')),
 (datetime.datetime(2023, 4, 21, 0, 0), Decimal('28363.641508847066238')),
 (datetime.datetime(2023, 4, 22, 0, 0), Decimal('27979.228066477305455')),
 (datetime.datetime(2023, 4, 25, 0, 0), Decimal('28313.967171490043940')),
 (datetime.datetime(2023, 4, 27, 0, 0), Decimal('29805.672889383884658')),
 (datetime.datetime(2023, 4, 28, 0, 0), Decimal('29557.199025097977494')),
 (datetime.datetime(2023, 4, 18, 0, 0), Decimal('30412.312209607134311')),
 (datetime.datetime(2023, 4, 19, 0, 0), Decimal('30405.896696393288237')),
 (datetime.datetime(2023, 4, 23, 0, 0), Decimal('27947.727003361786518')),
 (datetime.datetime(2023, 4, 24, 0, 0), Decimal('27858.053217704780079')),
 (datetime.datetime(2023, 4, 26, 0, 0), Decimal('29964.030845553844425')),
 (datetime.datetime(2023, 4, 29, 0, 0), Decimal('29488.927214196094023'))]