# Sprawdzenie kernela 

Na początek sprawdź czy silnik wykonawczy Twojego notatnika to PySpark. 
Mógłby on być po prostu interpreterem Pythona, jednak wówczas zmienne kontekstu musielibyśmy tworzyć samodzielnie.

Sprawdź, czy obiekt kontekstu jest dostępny. W przypadku *Spark SQL* jest to `SparkSession`

In [8]:
spark

Dzięki powyższej informacji dowiedzieliśmy się nie tylko w jakim trybie został uruchomiony Spark obsługujący nasze polecenia, w jakiej jest wersji, ale także czy obsługuje funkcjonalność platformy Hive.

Dowiedz się także pod jakim użytkownikiem działamy w ramach tego notatnika.

In [9]:
%%sh 
whoami

root


Czas na nasze właściwe zadania. 

W razie potrzeby korzystaj z https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html

# 20 Years of Games

**7. Zaczytaj do zmiennej gameInfosDF zawartość pliku ign.csv**

In [10]:
username = "helmas_bigdata" # UWAGA! ustaw zmienną username na poprawną wartość

gameInfosDF=spark.read.\
    option("inferSchema", "true").\
    csv(f"/user/{username}/ign.csv", header=True).cache()

                                                                                

**8. Wyświetl schemat zmiennej gameInfosDF**

In [11]:
gameInfosDF.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- score_phrase: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- score: double (nullable = true)
 |-- genre: string (nullable = true)
 |-- editors_choice: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- release_month: integer (nullable = true)
 |-- release_day: integer (nullable = true)



Możesz także po prostu przyglądnąć się jej kolumnom

In [12]:
gameInfosDF.columns

['_c0',
 'score_phrase',
 'title',
 'url',
 'platform',
 'score',
 'genre',
 'editors_choice',
 'release_year',
 'release_month',
 'release_day']

Zobaczmy też trzy pierwsze wiersze. Zróbmy to na kilka sposobów. 

* Na początek metoda `show()`

In [13]:
gameInfosDF.limit(3).show()

[Stage 2:>                                                          (0 + 1) / 1]

+---+------------+--------------------+--------------------+----------------+-----+----------+--------------+------------+-------------+-----------+
|_c0|score_phrase|               title|                 url|        platform|score|     genre|editors_choice|release_year|release_month|release_day|
+---+------------+--------------------+--------------------+----------------+-----+----------+--------------+------------+-------------+-----------+
|  0|     Amazing|LittleBigPlanet P...|/games/littlebigp...|PlayStation Vita|  9.0|Platformer|             Y|        2012|            9|         12|
|  1|     Amazing|LittleBigPlanet P...|/games/littlebigp...|PlayStation Vita|  9.0|Platformer|             Y|        2012|            9|         12|
|  2|       Great|Splice: Tree of Life|/games/splice/ipa...|            iPad|  8.5|    Puzzle|             N|        2012|            9|         12|
+---+------------+--------------------+--------------------+----------------+-----+----------+------------

                                                                                

Przetwarzane dane mogą być duże. Wyniki natomiast z reguły są znacznie mniejsze, to pozwala nam je (o ile znamy ich wielkość) przekonwertować do obiektów `pandas DataFrame` i dzięki temu przedstawić w przyjaźniejszej postaci.
* metoda `toPandas()`

In [14]:
gameInfosDF.limit(3).toPandas()

Unnamed: 0,_c0,score_phrase,title,url,platform,score,genre,editors_choice,release_year,release_month,release_day
0,0,Amazing,LittleBigPlanet PS Vita,/games/littlebigplanet-vita/vita-98907,PlayStation Vita,9.0,Platformer,Y,2012,9,12
1,1,Amazing,LittleBigPlanet PS Vita -- Marvel Super Hero E...,/games/littlebigplanet-ps-vita-marvel-super-he...,PlayStation Vita,9.0,Platformer,Y,2012,9,12
2,2,Great,Splice: Tree of Life,/games/splice/ipad-141070,iPad,8.5,Puzzle,N,2012,9,12


