# Projekt Apache Spark

# Wprowadzenie

## Dodatkowe biblioteki wymagane przez szablon

Zanim uruchomisz ten notatnik jako pierwszy w środowisku *Dataproc* lub *Docker* <br>
(jeśli już to zrobiłeś w obecnym środowisku, nie wykonuj tego ponownie)

1. Otwórz terminal (możesz do tego celu wykorzystać interfejs JupyterLab), a następnie wykonaj poniższe instrukcje,
   które pobiorą wymagane dodatkowe biblioteki i umieszczą je w katalogu SPARK_HOME/jars

   ```
   wget https://jankiewicz.pl/bigdata/metricfunctions.jar
   wget https://jankiewicz.pl/bigdata/toolsfunctions.jar
   sudo cp *.jar $SPARK_HOME/jars
   ```

## Dodatkowe biblioteki i parametry konfiguracyjne 

2. Jeśli Twój projekt wymaga dodatkowych bibliotek lub parametrów konfiguracyjnych, uzupełnij instrukcję w poniższym paragrafie 

(do ewentualnego uzupełnienia)
a) Sprawdź wersję Apache Spark
    
```
spark-submit --version
```
    
b) Odczytaj ze strony `https://docs.delta.io/latest/releases.html` która wersja biblioteki Delta Lake odpowiada Twojej wersji Sparka. 
    
c) Sprawdź w którym katalogu znajdują się pliki konfiguracyjne naszych kerneli 
    
```
jupyter kernelspec list
```
    
d) Wprowadź zmiany do pliku konfiguracyjnego kernela
    
```
cd /opt/conda/miniconda3/share/jupyter/kernels/apache_toree_scala
    
sudo cp kernel.json kernel.json.tmp
    
jq '.env.SPARK_OPTS = "--packages io.delta:delta-spark_2.12:3.2.1 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"' kernel.json.tmp | sudo tee kernel.json > /dev/null
```
    
    
Kilka słów komentarza.

W założeniu biblioteka `io.delta:delta-spark_2.12:3.2.1` to najnowsza z wersji utworzonych dla wersji Sparka 3.5.X (patrz: `https://github.com/delta-io/delta/releases`)

Parametry konfiguracyjne:

- `spark.sql.extensions` = `io.delta.sql.DeltaSparkSessionExtension`
- `spark.sql.catalog.spark_catalog` = `org.apache.spark.sql.delta.catalog.DeltaCatalog`

pozwolą nam odpowiednio używać rozszerzeń w poleceniach SQL dostępnych dla Delta Lake oraz pozwolą obsługiwać tabele Delta Lake w repozytorium metadanych. 
 repozytorium metadanych.

2. Zrestartuj wszystkie kernele

# Dalsze instrukcje 

Wykorzystując ten notatnik jako szablon, zrealizuj projekt Apache Spark zgodnie z przydzielonym zestawem. 

Kilka uwag:

* Nie modyfikuj ani nie usuwaj paragrafów *markdown* w tym notatniku, chyba że wynika to jednoznacznie z instrukcji. 
* Istniejące paragrafy zawierające *kod* uzupełnij w razie potrzeby zgodnie z instrukcjami
    - nie usuwaj ich
    - nie usuwaj zawartych w nich instrukcji oraz kodu
    - nie modyfikuj ich, jeśli instrukcje jawnie tego nie nakazują
* Możesz dodawać nowe paragrafy zarówno zawierające kod jak i komentarze dotyczące tego kodu (markdown)

# Treść projektu

Poniżej w paragrafie *markdown* wstaw tytuł przydzielonego zestawu

# Zestaw 0 – wzorzec

**Uwaga**

- W ramach wzorca nie są spełnione żadne reguły projektu. 
- Brak konsekwencji w wykorzystaniu właściwego API w ramach poszczególnych części
- Zadanie *misji głównej* polega na zliczeniu słówek.  

# Działania wstępne 

W poniższym paragrafie uzupełnij polecenia definiujące poszczególne zmienne. 

Pamiętaj abyś:

* w późniejszym kodzie, dla wszystkich cześci projektu, korzystał z tych zdefiniowanych zmiennych. Wykorzystuj je analogicznie jak parametry
* przed ostateczną rejestracją projektu usunął ich wartości, tak aby nie pozostawiać w notatniku niczego co mogłoby identyfikować Ciebie jako jego autora

