# Spark galvenās datu struktūras un darbību izpilde 

Spark veic darbības ar atmiņā esošiem datiem aptuveni divas kārtas ātrāk, nekā mapreduce un aptuveni kārtu ātrāk ar datiem, kas atrodas uz diska.  

Spark ir general purpose rīks, kas ir piemērots lielākajai daļai lielo datu uzdevumu.  

Spark katra komanda ir **transformācija** vai **darbība**.  
'Slinkā' novērtēšana atliek datu transformāciju izpildi līdz tā ir patiesi nepieciešama,ar katru nākamo transformāciju tikai papildinot/uzlabojot izpildes plānu. Kad tiek izsaukta darbība, transformācijas tiek izpildītas pēc plāna un rezultāts tiek vai nu atgriezts uz ekrāna, ierakstīts faiā vai failu sistēmā vai padots tālāk citām transformācijām.  


Spark ir trīs veidu datu struktūras.  
* Resilient Distributed Dataset (RDD)
* Data Frame (DF)  
* Data Set (DS)

RDD ir Spark pamats. DF un DS struktūras ir izveidotas, uzbūvējot papildu loģiku un funkcionalitāti pa virsu RDD.  

DF ir uzbūvēts, lai atļautu tipiskās tabulāro datu darbības. Līdzīgi kā pandas DataFrame, atbalsta filter, select utml. operācijas. Atšķirībā no pandas DataFrame, ja DF reģistrē (darbība), tad var var rakstīt SQL vaicājumus tieši uz DF. DF ir kolonnas un kolonnu tipi, bet viņi nav stingri definēti. Int kolonnā var ierakstīt string vērtības u.t.t.  

DS ir DataFrame, tikai ar stingri definētiem kolonnu tipiem, kas atļauj nišejiskas optimizācijas un atļauj nelegālas darbības ar datiem (piem. string kolonnas vērtību atņemšana no integer kolonnas vērtībām) noķert, jau veidojot transformācijas (compile time), nevis, kad tiek izsaukta darbība (runtime).


Pēc darbības izpildes, izpildes plānu un laiku, kas pavadīts uz katru darbību var redzēt caur Spark Web UI. Tas atļauj atrast algoritmiskos pudeles kaklus un tādējādi arī tos novērst.


Ja tiek izmantoti vairākas savstarpēji neatkarīgas mašīnas, tad Spark katrai no šīm mašīnām piešķir skaitļošanas 'lomas'. Spark izmanto tā saucamo Driver/Executors pieeju. Serveris uz kura rezultāts tiek pieprasīts un atgriezts izpilda Driver lomu un visas pārējās mašīnas izpilda Executor lomu. Uz Driver tiek sastādīts izpildes plāns, un, ja uz Driver mašīnas atrodas dati, tad tie arī tiek nosūtīti visām pārējām mašīnām, uz kurām tad attiecīgi notiek paralelizējamie aprēķini.  
Kopsavilkuma aprēķins un rezultātu atainošana gan notiek uz Driver.

# SparkSession, tā konfigurēšana  
SparkSession kalpo kā ieejas punkts visām datu struktūrām. Uz vienas mašīnas/klastera paralēli var eksistēt tikai viens SparkSession objekts un SparkSession objekts ir arī tieši saistīts ar Web UI.  

In [2]:
# Jautājums: Vai šīs sekojošās koda rindiņas nostrādās?
spark

In [11]:
spark.conf

In [12]:
spark.sparkContext

# Spark datu struktūru izveide
RDD var izveidot no Python datu struktūrām, bet lielākoties, ja mēs izmantojam Spark, tad datu avots būs vai nu HDFS/S3 vai lieli lokālie teksta faili.  
RDD ir **nemaināma** (Immutable) datu struktūra. Ja uz to grib veikt darbības, jāveido jauna instance.  
Tāpēc, ka RDD ir nemaināma, arī DF un DS ir nemaināmas. Nemaināmībai ir savas priekšrocības, kā vienu var minēt, ka mainīgais/datu struktūra var rasties tieši no vienas vietas, kas samazina potenciālos kļūdu avotus. Ja ir vairākas funkcijas, kas lasa vienu un to pašu datu struktūru un veic vienas un tās pašas darbības, rezultāts vienmēr būs viens un tas pats.

