# Projekt Apache Spark

# Wprowadzenie

Wykorzystując ten notatnik jako szablon zrealizuj projekt Apache Spark zgodnie z przydzielonym zestawem. 

Kilka uwag:

* Nie modyfikuj ani nie usuwaj paragrafów *markdown* w tym notatniku, chyba że wynika to jednoznacznie z instrukcji. 
* Istniejące paragrafy zawierające *kod* uzupełnij w razie potrzeby zgodnie z instrukcjami
    - nie usuwaj ich
    - nie usuwaj zawartych w nich instrukcji oraz kodu
    - nie modyfikuj ich, jeśli instrukcje jawnie tego nie nakazują
* Możesz dodawać nowe paragrafy zarówno zawierające kod jak i komentarze dotyczące tego kodu (markdown)

# Treść projektu

Poniżej w paragrafie markdown wstaw tytuł przydzielonego zestawu

# Zestaw 0 – wzorzec

**Uwaga**

- W ramach wzorca nie są spełnione żadne reguły projektu. 
- Brak konsekwencji w wykorzystaniu właściwego API w ramach poszczególnych części
- Zadanie *misji głównej* polega na zliczeniu słówek.  

# Działania wstępne 

Uruchom poniższy paragraf, aby utworzyć obiekty kontekstu Sparka. Jeśli jest taka potrzeba dostosuj te polecenia. Pamiętaj o potrzebnych bibliotekach.

In [1]:
from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

W poniższym paragrafie uzupełnij polecenia definiujące poszczególne zmienne. 

Pamiętaj abyś:

* w późniejszym kodzie, dla wszystkich cześci projektu, korzystał z tych zdefiniowanych zmiennych. Wykorzystuj je analogicznie jak parametry
* przed ostateczną rejestracją projektu usunął ich wartości, tak aby nie pozostawiać w notatniku niczego co mogłoby identyfikować Ciebie jako jego autora

In [2]:
# pełna ścieżka do katalogu w zasobniku zawierającego podkatalogi `datasource1` i `datasource4` 
# z danymi źródłowymi
input_dir = "/home/jovyan/data/zestaw10"

Nie modyfikuj poniższych paragrafów. Wykonaj je i używaj zdefniowanych poniżej zmiennych jak parametrów Twojego programu.

In [3]:
# NIE ZMIENIAĆ
# ścieżki dla danych źródłowych 
datasource1_dir = input_dir + "/datasource1"
datasource4_dir = input_dir + "/datasource4"

# nazwy i ścieżki dla wyników dla misji głównej 
# część 1 (Spark Core - RDD) 
rdd_result_dir = "/tmp/output1"

# część 2 (Spark SQL - DataFrame)
df_result_table = "output2"

# część 3 (Pandas API on Spark)
ps_result_file = "/tmp/output3.json"

In [4]:
# NIE ZMIENIAĆ
import os
def remove_file(file):
    if os.path.exists(file):
        os.remove(file)

remove_file("metric_functions.py")
remove_file("tools_functions.py")

In [5]:
# NIE ZMIENIAĆ
import requests
r = requests.get("https://jankiewicz.pl/bigdata/metric_functions.py", allow_redirects=True)
open('metric_functions.py', 'wb').write(r.content)
r = requests.get("https://jankiewicz.pl/bigdata/tools_functions.py", allow_redirects=True)
open('tools_functions.py', 'wb').write(r.content)

3322

In [6]:
# NIE ZMIENIAĆ
%run metric_functions.py
%run tools_functions.py

Poniższe paragrafy mają na celu usunąć ewentualne pozostałości poprzednich uruchomień tego lub innych notatników

In [7]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 1 (Spark Core - RDD) 
delete_dir(spark, rdd_result_dir)

Successfully deleted directory: /tmp/output1


In [8]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 2 (Spark SQL - DataFrame) 
drop_table(spark, df_result_table)

The table output2 does not exist.
Successfully deleted directory: file:/home/jovyan/work/project2/spark-warehouse/output2


In [9]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 3 (Pandas API on Spark) 
remove_file(ps_result_file)

In [10]:
# NIE ZMIENIAĆ
spark

***Uwaga!***

Uruchom poniższy paragraf i sprawdź czy adres, pod którym dostępny *Apache Spark Application UI* jest poprawny wywołując następny testowy paragraf. 

W razie potrzeby określ samodzielnie poprawny adres, pod którym dostępny *Apache Spark Application UI*