Za pomocą parametru konfiguracyjnego `spark.sql.repl.eagerEval.enabled` naszego kontekstu, również możemy 
ułatwić sobie wgląd w zawartość naszych wyników. Warto także ustawić parametr aby kontrolować liczbę pobieranych w ten sposób wierszy (tak, w razie niedoszacowania wyniku)
* parametr `spark.sql.repl.eagerEval.enabled`

In [15]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 3)
gameInfosDF

_c0,score_phrase,title,url,platform,score,genre,editors_choice,release_year,release_month,release_day
0,Amazing,LittleBigPlanet P...,/games/littlebigp...,PlayStation Vita,9.0,Platformer,Y,2012,9,12
1,Amazing,LittleBigPlanet P...,/games/littlebigp...,PlayStation Vita,9.0,Platformer,Y,2012,9,12
2,Great,Splice: Tree of Life,/games/splice/ipa...,iPad,8.5,Puzzle,N,2012,9,12


Wykorzystuj powyższe, aby móc podglądać uzyskiwane wyniki

 
**9. Na początek coś prostego. 
Wyświetl trzy najlepiej ocenione gry wydane w roku 2016 na platformę PC.**

In [16]:
from pyspark.sql.functions import col, lit
# tu wprowadź swoje rozwiazanie
gameInfosDF.filter(col("platform") == "PC").filter(col("release_year") == "2016").orderBy(col("score").desc()).limit(3).select("title","score")

                                                                                

title,score
Undertale,10.0
The Witness,10.0
Inside,10.0


**10. Określ dla każdej oceny opisowej (score_phrase) minimalną i 
maksymalną ocenę liczbową. Wyniki posortuj
rosnąco pod względem minimalnej oceny liczbowej.**

In [17]:
from pyspark.sql.functions import *
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 20)
# tu wprowadź swoje rozwiazanie
gameInfosDF.groupBy("score_phrase").agg(max(col("score")).alias("max"),min(col("score")).alias("min")).select("score_phrase","max","min").orderBy("min")


                                                                                

score_phrase,max,min
Disaster,0.8,0.5
Unbearable,1.9,1.0
Painful,2.9,2.0
Awful,3.9,3.0
Bad,4.9,4.0
Mediocre,5.9,5.0
Okay,6.9,6.0
Good,7.9,7.0
Great,8.9,8.0
Amazing,9.9,9.0


**11. To może coś trudniejszego. Wyznacz liczbę oraz średnią ocenę gier wydawanych w poszczególnych latach
począwszy od roku 2000 na poszczególne platformy. Nie analizuj wszystkich platform – ogranicz je tylko do
tych, dla których liczba wszystkich recenzji gier biorąc pod uwagę wszystkie lata przekroczyła 500.**

*Uwaga: Klasycznie odwołalibyśmy się do źródłowego zboru danych dwa razy. Raz aby wyznaczyć popularne platformy, a następnie aby wyznaczyć ostateczny wynik. 
Korzystając z funkcji analitycznych możesz to zadanie rozwiązać sięgając do źródłowych danych tylko raz.*

**Rozwiąż to zadanie na dwa sposoby:**

a. Za pomocą DataFrame API


In [18]:
from pyspark.sql.window import Window
# tu wprowadź swoje rozwiazanie

platformsOver500 = gameInfosDF.groupBy(expr("platform as platformName")).agg(count(gameInfosDF._c0).alias("how_many")).select("platformName", "how_many").filter(col("how_many") > 500)

platformsOver500.join(gameInfosDF, platformsOver500["platformName"] == gameInfosDF.platform).groupBy("release_year", "platform").agg(avg(col("score")).alias("avg"), count("_c0").alias("count")).select("platform","release_year","avg","count").filter(col("release_year") > 1999).orderBy(col("release_year"), col("platform"))

                                                                                