In [3]:
# Python visas iebūvētās datu struktūras ir maināmas
n = 2
n+=1 # Mainīgajā n tiek mainīta vērtība 'in-place'. 

some_list = [20,30,40,50,7694]
some_list.pop() # list arī ir 'in-place' maināma datu struktūra
print(some_list)

# Ja mēs .pop() lietotu vairākas reizes uz vienu un to pašu sarakstu, tad katru reizi mums būtu cits rezultāts.
# Paradigma, kas to nepieļauj piespiež veidot jaunu struktūru, 
# tādējādi mums nebūs kļūdu, kas dažreiz notiek, bet citas reizes ne. Vismaz tieši Spark kodā.

[20, 30, 40, 50]


In [78]:
# RDD var izveidot no list
lst = [[14],[23],[32],[41]]
a = spark.sparkContext.parallelize(lst)
a_mapped = a.map(lambda x: 2*x).map(lambda x: x[0]) # map funkcija tranformē katru elementu

# Uzdevums:
# Paskatamies uz web interface.

In [80]:
a_mapped.reduce(lambda summ, elem: summ + elem)

110

## Spark DataFrame izveide un darbības

In [19]:
# RDD var arī pārvērst par DataFrame, bet ir jāuzdefinē shēma
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField

from pyspark.sql.types import IntegerType

column_list = [StructField('kolonnas_nosauk',IntegerType(),True)]

schema = StructType(column_list)

b = a.toDF(schema)

In [37]:
b.createOrReplaceTempView("tabula")

In [40]:
# Jautājums: 
# Vai šis nostrādās?
# spark.sql("select * from tabula").show()

In [None]:
csv_frame.describe().show()

In [None]:
# DF var izveidot arī no csv faila
csv_frame = spark.read.csv('order.csv')
csv_frame.printSchema()

In [69]:
# Vai json faila
json_frame = spark.read.json('example.json')
json_frame.printSchema()
# json_frame.show() # Šis nenostrādās jo json queryot tiešā veidā nav iespējams, ja nav json mapējuma uz shēmu
# , bet ielasīšana notiek. 
# .show() efektīvi ir select * from df limit 20

root
 |-- _corrupt_record: string (nullable = true)



In [7]:
# DF darbības, izmantojot select, filter, map, reduce. arī show.
# csv_frame.show()

In [8]:
csv_frame = spark.read.format("csv").option("header","true").load('order.csv')
csv_frame.show()

+--------+----------+-------+-----------+
|order_id|order_date| amount|customer_id|
+--------+----------+-------+-----------+
|       1|07/04/1776|$234.56|          1|
|       2|03/14/1760| $78.50|          3|
|       3|05/23/1784|$124.00|          2|
|       4|09/03/1790| $65.50|          3|
|       5|07/21/1795| $25.50|         10|
|       6|11/27/1787| $14.40|          9|
+--------+----------+-------+-----------+



In [None]:
# Tāpat kā pandas df, var lietot .describe()

In [77]:
# Uzdevums:
# Ielasiet iekšā Spark DF datu failu, kas atrodas šeit:
# https://raw.githubusercontent.com/reversedego/RCS_course/master/5_spark/records.csv

In [10]:
csv_frame = spark.read.format("csv").option("header","true").load('records.csv')
csv_frame.show()

+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|                Chad|Office Supplies|       Online|             L| 1/27/2011|292494523| 2/12/2011|      4484|    651.21|   524.96|   2920025.64|2353920.64|   566105.00|
|              Europe|              Latvia|      Beverages|       Online|             C|12/28/2015|361825549| 1/23/2016|      1075|     47.45|    31.79|     51008.75|  34174.25|    16834.50|
|Middle East and N...|            Pakistan|  

In [11]:
csv_frame.createOrReplaceTempView("sales")

In [12]:
# spark.sql("select * from 'Ship Date'").show()

In [13]:
# Uzdevums:
# 1. Uzrakstiet vaicājumu, kas saskaita cik ir unikālu valstu uz kurām tiek veiktas piegādes. Apskatiet izpildes plānu.
# 2. Uzrakstiet vaicājumu, kas saskaita cik ir piegāžu uz katru valsti. 
# Salīdziniet izpildes plānu ar iepriekšējo punktu. 
# Vai kādas daļas tika izlaistas? Kāpēc tā varētu būt?