In [2]:
// pełna ścieżka do katalogu w zasobniku zawierającego podkatalogi datasource1 i datasource4 z danymi źródłowymi
val inputDir = "/home/jovyan/data/zestaw10"

inputDir = /home/jovyan/data/zestaw10


/home/jovyan/data/zestaw10

Nie modyfikuj poniższych paragrafów. Wykonaj je i używaj zdefiniowanych poniżej zmiennych jak parametrów Twojego programu.

In [3]:
// NIE ZMIENIAĆ
// ścieżki dla danych źródłowych 
val datasource1Dir = inputDir + "/datasource1"
val datasource4Dir = inputDir + "/datasource4"

// nazwy i ścieżki dla wyników 
// część 1 (Spark Core - RDD) 
val rddResultDir = "/tmp/output1"

// część 2 (Spark SQL - DataFrame)
val dfResultTable = "output2"

// część 3 (Spark SQL - Dataset)
val dsResultFile = "/tmp/output3.json"

datasource1Dir = /home/jovyan/data/zestaw10/datasource1
datasource4Dir = /home/jovyan/data/zestaw10/datasource4
rddResultDir = /tmp/output1
dfResultTable = output2
dsResultFile = /tmp/output3.json


/tmp/output3.json

In [4]:
// NIE ZMIENIAĆ
import ToolsFunctions._
import SparkMetrics._

Poniższe paragrafy mają na celu usunąć ewentualne pozostałości poprzednich uruchomień tego lub innych notatników

In [5]:
// NIE ZMIENIAĆ
// usunięcie miejsca docelowego dla część 1 (Spark Core - RDD) 
deleteDir(spark, rddResultDir)

Successfully deleted directory: /tmp/output1


In [6]:
// NIE ZMIENIAĆ
// usunięcie miejsca docelowego dla część 2 (Spark SQL - DataFrame) 
dropTable(spark, dfResultTable)

The table output2 does not exist.
Successfully deleted directory: file:/home/jovyan/work/project2/spark-warehouse/output2


In [7]:
// NIE ZMIENIAĆ
// usunięcie miejsca docelowego dla część 3 (Spark SQL - Dataset) 
deleteDir(path = dsResultFile)

Successfully deleted file: /tmp/output3.json


***Uwaga!***

Uruchom poniższy paragraf i sprawdź czy adres, pod którym dostępny *Apache Spark Application UI* jest poprawny wywołując następny testowy paragraf. 

W razie potrzeby określ samodzielnie poprawny adres pod którym dostępny *Apache Spark Application UI*

In [8]:
// adres URL, pod którym dostępny Apache Spark Application UI (REST API) 

val sparkUiAddress = extractHostAndPort(spark, "http://localhost:4040")
sparkUiAddress

sparkUiAddress = http://localhost:4040


http://localhost:4040

In [9]:
// testowy paragraf
val testMetrics = getCurrentMetrics(sparkUiAddress)

testMetrics = Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesRead ...


Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesRead ...

# Część 1 - Spark Core (RDD)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [10]:
//NIE ZMIENIAĆ
val beforeRddMetrics = getCurrentMetrics(sparkUiAddress)

beforeRddMetrics = Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytes...


Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytes...

W poniższych paragrafach wprowadź **rozwiązanie** *misji głównej* oparte na *RDD API*. 

Pamiętaj o wydajności Twojego przetwarzania, *RDD API* tego wymaga. 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [11]:
// Wczytanie plików tekstowych
val textFiles = sc.textFile(datasource4Dir)

textFiles = /home/jovyan/data/zestaw10/datasource4 MapPartitionsRDD[1] at textFile at <console>:32


/home/jovyan/data/zestaw10/datasource4 MapPartitionsRDD[1] at textFile at <console>:32

In [12]:
// Podział linii na słowa i zliczanie ilości wystąpień każdego słowa
val wordCounts = textFiles
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)

wordCounts = ShuffledRDD[4] at reduceByKey at <console>:34


ShuffledRDD[4] at reduceByKey at <console>:34

In [13]:
// Zapis wyniku jako ObjectFile
wordCounts.saveAsObjectFile(rddResultDir)

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [14]:
// NIE ZMIENIAĆ
val afterRddMetrics = getCurrentMetrics(sparkUiAddress)