In [11]:
# adres URL, pod którym dostępny Apache Spark Application UI (REST API)
# 
spark_ui_address = extract_host_and_port(spark, "http://localhost:4041")
spark_ui_address

'http://localhost:4041'

In [12]:
# testowy paragraf
test_metrics = get_current_metrics(spark_ui_address)
test_metrics

{'numTasks': 0,
 'numActiveTasks': 0,
 'numCompleteTasks': 0,
 'numFailedTasks': 0,
 'numKilledTasks': 0,
 'numCompletedIndices': 0,
 'executorDeserializeTime': 0,
 'executorDeserializeCpuTime': 0,
 'executorRunTime': 0,
 'executorCpuTime': 0,
 'resultSize': 0,
 'jvmGcTime': 0,
 'resultSerializationTime': 0,
 'memoryBytesSpilled': 0,
 'diskBytesSpilled': 0,
 'peakExecutionMemory': 0,
 'inputBytes': 0,
 'inputRecords': 0,
 'outputBytes': 0,
 'outputRecords': 0,
 'shuffleRemoteBlocksFetched': 0,
 'shuffleLocalBlocksFetched': 0,
 'shuffleFetchWaitTime': 0,
 'shuffleRemoteBytesRead': 0,
 'shuffleRemoteBytesReadToDisk': 0,
 'shuffleLocalBytesRead': 0,
 'shuffleReadBytes': 0,
 'shuffleReadRecords': 0,
 'shuffleWriteBytes': 0,
 'shuffleWriteTime': 0,
 'shuffleWriteRecords': 0}

# Część 1 - Spark Core (RDD)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [13]:
# NIE ZMIENIAĆ
before_rdd_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź **rozwiązanie** *misji głównej* oparte na *RDD API*. 

Pamiętaj o wydajności Twojego przetwarzania, *RDD API* tego wymaga. 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [14]:
# Wczytanie plików tekstowych
text_files = sc.textFile(datasource4_dir)

In [15]:
# Podział linii na słowa i zliczanie ilości wystąpień każdego słowa
word_counts = text_files.flatMap(lambda line: line.split(" ")) \
                        .map(lambda word: (word, 1)) \
                        .reduceByKey(lambda x, y: x + y)

In [16]:
# Zapis wyniku do pliku pickle
word_counts.saveAsPickleFile(rdd_result_dir)

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [17]:
# NIE ZMIENIAĆ
after_rdd_metrics = get_current_metrics(spark_ui_address)

# Część 2 - Spark SQL (DataFrame)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [18]:
# NIE ZMIENIAĆ
before_df_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź **rozwiązanie** *misji głównej* swojego projektu oparte o *DataFrame API*. 

Pamiętaj o wydajności Twojego przetwarzania, *DataFrame API* nie jest w stanie wszystkiego "naprawić". 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [19]:
from pyspark.sql.functions import split, explode, count
# Wczytanie danych
data = spark.read.text(datasource4_dir)

In [20]:
# Dzielenie linii na słowa i eksplozja do osobnych wierszy
words = data.select(explode(split(data.value, " ")).alias("word"))

# Zliczanie słów
word_counts = words.groupBy("word").agg(count("word").alias("count"))

In [21]:
# Zapis wyników do tabeli 
word_counts.write.mode("overwrite").saveAsTable(df_result_table)

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [22]:
# NIE ZMIENIAĆ
after_df_metrics = get_current_metrics(spark_ui_address)

# Część 3 - Pandas API on Spark

Ta część to wyzwanie. W szczególności dla osób, które nie programują na co dzień w Pythonie, lub które nie nie korzystały do tej pory z Pandas API.  

Powodzenia!

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [23]:
#NIE ZMIENIAĆ
before_ps_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź **rozwiązanie** swojego projektu oparte o *Pandas API on Spark*. 

Pamiętaj o wydajności Twojego przetwarzania, *Pandas API on Spark* nie jest w stanie wszystkiego "naprawić". 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [24]:
import pyspark.pandas as ps

lines_ps = ps.read_csv(datasource4_dir, header=None)



In [25]:
words_ps = lines_ps[0].apply(lambda x: x.split(' ') if x is not None else []).explode().reset_index(drop=True)

In [26]:
word_counts = words_ps.value_counts()



In [27]:
word_counts_pandas = word_counts.head(50).to_pandas()