In [14]:
# Piemērs:
# Vēlamies atlasīt pilnīgi visus decembra sūtījumus un pārbaudīt vai 12. mēnesī ienākumu mums bija vairāk, nekā vidēji.
# Lai to izdarītu, nepieciešams pārveidot kolonnu 'Ship Date' par divām. Par dienu un mēnesi t.i. Ship_day , Ship_month

from pyspark.sql.functions import col # col funkcija atļauj uz kolonnām atsaukties tikai ar to nosaukumiem
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import regexp_extract

# regexp_replace un regexp_extract ir funkcijas, kas izmanto regular expressions teksta pavedienu šķelšanas principus. 
# Regular expressions ir ārkārtīgi plaši pielietota teksta pavedienu apstrādes metode, 
# sastopama visās programmēšanas valodās, bet tā nav pilnīgi universāla. Dažām valodām ir nelielas variācijas regex 
# simbolu mijiedarbībām.

updated_frame = csv_frame\
.withColumn("Ship_day",regexp_replace(col("Ship Date"),"\/.*$","" ))\
.withColumn("Ship_month",regexp_extract(col("Ship Date"),"(?<=\/).*?(?=\/)",0))\
.withColumn("Revenue",col("Total Revenue")) # Pārsaucu kolonnu, lai nevajadzētu SQL vaicājumā lietot atstarpi

# Šo var izdarīt arī ar MySQL ar funkciju, kas saucās tāpat.
# https://dev.mysql.com/doc/refman/8.0/en/regexp.html

# updated_frame.select(["Ship_day","Ship_month"]).show() # Iebūvēto transformāciju piemērs, kur selectē sarakstu ar kolonnām.
updated_frame.createOrReplaceTempView("sales") # Padaram updated_frame pieejamu priekš SQL vaicājumiem kā tabulu 'sales'

In [15]:
# Ar šo vaicājumu tagad ir iegūts starprezultāts un tas parādīts uz ekrāna. 
spark.sql("select Ship_month, sum(Revenue) from sales group by ship_month").show()

+----------+----------------------------+
|Ship_month|sum(CAST(Revenue AS DOUBLE))|
+----------+----------------------------+
|         7|               4.056610547E8|
|        15|        3.9870536796000034E8|
|        11|        4.5867255731000006E8|
|        29|         3.786441625999999E8|
|         3|         4.521581724900002E8|
|        30|         3.847275025599999E8|
|         8|        4.2590097509999996E8|
|        22|         4.533163571899999E8|
|        28|         4.588998300499998E8|
|        16|               4.035133133E8|
|         5|        4.7138266552000004E8|
|        31|        2.7400352576000005E8|
|        18|         4.290000913999998E8|
|        27|        5.1839069140000033E8|
|        17|        4.2104703061999977E8|
|        26|         5.008667586800002E8|
|         6|         4.512693841599997E8|
|        19|        4.2144749794000006E8|
|        23|              4.4774310822E8|
|        25|         4.021502548800001E8|
+----------+----------------------

In [9]:
# Uzdevumi: 
# 1. Novietojiet blakus 12. mēneša ienākumus un vidējos ienākumus pa visiem pārējiem mēnešiem.
# 2. Apskatiet izpildes plānu

Lai gan šajos piemēros lietots tiek SQL jo to ir intuitīvāk saprast un vieglāk nepieļaut sintaktiskas kļūdas, vienmēr var izmantot DataFrame datu struktūras transformācijas kā .select() , .orderBy() , .groupBy()  , .filter()  u.t.t.  
Performances ziņā, starpības nav jo pēc tam, kad kods tiek noparsēts (īsi sakot, izveidots izpildes plāns), tas izpildās vienādi gan ķēdējot kopā funkcijas, gan ar spark.sql()

In [45]:
# Uzdevums:
# Saskaitiet visas piegādes, kuras netika veiktas uz valstīm, kuru nosaukumos nav e burta (mazā), 
# atrodot fundamentālo (nevis sintakses) kļūdu šajā kodā.
from pyspark.sql.functions import lit

contains_frame = csv_frame\
.withColumn("does_contain",lit(col("Country").contains("e")))\
.filter(csv_frame['does_contain'] != True)

# Funkcionālās darbības  
Šīs nav vienīgās darbības, kuras ar Spark datu struktūrām var veikt. Ir iespējams veikt arī programmistiskākas darbības, izmantojot .map() funkciju, bet tās strādā tikai uz RDD un DS.