afterRddMetrics = Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesR...


Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesR...

# Część 2 - Spark SQL (DataFrame)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [15]:
//NIE ZMIENIAĆ
val beforeDfMetrics = getCurrentMetrics(sparkUiAddress)

beforeDfMetrics = Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesR...


Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesR...

W poniższych paragrafach wprowadź **rozwiązanie** misji głównej oparte na *DataFrame API*.

Pamiętaj o wydajności Twojego przetwarzania, *DataFrame API* tego wymaga.

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania misji pobocznych.

In [16]:
import org.apache.spark.sql.functions.{split, explode, count}

// Wczytanie danych
val data = spark.read.text(datasource4Dir)

data = [value: string]


[value: string]

In [17]:
// Dzielenie linii na słowa i eksplozja do osobnych wierszy
val words = data.select(explode(split(data("value"), " ")).alias("word"))

// Zliczanie słów
val wordCounts = words.groupBy("word").agg(count("word").alias("count"))

words = [word: string]
wordCounts = [word: string, count: bigint]


[word: string, count: bigint]

In [18]:
// Zapis wyników do tabeli
wordCounts.write.mode("overwrite").saveAsTable(dfResultTable)

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [19]:
//NIE ZMIENIAĆ
val afterDfMetrics = getCurrentMetrics(sparkUiAddress)

afterDfMetrics = Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesRe...


Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesRe...

# Część 3 - Spark SQL (Dataset)

Ta część to wyzwanie. Utrzymanie się na wąskiej grani *Dataset API* to duża umiejętność. Wystarczy jedna nierozważna transformacja *untyped* i spadamy w przepaść typu `DataFrame`. 

Powodzenia!

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [20]:
//NIE ZMIENIAĆ
val beforeDsMetrics = getCurrentMetrics(sparkUiAddress)

beforeDsMetrics = Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesR...


Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesR...

W poniższych paragrafach wprowadź **rozwiązanie** swojego projektu oparte o *Dataset API*. 

Pamiętaj o wydajności Twojego przetwarzania, *Dataset API* nie jest w stanie wszystkiego "naprawić". 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [21]:
import org.apache.spark.sql.functions.{split, explode, count}
import org.apache.spark.sql.Dataset

// Wczytanie danych
val data: Dataset[String] = spark.read.text(datasource4Dir).as[String]

data = [value: string]


[value: string]

In [22]:
// Dzielenie linii na słowa i eksplozja do osobnych wierszy
val words: Dataset[String] = data.flatMap(line => line.split(" "))

// Zliczanie słów
val wordCounts: Dataset[(String, Long)] = words.groupByKey(identity).agg(count("value").as("count").as[Long])

words = [value: string]
wordCounts = [key: string, count: bigint]


[key: string, count: bigint]

In [23]:
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Ustawienie domyślnego formatu
implicit val formats: Formats = DefaultFormats

// Zebranie danych do lokalnej kolekcji
val collectedData: Array[(String, Long)] = wordCounts.take(100)

// Konwersja Array[(String, Long)] na JArray
val jsonArray: JArray = JArray(collectedData.map { case (key, value) =>
  JObject("key" -> JString(key), "value" -> JLong(value))
}.toList)

// Zapis danych do lokalnego pliku JSON
val writer = new java.io.FileWriter(dsResultFile)
try {
  writer.write(compact(render(jsonArray)))
} finally {
  writer.close()
}