platform,release_year,avg,count
PC,2000,7.0278195488721815,266
PlayStation,2000,6.438306451612904,248
PlayStation 2,2000,7.2444444444444445,45
Game Boy Advance,2001,6.803947368421052,76
GameCube,2001,8.152941176470588,17
PC,2001,7.092444444444446,225
PlayStation,2001,6.268750000000001,48
PlayStation 2,2001,7.19748427672956,159
Xbox,2001,7.78787878787879,33
Game Boy Advance,2002,6.693023255813954,172


b. Za pomocą SQL (po zarejestrowaniu źródeł danych jako tymczasowych perspektyw).

In [19]:
gameInfosDF.createOrReplaceTempView("gameinfos")
# tu wprowadź swoje rozwiazanie
spark.sql("""SELECT gi1.release_year, gi1.platform, count(*), avg(gi1.score) FROM gameInfos gi1 JOIN (SELECT platform, count(*) c FROM gameInfos GROUP BY platform HAVING c >= 500) gi2 ON (gi1.platform = gi2.platform) 
 GROUP BY gi1.release_year, gi1.platform HAVING gi1.release_year >= 2000  ORDER BY gi1.release_year, gi1.platform """)

release_year,platform,count(1),avg(score)
2000,PC,266,7.0278195488721815
2000,PlayStation,248,6.438306451612904
2000,PlayStation 2,45,7.2444444444444445
2001,Game Boy Advance,76,6.803947368421052
2001,GameCube,17,8.152941176470588
2001,PC,225,7.092444444444446
2001,PlayStation,48,6.268750000000001
2001,PlayStation 2,159,7.19748427672956
2001,Xbox,33,7.78787878787879
2002,Game Boy Advance,172,6.693023255813954