In [1]:
lyrics = [["Twinkle twinkle little star"],\
          ["How I wonder what you are"],\
          ["Twinkle twinkle little star"]\
         ]
words = spark.sparkContext.parallelize(lyrics) #Izveido RDD


from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

# Definē shēmu, izmantojot Spark lietotos iebūvētos tipus. 
# # StructField - Lauks. Jeb kolonna.
# Spark DF atbalsta arī alternatīvas kolonnu definīcijas.
# Tāpēc arī jānorāda specifiski, ka kolonna 'Line' sastāvēs no string vērtībām un būs NULL`ojama.

#StructType objektā ieliekam python sarakstu ar StructField objektiem.
words_schema = StructType([StructField('Line',StringType(),True)])
words_df = spark.createDataFrame(words,words_schema)

# Piezīme sev: Pārbaudīt vai var uztaisīt trīs līmeņus ar hierarhiju un uztaisīt kkādu ORM.

In [4]:
# Klasiskais Spark wordcount piemērs
# Katra 'rindiņa' iekš RDD ir saraksts, kurā iekšā ir string.
# katrs ieraksts -  vārds tiek pārvērsts par ierakstu ('vards', 1)
words.flatMap(lambda line : line[0].split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a,b: a+b).collect()

[('twinkle', 2),
 ('wonder', 1),
 ('are', 1),
 ('Twinkle', 2),
 ('little', 2),
 ('star', 2),
 ('How', 1),
 ('I', 1),
 ('what', 1),
 ('you', 1)]

RDD transformācijas tipiski tiek izmantotas priekš datiem, kuriem nav definēta vai definējama shēma (visbiežāk teksts), bet strādājot ar datiem ar shēmu visbiežāk tiek izmantots dataframe.  
Praktiski, jūs vienmēr ielasīsiet tabulārus datus un tad domāsiet, ko ar viņiem tālāk darīt.  

In [24]:
# Uzdevums:  
# 1) Ielasiet Spark DataFrame objektā json failu, kas atrodams šeit:

# 2) Papildiniet šo DataFrame ar vēl divām rindām un saglabājiet to json failā flat_output.json
# 3) Izpētiet rezultātu caur komandrindu.

## Spark skaitļošanas paralelizācija  
Spark, kā neatkarīgo vienību uz kuras var veikt neatkarīgas darbības, sauc par partīciju. Partīciju skaits var noteikt skaitļošanas laiku. Spark datu struktūrām ir iespējams manuāli iestatīt to, cik partīcijās datus sadalīt.  
Šo īpašību var izmantot, ja ir kādas divas, lielas, distributētas tabulas, piem. uz HDFS, tad, ja tabulas ielasot, mēs manuāli iestādam abām tabulām partīciju skaitu uz 10, tad, veicot joinu, notiek map side join  
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html  
To var darīt ar .repartition() vai .coalesce() funkcijām  


Vēl viens veids kā izmantot partīciju skaitu, lai optimizētu būtu izmantot partīciju sapludināšana (.coalesce()), lai novērstu pilnu shuffle piem. apvienojot datus, kuri nav tik lieli, bet shuffle paredzams iespaidīgs (3+ groupby statementi spark.sql() komandā), tad varētu būtu prātīgi samazināt partīciju skaitu.

In [28]:
# Šādi var ierakstīt json failu no dataframe. Ievērojiet coalesce komandu.
words_df.coalesce(1).write.format('json').save('coalesced_res.json')

In [18]:
# Uzdevums: 
# 1) Ielasiet coalesced_res.json atpakaļ dataframe objektā, saskaitiet unikālās vērtības katrā kolonnā.
# 2) Apskatiet izpildes plānu un salīdziniet ielasīšanas operācijas ar tām, kas tika veiktas no flat_json.json

In [31]:
# Uzdevums:
# Katrā valstī ir savs piegādes serviss un katram pasūtījumam ir piegādes prioritāte.
# Vissenāk pasūtītās lietas būtu jāpiegādā pirmās.
# Izveidot sarakstu ar visprioritārākajām lietām iekš C, H un M prioritātēm katrai valstij

Labs demo par to, ko ar Spark var izdarīt:  
https://www.youtube.com/watch?v=2Qj1b4TruKA