formats = org.json4s.DefaultFormats$@a7fbafd
collectedData = Array((Worcester,9), (Decigu,3), (Expert,161), (Bangla,76), (Morel?http://Ivanmorel.barber.cutz.com?sr.morel@gmail.com?8589935456,1), (CV?~?mabecorpti@gmail.com?8589935486,1), (IT?https://sisrute.kemkes.go.id?poentoro@yahoo.com?8589935850,1), (Tutting,,1), (PT?~?developer.compro@gmail.com?8589935985,1), (Boba?~?abeeryas20@gmail.com?8589936140,1), (AKD,7), (GROUP?https://varniapp.com?varniitsolutions@gmail.com?8589936309,1), (Inc.?http://www.plateau.tv?support@plateautel.com?8589936421,1), (CYBntity,1), (CARS,13), (MM?~?android@aerospool.sk?8589936796,1), (ShirdiSaiChants,1), (online,317), (Tydecon,4), (Apps?~?banglaappreview2...


Array((Worcester,9), (Decigu,3), (Expert,161), (Bangla,76), (Morel?http://Ivanmorel.barber.cutz.com?sr.morel@gmail.com?8589935456,1), (CV?~?mabecorpti@gmail.com?8589935486,1), (IT?https://sisrute.kemkes.go.id?poentoro@yahoo.com?8589935850,1), (Tutting,,1), (PT?~?developer.compro@gmail.com?8589935985,1), (Boba?~?abeeryas20@gmail.com?8589936140,1), (AKD,7), (GROUP?https://varniapp.com?varniitsolutions@gmail.com?8589936309,1), (Inc.?http://www.plateau.tv?support@plateautel.com?8589936421,1), (CYBntity,1), (CARS,13), (MM?~?android@aerospool.sk?8589936796,1), (ShirdiSaiChants,1), (online,317), (Tydecon,4), (Apps?~?banglaappreview2...

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [24]:
//NIE ZMIENIAĆ
val afterDsMetrics = getCurrentMetrics(sparkUiAddress)

afterDsMetrics = Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesRe...


Map(resultSerializationTime -> 232, shuffleWriteTime -> 3153692000, shuffleRemoteBytesReadToDisk -> 0, shuffleRemoteBytesRead -> 0, outputBytes -> 0, peakExecutionMemory -> 1059639120, outputRecords -> 0, diskBytesSpilled -> 138071669, shuffleLocalBlocksFetched -> 115, numCompleteTasks -> 82, shuffleWriteBytes -> 139704075, shuffleReadBytes -> 141148875, executorDeserializeTime -> 1233, executorDeserializeCpuTime -> 735307400, numCompletedIndices -> 82, numActiveTasks -> 0, numTasks -> 209, inputBytes -> 3785496010, memoryBytesSpilled -> 1142943488, inputRecords -> 127623804, jvmGcTime -> 9507, executorCpuTime -> 548303174000, numFailedTasks -> 0, shuffleFetchWaitTime -> 0, shuffleRemoteBlocksFetched -> 0, executorRunTime -> 665203, shuffleLocalBytesRe...

# Analiza wyników i wydajności *misji głównych*

## Część 1 - Spark Core (RDD)

In [25]:
// Wczytanie wyników z pliku object
val wordCounts = sc.objectFile[(String, Long)](rddResultDir)

// Wyświetlenie 50 pierwszych elementów
wordCounts.take(50).foreach(println)

(G4Ehttp://games4escape.com/g4egamesapp@gmail.com17179951671,1)
(eCart,1)
(ABhttp://www.newsec.sesodergren.d@gmail.com8590192481,1)
(Radio~stmatthewapp@vimg.org8590080357,1)
(CISPE,1)
(Gamesofa,4)
(-TSAS,2)
(Nagpurhttp://igreenvalley.com/info@igreenvalley.com8590000379,1)
(Unionhttp://www.westconsincu.orgonlineservices@westconsincu.org25769865679,1)
(LLChttps://marvellousllc.appspot.commarvellous.android@gmail.com278414,1)
(Galaxy~gamesgalaxy344@gmail.com25770099512,1)
(ISWDhttps://social.marinenotes.com/profileiswd2020@gmail.com17180133114,1)
(Blakeyhttps://www.wolfchat.app/supportthomas@blakey.co.uk17180135559,1)
(miah~mcappslabbd@gmail.com17179993941,1)
(SlaMarthttp://slamart.infoslamartapps@gmail.com25769842745,1)
(Guptahttps://kuntal.mekuntal1230@gmail.com17350,1)
(Solutionshttps://athleticsolutions.net/CollegiateSportQualifier/mlucas@athleticsolutions.net165810,1)
(LLChttps://excellentflorists.com/vanessa@excellentfarms.com17180115044,1)


wordCounts = MapPartitionsRDD[25] at objectFile at <console>:44


MapPartitionsRDD[25] at objectFile at <console>:44

In [26]:
{ val jsonResult = compact(render(Extraction.decompose(subtractMetrics(afterRddMetrics, beforeRddMetrics))))
  val formattedResult = jsonResult.replaceAll(",", ",\n")
  println(formattedResult)
}

{"resultSerializationTime":0,
"shuffleWriteTime":0,
"shuffleRemoteBytesReadToDisk":0,
"shuffleRemoteBytesRead":0,
"outputBytes":0,
"peakExecutionMemory":0,
"outputRecords":0,
"diskBytesSpilled":0,
"shuffleLocalBlocksFetched":0,
"numCompleteTasks":0,
"shuffleWriteBytes":0,
"shuffleReadBytes":0,
"executorDeserializeTime":0,
"executorDeserializeCpuTime":0,
"numCompletedIndices":0,
"numActiveTasks":0,
"numTasks":0,
"inputBytes":0,
"memoryBytesSpilled":0,
"inputRecords":0,
"jvmGcTime":0,
"executorCpuTime":0,
"numFailedTasks":0,
"shuffleFetchWaitTime":0,
"shuffleRemoteBlocksFetched":0,
"executorRunTime":0,
"shuffleLocalBytesRead":0,
"numKilledTasks":0,
"shuffleWriteRecords":0,
"resultSize":0,
"shuffleReadRecords":0}


## Część 2 - Spark SQL (DataFrame)

In [27]:
val df = spark.table(dfResultTable)

// Wyświetlenie 50 pierwszych rekordów
df.show(50)

+-------------------------+-----+
|                     word|count|
+-------------------------+-----+
|                      The| 9372|
|                   Bidhee|    7|
|                Solutions| 6041|
|                   ArtAce|    2|
|                  PuyTech|    1|
|                   McLeod|  208|
|                      RTV|   13|
|     Softwarehttp://p...|    1|
|紫荊雜誌社https://bau...|    1|
|                  Bacilio|    2|
|     Developerhttps:/...|    1|
|     Softwarehttp://w...|    1|
|                  Backend|   13|
|하이퍼펌프~hyper.cho...|    1|
|                    METRO|   21|
|     ADBANDhttp://www...|    1|
|                      Tcf|    1|
|                      Pug|   12|
|              Techologies|    4|
|     Tourismhttps://t...|    1|
|     Kinsale~gourmet...|    1|
|     Englishhttps://w...|    1|
|                    Darul|   10|
|                       📱|    3|
|                  Panipat|    2|
|     Konyukhovhttp://...|    1|
|                     Bol

df = [word: string, count: bigint]


[word: string, count: bigint]

In [28]:
{ val jsonResult = compact(render(Extraction.decompose(subtractMetrics(afterDfMetrics, beforeDfMetrics))))
  val formattedResult = jsonResult.replaceAll(",", ",\n")
  println(formattedResult)
}

{"resultSerializationTime":0,
"shuffleWriteTime":0,
"shuffleRemoteBytesReadToDisk":0,
"shuffleRemoteBytesRead":0,
"outputBytes":0,
"peakExecutionMemory":0,
"outputRecords":0,
"diskBytesSpilled":0,
"shuffleLocalBlocksFetched":0,
"numCompleteTasks":0,
"shuffleWriteBytes":0,
"shuffleReadBytes":0,
"executorDeserializeTime":0,
"executorDeserializeCpuTime":0,
"numCompletedIndices":0,
"numActiveTasks":0,
"numTasks":0,
"inputBytes":0,
"memoryBytesSpilled":0,
"inputRecords":0,
"jvmGcTime":0,
"executorCpuTime":0,
"numFailedTasks":0,
"shuffleFetchWaitTime":0,
"shuffleRemoteBlocksFetched":0,
"executorRunTime":0,
"shuffleLocalBytesRead":0,
"numKilledTasks":0,
"shuffleWriteRecords":0,
"resultSize":0,
"shuffleReadRecords":0}


## Część 3 - Spark SQL (Dataset)

In [29]:
import scala.io.Source
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Wczytanie zawartości pliku JSON
val jsonContent: JValue = parse(scala.io.Source.fromFile(dsResultFile).mkString)

// Ograniczenie do pierwszych 50 elementów tablicy
val limitedContent: JValue = jsonContent match {
  case JArray(elements) => JArray(elements.take(50))
  case other => other
}

// Wyświetlenie ograniczonej zawartości z nowymi liniami
println(pretty(render(limitedContent)))

[ {
  "key" : "Worcester",
  "value" : 9
}, {
  "key" : "Decigu",
  "value" : 3
}, {
  "key" : "Expert",
  "value" : 161
}, {
  "key" : "Bangla",
  "value" : 76
}, {
  "key" : "Morel\u0001http://Ivanmorel.barber.cutz.com\u0001sr.morel@gmail.com\u00018589935456",
  "value" : 1
}, {
  "key" : "CV\u0001~\u0001mabecorpti@gmail.com\u00018589935486",
  "value" : 1
}, {
  "key" : "IT\u0001https://sisrute.kemkes.go.id\u0001poentoro@yahoo.com\u00018589935850",
  "value" : 1
}, {
  "key" : "Tutting,",
  "value" : 1
}, {
  "key" : "PT\u0001~\u0001developer.compro@gmail.com\u00018589935985",
  "value" : 1
}, {
  "key" : "Boba\u0001~\u0001abeeryas20@gmail.com\u00018589936140",
  "value" : 1
}, {
  "key" : "AKD",
  "value" : 7
}, {
  "key" : "GROUP\u0001https://varniapp.com\u0001varniitsolutions@gmail.com\u00018589936309",
  "value" : 1
}, {
  "key" : "Inc.\u0001http://www.plateau.tv\u0001support@plateautel.com\u00018589936421",
  "value" : 1
}, {
  "key" : "CYBntity",
  "value" : 1
}, {
  "key" : "

jsonContent = JArray(List(JObject(List((key,JString(Worcester)), (value,JInt(9)))), JObject(List((key,JString(Decigu)), (value,JInt(3)))), JObject(List((key,JString(Expert)), (value,JInt(161)))), JObject(List((key,JString(Bangla)), (value,JInt(76)))), JObject(List((key,JString(Morel?http://Ivanmorel.barber.cutz.com?sr.morel@gmail.com?8589935456)), (value,JInt(1)))), JObject(List((key,JString(CV?~?mabecorpti@gmail.com?8589935486)), (value,JInt(1)))), JObject(List((key,JString(IT?https://sisrute.kemkes.go.id?poentoro@yahoo.com?8589935850)), (value,JInt(1)))), JObject(List((key,JString(Tutting,)), (value,JInt(1)))), JObject(List((key,JString(PT?~?developer.compro@gmail.com?8589935985)), (v...


JArray(List(JObject(List((key,JString(Worcester)), (value,JInt(9)))), JObject(List((key,JString(Decigu)), (value,JInt(3)))), JObject(List((key,JString(Expert)), (value,JInt(161)))), JObject(List((key,JString(Bangla)), (value,JInt(76)))), JObject(List((key,JString(Morel?http://Ivanmorel.barber.cutz.com?sr.morel@gmail.com?8589935456)), (value,JInt(1)))), JObject(List((key,JString(CV?~?mabecorpti@gmail.com?8589935486)), (value,JInt(1)))), JObject(List((key,JString(IT?https://sisrute.kemkes.go.id?poentoro@yahoo.com?8589935850)), (value,JInt(1)))), JObject(List((key,JString(Tutting,)), (value,JInt(1)))), JObject(List((key,JString(PT?~?developer.compro@gmail.com?8589935985)), (v...

In [30]:
{ val jsonResult = compact(render(Extraction.decompose(subtractMetrics(afterDsMetrics, beforeDsMetrics))))
  val formattedResult = jsonResult.replaceAll(",", ",\n")
  println(formattedResult)
}

{"resultSerializationTime":0,
"shuffleWriteTime":0,
"shuffleRemoteBytesReadToDisk":0,
"shuffleRemoteBytesRead":0,
"outputBytes":0,
"peakExecutionMemory":0,
"outputRecords":0,
"diskBytesSpilled":0,
"shuffleLocalBlocksFetched":0,
"numCompleteTasks":0,
"shuffleWriteBytes":0,
"shuffleReadBytes":0,
"executorDeserializeTime":0,
"executorDeserializeCpuTime":0,
"numCompletedIndices":0,
"numActiveTasks":0,
"numTasks":0,
"inputBytes":0,
"memoryBytesSpilled":0,
"inputRecords":0,
"jvmGcTime":0,
"executorCpuTime":0,
"numFailedTasks":0,
"shuffleFetchWaitTime":0,
"shuffleRemoteBlocksFetched":0,
"executorRunTime":0,
"shuffleLocalBytesRead":0,
"numKilledTasks":0,
"shuffleWriteRecords":0,
"resultSize":0,
"shuffleReadRecords":0}