**12. Jeśli masz swoją ulubioną serię gier (https://pl.wikipedia.org/wiki/Kategoria:Serie_gier_komputerowych)
zobacz jakie średnie oceny zdobyły poszczególne pozycje z tej serii. Wyniki posortuj chronologicznie.**

In [20]:
# tu wprowadź swoje rozwiazanie


**13. (opcjonalne) Porównaj ze sobą gry wchodzące w skład wybranych serii gier wchodzących w skład 20
najlepszych serii wg Guinessa (lista z 2010 roku). W związku z tym, że gry nie są wydawane co roku, pogrupuj
dane w przedziały o długości 5 lat.**

In [21]:
# tu wprowadź swoje rozwiazanie


In [22]:
# brudnopis

 
# MondialDB – DataFrames

**14. Na początku do zmiennych `citiesDF`, `countriesDF` załaduj odpowiednio dane z plików
`mondial.cities.json`, `mondial.countries.json`**

In [23]:
citiesDF = spark.read.json(f"/user/{username}/mondial.cities.json").cache()
countriesDF = spark.read.json(f"/user/{username}/mondial.countries.json").cache()

                                                                                

**15. Zapoznaj się z ich strukturą. Zwróć uwagę na występujące typy array.**

In [24]:
citiesDF.printSchema()
countriesDF.printSchema()

root
 |-- _id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- elevation: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- other_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- population: long (nullable = true)
 |-- province: string (nullable = true)

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- area: double (nullable = true)
 |-- capital: string (nullable = true)
 |-- code: string (nullable = true)
 |-- gdp: double (nullable = true)
 |-- government: string (nullable = true)
 |-- independence: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- inflation: double (nullable = true)
 |-- name: stri

**16. Zanim zaczniesz realizować zadania, zapoznaj się ze funkcją `explode`, która nadaje się świetnie do pracy z tablicami i ich rozpłaszczania.**

**Przykładowe zapytanie:**

In [32]:
countriesDF.where("name = 'Poland'").\
select(col("population"))

population
"[{23929757, 1946}..."


In [39]:
countries_last_year = countriesDF.\
    select(col("name"), explode_outer(countriesDF.population).alias("pop_year")).\
    select(col("name").alias("tmp_name"), col("pop_year.value").alias("pop"), col("pop_year.year").alias("year")).\
    filter(col("year")<=2009).groupBy(col("tmp_name")).agg(max(col("year")).alias("max_year"))



countriesDF.\
            join(countries_last_year, countriesDF.name==countries_last_year["tmp_name"]).\
            select("name","max_year", explode_outer(countriesDF.population).alias("pop_year")).\
            select("name","max_year", col("pop_year.value").alias("pop"), col("pop_year.year").alias("year")).\
            filter(col("year") == col("max_year")).\
            groupBy(lit("1")).agg(sum(col("pop"))).\
            show()
# countriesDF.columns

+----+---------+
|year| sum(pop)|
+----+---------+
|2009|316746100|
+----+---------+

+--------------------+--------+----+
|                name|     pop|year|
+--------------------+--------+----+
|             Belarus| 9503807|2009|
|              Poland|38153389|2009|
|             Romania|21498616|2009|
|       Faroe Islands|   48668|2009|
|         Switzerland| 7785806|2009|
|              Sweden| 9340682|2009|
|         Afghanistan|24485600|2009|
|          Azerbaijan| 8922447|2009|
|             Vietnam|85846997|2009|
|          Kazakhstan|16009597|2009|
|          Kyrgyzstan| 5107640|2009|
|              Israel| 7552000|2009|
|United Arab Emirates| 5066000|2009|
|               Haiti| 9923243|2009|
|       New Caledonia|  245580|2009|
|     Solomon Islands|  515870|2009|
|             Vanuatu|  234023|2009|
|                Mali|14517176|2009|
|                Chad|11039873|2009|
|            Djibouti|  818159|2009|
+--------------------+--------+----+
only showing top 20 rows



Zwróć uwagę także na inne funkcje z tej rodziny jak: `explode_outer`, `posexplode`, `posexplode_outer`
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html


Wszystkie zadania wykonaj korzystając *DataFrame API*. Nie korzystaj z SQL.

**17. Oblicz sumę ludności wszystkich Państw na rok 2010. 
W sytuacji gdy w danym kraju nie przeprowadzono
badania w roku 2010 wykorzystaj najnowsze z badań wcześniejszych.**

In [25]:
from pyspark.sql.window import Window
# tu wprowadź swoje rozwiazanie
# countriesDF.agg(count(countriesDF.population).alias("how_many")).select("population", "how_many")
countriesDF.groupBy("year").agg(sum(col("population")).alias("pop")).filter(col("year") == 2009).select("pop")

AnalysisException: Column 'year' does not exist. Did you mean one of the following? [area, _id, code, gdp, name, capital, inflation, government, population, unemployment, independence];
'Aggregate ['year], ['year, sum(population#4927) AS pop#5426]
+- Relation [_id#4918,area#4919,capital#4920,code#4921,gdp#4922,government#4923,independence#4924,inflation#4925,name#4926,population#4927,unemployment#4928] json


**18. Było ciężko? Nie wierzę.**

**Teraz już będzie z górki. Podaj nazwy i gęstość zaludnienia trzech krajów o największej gęstości zaludnienia w roku 2010.**

In [41]:
# tu wprowadź swoje rozwiazanie
countries_last_year = countriesDF.\
    select(col("name"), explode_outer(countriesDF.population).alias("pop_year")).\
    select(col("name").alias("tmp_name"), col("pop_year.value").alias("pop"), col("pop_year.year").alias("year")).\
    filter(col("year")<=2010).groupBy(col("tmp_name")).agg(max(col("year")).alias("max_year"))



countriesDF.\
            join(countries_last_year, countriesDF.name==countries_last_year["tmp_name"]).\
            select("name","max_year","area", explode_outer(countriesDF.population).alias("pop_year")).\
            select("*", col("pop_year.value").alias("pop"), col("pop_year.year").alias("year")).\
            filter(col("year") == col("max_year")).\
            select("*", (col("pop")/col("area")).alias("density")).\
            orderBy(col("density").desc()).\
            limit(6).show()


+---------+-----+---------------+-------+----+------------------+
|     name| area|       pop_year|    pop|year|           density|
+---------+-----+---------------+-------+----+------------------+
|   Monaco|  1.9|  {36845, 2010}|  36845|2010|19392.105263157897|
|Singapore|632.6|{5076700, 2010}|5076700|2010| 8025.134366108125|
|  Bahrain|620.0|{1234596, 2010}|1234596|2010| 1991.283870967742|
|  Bermuda| 53.3|  {64237, 2010}|  64237|2010|1205.1969981238274|
+---------+-----+---------------+-------+----+------------------+




**19. Podaj trzy kraje o największym procencie ludności żyjącym w miastach powyżej 50 000 mieszkańców w roku
2010.**

In [53]:
# tu wprowadź swoje rozwiazanie
window = Window.partitionBy("name").orderBy(col("year").desc())

population_2010 = countriesDF.\
    select("name", "code", explode_outer(countriesDF.population).alias("pop_year")).\
    select("*", col("pop_year.value").alias("pop"), col("pop_year.year").alias("year")).\
    filter(col("year")<=2010).withColumn("number", row_number().over(window)).where(col("number") == 1)

cities_over_50k = citiesDF.\
                            select("country", "population").where(col("population")>50000).\
                            groupBy("country").agg(sum("population").alias("pop_city"))

cities_over_50k.join(population_2010, cities_over_50k["country"]==population_2010["code"]).\
select(col("name"), (col("pop_city")/col("pop")*100).alias("percent")).orderBy(col("percent").desc()).show()

+------------+------------------+
|        name|           percent|
+------------+------------------+
|   Singapore|             100.0|
|     Bahamas| 70.39229990555847|
| South Korea| 67.05403609999775|
|      Panama|  64.6663219618928|
|       Congo| 59.10584429977178|
|    Djibouti| 56.99058553827413|
|      Russia|50.358822224276814|
|     Bolivia| 49.29070208528456|
|      Brazil| 49.11962550059719|
|    Kiribati|48.692969007743216|
|       Gabon| 48.04886449362623|
|       Libya|44.035273247147806|
|      Mexico| 41.94759678280276|
|    Maldives| 40.84171031704606|
|   Argentina| 38.26722153567646|
|Saudi Arabia|38.203304664333096|
|       Japan| 37.53685926599513|
|     Estonia| 37.50569908672834|
| North Korea| 37.09013114823686|
|       Syria| 36.25182593693089|
+------------+------------------+
only showing top 20 rows



No cóż, dane dotyczące ludności w miastach są zapewne nowsze niż z 2010 roku.
Na marginesie, zarówno Melilla jak i Ceuta to hiszpańskie miasta, afrykańskie eksklawy położone na terytorium
Maroka. Oba liczą ponad 70 tyś mieszkańców i oba posiadają autonomię (uzyskaną jednocześnie w marcu 1995 roku)
dlatego znalazły się w naszym zestawieniu.
A co to takiego eksklawy i czy enklawa jest tym samym, to już możesz przeczytać samodzielnie np. tu:
https://pl.wikipedia.org/wiki/Eksklawa

In [42]:
# brudnopis
citiesDF.printSchema()
countriesDF.printSchema()

root
 |-- _id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- elevation: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- other_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- population: long (nullable = true)
 |-- province: string (nullable = true)

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- area: double (nullable = true)
 |-- capital: string (nullable = true)
 |-- code: string (nullable = true)
 |-- gdp: double (nullable = true)
 |-- government: string (nullable = true)
 |-- independence: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- inflation: double (nullable = true)
 |-- name: stri