In [28]:
word_counts_pandas.to_json(ps_result_file, orient='index')

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [30]:
#NIE ZMIENIAĆ
after_ps_metrics = get_current_metrics(spark_ui_address)

# Analiza wyników i wydajności *misji głównych*

## Część 1 - Spark Core (RDD)

In [31]:
# Wczytanie wyników z pliku pickle
word_counts = sc.pickleFile(rdd_result_dir)

# Wyświetlenie 50 pierwszych elementów
result_sample = word_counts.take(50)
for item in result_sample:
    print(item)

('TeriyakiApps\x01~\x01teriyakiapps@gmail.com\x018589934594', 1)
('Koza\x01http://www.xynapse.pl\x01xynapse@xynapse.pl\x018589934595', 1)
('Tools\x01https://vcb30cb43.app-ads-txt.com/app-ads.txt\x01androtools222@gmail.com\x018589934596', 1)
('Muslim', 110)
('FireFlies', 2)
('Studio\x01~\x01manuariza95@gmail.com\x018589934602', 1)
('News', 494)
('IST-Development\x01https://istanbulit.com\x01info@istanbulit.com\x018589934604', 1)
('FAStuidoTI\x01~\x01karimkhalfy@gmail.com\x018589934605', 1)
('Web4Minds,', 1)
('V3', 8)
('Smart', 2437)
('Ltd\x01http://www.v3smarttech.com\x01support@v3smarttech.com\x018589934607', 1)
('Mobil', 143)
('UNDERSCORE:', 1)
('Apps', 6350)
('and', 5289)
('Games\x01~\x01ergamesapps@gmail.com\x018589934609', 1)
('tamapps\x01~\x01zakdermeister@gmail.com\x018589934614', 1)
('S.', 397)
('Connect', 331)
('Team\x01https://mewe.com/join/klwpdevelopersteam\x01designcorpviti@gmail.com\x018589934618', 1)
('for', 2565)
('with', 262)
('NETWORKS', 23)
('PTE', 226)
('Art\x01https

In [32]:
subtract_metrics(after_rdd_metrics, before_rdd_metrics)

{'numTasks': 6,
 'numActiveTasks': 0,
 'numCompleteTasks': 6,
 'numFailedTasks': 0,
 'numKilledTasks': 0,
 'numCompletedIndices': 6,
 'executorDeserializeTime': 763,
 'executorDeserializeCpuTime': 288417800,
 'executorRunTime': 52789,
 'executorCpuTime': 3791290300,
 'resultSize': 12143,
 'jvmGcTime': 1808,
 'resultSerializationTime': 19,
 'memoryBytesSpilled': 0,
 'diskBytesSpilled': 0,
 'peakExecutionMemory': 0,
 'inputBytes': 84276905,
 'inputRecords': 1179547,
 'outputBytes': 90624535,
 'outputRecords': 14566,
 'shuffleRemoteBlocksFetched': 0,
 'shuffleLocalBlocksFetched': 9,
 'shuffleFetchWaitTime': 0,
 'shuffleRemoteBytesRead': 0,
 'shuffleRemoteBytesReadToDisk': 0,
 'shuffleLocalBytesRead': 49730906,
 'shuffleReadBytes': 49730906,
 'shuffleReadRecords': 228,
 'shuffleWriteBytes': 49730906,
 'shuffleWriteTime': 405851800,
 'shuffleWriteRecords': 228}

## Część 2 - Spark SQL (DataFrame)

In [33]:
df = spark.table(df_result_table)

# Wyświetlenie 50 pierwszych rekordów
df.show(50)

+-------------------------+-----+
|                     word|count|
+-------------------------+-----+
|                      The| 9372|
|                   Bidhee|    7|
|                Solutions| 6041|
|                   ArtAce|    2|
|                  PuyTech|    1|
|                   McLeod|  208|
|                      RTV|   13|
|     Softwarehttp://p...|    1|
|紫荊雜誌社https://bau...|    1|
|                  Bacilio|    2|
|     Developerhttps:/...|    1|
|     Softwarehttp://w...|    1|
|                  Backend|   13|
|하이퍼펌프~hyper.cho...|    1|
|                    METRO|   21|
|     ADBANDhttp://www...|    1|
|                      Tcf|    1|
|                      Pug|   12|
|              Techologies|    4|
|     Tourismhttps://t...|    1|
|     Kinsale~gourmet...|    1|
|     Englishhttps://w...|    1|
|                    Darul|   10|
|                       📱|    3|
|                  Panipat|    2|
|     Konyukhovhttp://...|    1|
|                     Bol

In [34]:
subtract_metrics(after_df_metrics, before_df_metrics)

{'numTasks': 12,
 'numActiveTasks': 0,
 'numCompleteTasks': 8,
 'numFailedTasks': 0,
 'numKilledTasks': 0,
 'numCompletedIndices': 8,
 'executorDeserializeTime': 1254,
 'executorDeserializeCpuTime': 446474600,
 'executorRunTime': 54900,
 'executorCpuTime': 22626614900,
 'resultSize': 36185,
 'jvmGcTime': 2428,
 'resultSerializationTime': 110,
 'memoryBytesSpilled': 0,
 'diskBytesSpilled': 0,
 'peakExecutionMemory': 440400752,
 'inputBytes': 84344235,
 'inputRecords': 1179547,
 'outputBytes': 50321941,
 'outputRecords': 1456441,
 'shuffleRemoteBlocksFetched': 0,
 'shuffleLocalBlocksFetched': 16,
 'shuffleFetchWaitTime': 0,
 'shuffleRemoteBytesRead': 0,
 'shuffleRemoteBytesReadToDisk': 0,
 'shuffleLocalBytesRead': 63406957,
 'shuffleReadBytes': 63406957,
 'shuffleReadRecords': 1622698,
 'shuffleWriteBytes': 63406957,
 'shuffleWriteTime': 817119700,
 'shuffleWriteRecords': 1622698}

## Część 3 - Pandas API on Spark

In [35]:
import json

# Odczytaj zawartość pliku JSON
with open(ps_result_file, 'r') as file:
    json_content = json.load(file)

# Wyświetl zawartość
print(json.dumps(json_content, indent=2))

{
  "-": 12048,
  "&": 11714,
  "The": 9329,
  "App": 8068,
  "of": 7933,
  "de": 7396,
  "Technologies": 7056,
  "Solutions": 6880,
  "Software": 6813,
  "Co.": 6800,
  "Apps": 6579,
  "Media": 6462,
  "Digital": 5836,
  "Pvt": 5602,
  "Pvt.": 5290,
  "Games": 5044,
  "Technology": 5009,
  "and": 4989,
  "Mobile": 4754,
  "Private": 4196,
  "": 3879,
  "Development": 3868,
  "Group": 3761,
  "IT": 3665,
  "Services": 3527,
  "Tech": 3445,
  "Game": 3333,
  "Bank": 3127,
  "by": 2831,
  "Systems": 2778,
  "International": 2580,
  "Global": 2579,
  "Web": 2563,
  "for": 2536,
  "BH": 2507,
  "Appswiz": 2452,
  "Smart": 2436,
  "Studio": 2330,
  "Credit": 2215,
  "Pty": 2202,
  "Free": 2186,
  "Business": 1840,
  "Radio": 1810,
  "New": 1778,
  "Health": 1710,
  "Company": 1710,
  "Online": 1629,
  "My": 1539,
  "Church": 1516,
  "Creative": 1458
}


In [36]:
subtract_metrics(after_ps_metrics, before_ps_metrics)

{'numTasks': 33,
 'numActiveTasks': 0,
 'numCompleteTasks': 25,
 'numFailedTasks': 0,
 'numKilledTasks': 0,
 'numCompletedIndices': 25,
 'executorDeserializeTime': 1838,
 'executorDeserializeCpuTime': 440241100,
 'executorRunTime': 166601,
 'executorCpuTime': 55323279000,
 'resultSize': 134363,
 'jvmGcTime': 4753,
 'resultSerializationTime': 123,
 'memoryBytesSpilled': 0,
 'diskBytesSpilled': 0,
 'peakExecutionMemory': 427817888,
 'inputBytes': 385819487,
 'inputRecords': 5409845,
 'outputBytes': 0,
 'outputRecords': 0,
 'shuffleRemoteBlocksFetched': 0,
 'shuffleLocalBlocksFetched': 20,
 'shuffleFetchWaitTime': 0,
 'shuffleRemoteBytesRead': 0,
 'shuffleRemoteBytesReadToDisk': 0,
 'shuffleLocalBytesRead': 61239298,
 'shuffleReadBytes': 61239298,
 'shuffleReadRecords': 1573467,
 'shuffleWriteBytes': 61239298,
 'shuffleWriteTime': 1111152100,
 'shuffleWriteRecords': 1573467}