# Zadania
1. Utwórz dwie nowe kolumny w ramce "Countries", w których powierzchnia i gęstość zaludnienia wyrażone sa odpowiednio w milach kwadratowych i osobach na milę kwadratową. 
  - Przelicznik: 1 mila kwadratowa = 0.38610 km kwadratowego (powierzchnię należy pomnożyć, a gęstość zaludnienia podzielić przez ten współczynnik)
  - Typami nowych kolumn mają być odpowiednio integer (powierzchnia) i float (gęstość). Zaokrąglanie do integer: "int(x)"
  - Nazwy nowych kolumn proszę ustawić odpowiednio na '<tt>Area (sq mi)</tt>' oraz '<tt>Pop Density (per sq mi)</tt>'
2. Na wykresie scatterplot narysuj zależność liczby lotnisk od powierzchni kraju. 
  - Dane potrzebne do wykresu są w dwóch ramkach (Countries i Airports), konieczne więc będzie ich złączenie (join).
  - Wskazówka: w pierwszym etapie stwórz ramkę agregującą liczbę lotnisk w zależności od kraju. W drugim etapie złącz (join) tę ramkę z ramką countries. Kolumną wspólną (warunkiem złączenia) będzie Country.
  - Nie zawsze nazwy tych samych krajów będą identyczne w obu ramkach, co może stanowić problem (złączenie nie powstanie). Zastanów się jak przy pomocy mechanizmu złączeń wykryć, które nazwy się różnią. Stwórz odpowiednią ramkę, która zawiera takie nazwy. 
  - Następnie zmień nazwy kilku największych państw w jednej z ramek, tak żeby ujednolicić nazwy. Można to zrobić funkcją replace:
<br><tt>cdf=cdf.replace(['Korea, South', 'Korea, North'], ['South Korea', 'North Korea'], 'Country')</tt>
3. Do ramki "Countries" dodaj nową kolumnę "Continent" (według klasyfikacji Africa, Asia, Europe, North America, South America, Antarctica, Australia/Oceania). 
  - Skorzystaj z kolumny "Region" i pomocniczej funkcji (UDF) mapującej region na kontynent.
4. Oblicz, ile lotnisk jest na poszczególnych kontynentach. To zadanie również wymaga złączenia ramek "Countries" i "Airports". Wynik przedstaw na wykresie słupkowym.


In [402]:
from pyspark.sql import SparkSession

In [403]:
import pandas as pd

In [404]:
import numpy as np

In [405]:
spark = SparkSession.builder.getOrCreate()

In [406]:
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')

In [407]:
# Wbudowane funkcje, które mogą być przydatne:
# - udf: służy do tworzenia funkcji użytkownika
# - trim: usuwa białe spacje z początku i końca stringu
# - isnull: testuje, czy wartość jest pusta
from pyspark.sql.functions import col, udf, trim, isnull
from pyspark.sql.types import FloatType, IntegerType

In [408]:
def to_float (s) :
   return float(s.replace(',','.'))
float_udf = udf(to_float , FloatType())

In [409]:
countries = spark.read.csv("countries of the world.csv", inferSchema=True, header=True)

# Usuwa kropki z wszystkich nazw kolumn (powodują wiele błędów, prawdopodobnie bug w Sparku)
# (Nb. wszystkie inne metody zmiany tych nazw (np. użycie columnRenamed) zawiodły)
new_columns = [s.replace('.','') for s in countries.columns]
countries = countries.toDF(*new_columns) # tutaj '*' to operator "splat" -- robi z tablicy listę argumentów

# Tak można ustawić wyświetlanie większej liczby wierszy w Jupyterze
pd.set_option('display.max_rows', 20)
display(countries.toPandas())

Unnamed: 0,Country,Region,Population,Area (sq mi),Pop Density (per sq mi),Coastline (coast/area ratio),Net migration,Infant mortality (per 1000 births),GDP ($ per capita),Literacy (%),Phones (per 1000),Arable (%),Crops (%),Other (%),Climate,Birthrate,Deathrate,Agriculture,Industry,Service
0,Afghanistan,ASIA (EX. NEAR EAST),31056997,647500,480,000,2306,16307,700.0,360,32,1213,022,8765,1,466,2034,038,024,038
1,Albania,EASTERN EUROPE,3581655,28748,1246,126,-493,2152,4500.0,865,712,2109,442,7449,3,1511,522,0232,0188,0579
2,Algeria,NORTHERN AFRICA,32930091,2381740,138,004,-039,31,6000.0,700,781,322,025,9653,1,1714,461,0101,06,0298
3,American Samoa,OCEANIA,57794,199,2904,5829,-2071,927,8000.0,970,2595,10,15,75,2,2246,327,,,
4,Andorra,WESTERN EUROPE,71201,468,1521,000,66,405,19000.0,1000,4972,222,0,9778,3,871,625,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
222,West Bank,NEAR EAST,2460492,5860,4199,000,298,1962,800.0,,1452,169,1897,6413,3,3167,392,009,028,063
223,Western Sahara,NORTHERN AFRICA,273008,266000,10,042,,,,,,002,0,9998,1,,,,,04
224,Yemen,NEAR EAST,21456188,527970,406,036,0,615,800.0,502,372,278,024,9698,1,4289,83,0135,0472,0393
225,Zambia,SUB-SAHARAN AFRICA,11502010,752614,153,000,0,8829,800.0,806,82,708,003,929,2,41,1993,022,029,0489


In [410]:
# Utworzenie nowej ramki 'cdf':
# - wartosci puste wypelnione "-1", zeby nie usuwac wierszy
# - trim stringów (w nazwach występują niepożądane spacje na końcu)
# - konwersja wszystkich kolumn zmiennoprzecinkowych na float
# - zmiana 'sq mi' na 'sq km', bo w zbiorze wartości są faktycznie w km^2, a nie milach^2
cdf = countries.na.fill("-1").select( 
                 trim(col('Country')).alias('Country'),\
                 trim(col('Region')).alias('Region'),'Population',\
                 col('`Area (sq mi)`').alias('Area (sq km)'),\
                 float_udf('`Pop Density (per sq mi)`').alias('Pop Density (per sq km)'),\
                 float_udf('Coastline (coast/area ratio)').alias('Coastline (coast/area ratio)'),\
                 float_udf('Net migration').alias('Net migration'),\
                 float_udf('Infant mortality (per 1000 births)').alias('Infant mortality (per 1000 births)'),\
                 'GDP ($ per capita)',\
                 float_udf('Literacy (%)').alias('Literacy (%)'),\
                 float_udf('Phones (per 1000)').alias('Phones (per 1000)'),\
                 float_udf('Arable (%)').alias('Arable (%)'),\
                 float_udf('Crops (%)').alias('Crops (%)'),\
                 float_udf('Other (%)').alias('Other (%)'),\
                 float_udf('Birthrate').alias('Birthrate'),\
                 float_udf('Deathrate').alias('Deathrate'),\
                 float_udf('Agriculture').alias('Agriculture'),\
                 float_udf('Industry').alias('Industry'),\
                 float_udf('Service').alias('Service'))

# Zadanie 1: nowe kolumny

In [411]:
cdf.toPandas()

Unnamed: 0,Country,Region,Population,Area (sq km),Pop Density (per sq km),Coastline (coast/area ratio),Net migration,Infant mortality (per 1000 births),GDP ($ per capita),Literacy (%),Phones (per 1000),Arable (%),Crops (%),Other (%),Birthrate,Deathrate,Agriculture,Industry,Service
0,Afghanistan,ASIA (EX. NEAR EAST),31056997,647500,48.000000,0.000000,23.059999,163.070007,700.0,36.000000,3.200000,12.13,0.220000,87.650002,46.599998,20.34,0.380,0.240,0.380
1,Albania,EASTERN EUROPE,3581655,28748,124.599998,1.260000,-4.930000,21.520000,4500.0,86.500000,71.199997,21.09,4.420000,74.489998,15.110000,5.22,0.232,0.188,0.579
2,Algeria,NORTHERN AFRICA,32930091,2381740,13.800000,0.040000,-0.390000,31.000000,6000.0,70.000000,78.099998,3.22,0.250000,96.529999,17.139999,4.61,0.101,0.600,0.298
3,American Samoa,OCEANIA,57794,199,290.399994,58.290001,-20.709999,9.270000,8000.0,97.000000,259.500000,10.00,15.000000,75.000000,22.459999,3.27,-1.000,-1.000,-1.000
4,Andorra,WESTERN EUROPE,71201,468,152.100006,0.000000,6.600000,4.050000,19000.0,100.000000,497.200012,2.22,0.000000,97.779999,8.710000,6.25,-1.000,-1.000,-1.000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
222,West Bank,NEAR EAST,2460492,5860,419.899994,0.000000,2.980000,19.620001,800.0,-1.000000,145.199997,16.90,18.969999,64.129997,31.670000,3.92,0.090,0.280,0.630
223,Western Sahara,NORTHERN AFRICA,273008,266000,1.000000,0.420000,-1.000000,-1.000000,,-1.000000,-1.000000,0.02,0.000000,99.980003,-1.000000,-1.00,-1.000,-1.000,0.400
224,Yemen,NEAR EAST,21456188,527970,40.599998,0.360000,0.000000,61.500000,800.0,50.200001,37.200001,2.78,0.240000,96.980003,42.889999,8.30,0.135,0.472,0.393
225,Zambia,SUB-SAHARAN AFRICA,11502010,752614,15.300000,0.000000,0.000000,88.290001,800.0,80.599998,8.200000,7.08,0.030000,92.900002,41.000000,19.93,0.220,0.290,0.489


In [412]:
cdf = cdf.withColumn('Area (sq mi)', col('Area (sq km)') * 0.38610)\
    .withColumn('Pop Density (per sq mi)', col('Pop Density (per sq km)') / 0.38610)

In [413]:
cdf = cdf.withColumn('Area (sq mi)', cdf['Area (sq mi)'].cast(IntegerType()))

In [414]:
cdf.toPandas()

Unnamed: 0,Country,Region,Population,Area (sq km),Pop Density (per sq km),Coastline (coast/area ratio),Net migration,Infant mortality (per 1000 births),GDP ($ per capita),Literacy (%),...,Arable (%),Crops (%),Other (%),Birthrate,Deathrate,Agriculture,Industry,Service,Area (sq mi),Pop Density (per sq mi)
0,Afghanistan,ASIA (EX. NEAR EAST),31056997,647500,48.000000,0.000000,23.059999,163.070007,700.0,36.000000,...,12.13,0.220000,87.650002,46.599998,20.34,0.380,0.240,0.380,249999,124.320124
1,Albania,EASTERN EUROPE,3581655,28748,124.599998,1.260000,-4.930000,21.520000,4500.0,86.500000,...,21.09,4.420000,74.489998,15.110000,5.22,0.232,0.188,0.579,11099,322.714319
2,Algeria,NORTHERN AFRICA,32930091,2381740,13.800000,0.040000,-0.390000,31.000000,6000.0,70.000000,...,3.22,0.250000,96.529999,17.139999,4.61,0.101,0.600,0.298,919589,35.742036
3,American Samoa,OCEANIA,57794,199,290.399994,58.290001,-20.709999,9.270000,8000.0,97.000000,...,10.00,15.000000,75.000000,22.459999,3.27,-1.000,-1.000,-1.000,76,752.136736
4,Andorra,WESTERN EUROPE,71201,468,152.100006,0.000000,6.600000,4.050000,19000.0,100.000000,...,2.22,0.000000,97.779999,8.710000,6.25,-1.000,-1.000,-1.000,180,393.939410
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
222,West Bank,NEAR EAST,2460492,5860,419.899994,0.000000,2.980000,19.620001,800.0,-1.000000,...,16.90,18.969999,64.129997,31.670000,3.92,0.090,0.280,0.630,2262,1087.542072
223,Western Sahara,NORTHERN AFRICA,273008,266000,1.000000,0.420000,-1.000000,-1.000000,,-1.000000,...,0.02,0.000000,99.980003,-1.000000,-1.00,-1.000,-1.000,0.400,102702,2.590003
224,Yemen,NEAR EAST,21456188,527970,40.599998,0.360000,0.000000,61.500000,800.0,50.200001,...,2.78,0.240000,96.980003,42.889999,8.30,0.135,0.472,0.393,203849,105.154101
225,Zambia,SUB-SAHARAN AFRICA,11502010,752614,15.300000,0.000000,0.000000,88.290001,800.0,80.599998,...,7.08,0.030000,92.900002,41.000000,19.93,0.220,0.290,0.489,290584,39.627040


# Join examples

In [415]:
valuesA = [('Pirate',1),('Monkey',2),('Ninja',3),('Spaghetti',4)]
TableA = spark.createDataFrame(valuesA,['name','id'])
 
valuesB = [('Rutabaga',1),('Pirate',2),('Ninja',3),('Darth Vader',4)]
TableB = spark.createDataFrame(valuesB,['name','id'])

In [416]:
a = TableA.alias('a')
b = TableB.alias('b')

In [417]:
a.join(b, a.name==b.name).show() # how='inner'

+------+---+------+---+
|  name| id|  name| id|
+------+---+------+---+
| Ninja|  3| Ninja|  3|
|Pirate|  1|Pirate|  2|
+------+---+------+---+



In [418]:
a.join(b, a.name==b.name, how='left').show()

+---------+---+------+----+
|     name| id|  name|  id|
+---------+---+------+----+
|Spaghetti|  4|  null|null|
|    Ninja|  3| Ninja|   3|
|   Pirate|  1|Pirate|   2|
|   Monkey|  2|  null|null|
+---------+---+------+----+



In [419]:
a.join(b, a.name==b.name, how='right').show()

+------+----+-----------+---+
|  name|  id|       name| id|
+------+----+-----------+---+
|  null|null|   Rutabaga|  1|
| Ninja|   3|      Ninja|  3|
|Pirate|   1|     Pirate|  2|
|  null|null|Darth Vader|  4|
+------+----+-----------+---+



In [420]:
a.join(b, a.name==b.name, how='full_outer').show()

+---------+----+-----------+----+
|     name|  id|       name|  id|
+---------+----+-----------+----+
|     null|null|   Rutabaga|   1|
|Spaghetti|   4|       null|null|
|    Ninja|   3|      Ninja|   3|
|   Pirate|   1|     Pirate|   2|
|   Monkey|   2|       null|null|
|     null|null|Darth Vader|   4|
+---------+----+-----------+----+



# Zadanie 2: zależność liczby lotnisk od powierzchni kraju 

In [421]:
airports = spark.read.csv("airports2.csv", inferSchema=True, header=False)

In [422]:
airports = airports.toDF('Airport ID', 'Name', 'City', 'Country', 
'IATA', 'ICAO', 'Latitude', 'Longitude', 'Altitude', 'Timezone',
'DST', 'Tz database time zone', 'Type', 'Source'
)

In [423]:
airports.toPandas()

Unnamed: 0,Airport ID,Name,City,Country,IATA,ICAO,Latitude,Longitude,Altitude,Timezone,DST,Tz database time zone,Type,Source
0,1,Goroka Airport,Goroka,Papua New Guinea,GKA,AYGA,-6.081690,145.391998,5282,10,U,Pacific/Port_Moresby,airport,OurAirports
1,2,Madang Airport,Madang,Papua New Guinea,MAG,AYMD,-5.207080,145.789001,20,10,U,Pacific/Port_Moresby,airport,OurAirports
2,3,Mount Hagen Kagamuga Airport,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.826790,144.296005,5388,10,U,Pacific/Port_Moresby,airport,OurAirports
3,4,Nadzab Airport,Nadzab,Papua New Guinea,LAE,AYNZ,-6.569803,146.725977,239,10,U,Pacific/Port_Moresby,airport,OurAirports
4,5,Port Moresby Jacksons International Airport,Port Moresby,Papua New Guinea,POM,AYPY,-9.443380,147.220001,146,10,U,Pacific/Port_Moresby,airport,OurAirports
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7179,12053,Rugao Air Base,Rugao,China,RUG,ZSRG,32.257885,120.501656,0,\N,\N,\N,airport,OurAirports
7180,12054,Wuhu Air Base,Wuhu,China,WHU,ZSWU,31.390600,118.408997,0,\N,\N,\N,airport,OurAirports
7181,12055,Shanshan Airport,Shanshan,China,SXJ,ZWSS,42.911701,90.247498,0,\N,\N,\N,airport,OurAirports
7182,12056,Yingkou Lanqi Airport,Yingkou,China,YKH,ZYYK,40.542524,122.358600,0,\N,\N,\N,airport,OurAirports


In [424]:
# TODO
# zacznij od stworzenia ramki, która zawiera posortowaną liczbę lotnisk w poszczególnych krajach

In [425]:
import pyspark.sql.functions as func

In [426]:
sumAirports = airports \
    .groupBy("Country") \
    .agg(func.count("Name").alias("Airports sum")) \
    .select("Country", "Airports sum") \
    .sort("Airports sum") \
    .withColumn("Country", trim(col("Country")))

In [427]:
countries = countries.replace(['Korea, South', 'Korea, North', 'South Korea', 'North Korea'], 'Country')

In [428]:
countries = countries \
    .withColumn("Country", trim(col("Country")))

In [429]:
sumAirports.toPandas()

Unnamed: 0,Country,Airports sum
0,Anguilla,1
1,British Indian Ocean Territory,1
2,Jersey,1
3,Norfolk Island,1
4,Saint Helena,1
...,...,...
232,Russia,238
233,Germany,241
234,Australia,296
235,Canada,417


In [430]:
# TODO złączenie ramek i wykres

In [431]:
a = countries.alias('a')
b = sumAirports.alias('b')

In [439]:
joined = b.join(a, on=['Country'], how='inner')

In [440]:
joined.toPandas()

Unnamed: 0,Country,Airports sum,Region,Population,Area (sq mi),Pop Density (per sq mi),Coastline (coast/area ratio),Net migration,Infant mortality (per 1000 births),GDP ($ per capita),...,Phones (per 1000),Arable (%),Crops (%),Other (%),Climate,Birthrate,Deathrate,Agriculture,Industry,Service
0,Chad,6,SUB-SAHARAN AFRICA,9944201,1284000,77,000,-011,9382,1200.0,...,13,286,002,9712,2,4573,1638,0335,0259,0406
1,Paraguay,8,LATIN AMER. & CARIB,6506464,406750,160,000,-008,2563,4700.0,...,492,76,023,9217,2,291,449,0224,0207,0569
2,Anguilla,1,LATIN AMER. & CARIB,13477,102,1321,5980,1076,2103,8600.0,...,4600,0,0,100,2,1417,534,004,018,078
3,Russia,238,C.W. OF IND. STATES,142893540,17075200,84,022,102,1539,8900.0,...,2806,733,011,9256,,995,1465,0054,0371,0575
4,Yemen,11,NEAR EAST,21456188,527970,406,036,0,615,800.0,...,372,278,024,9698,1,4289,83,0135,0472,0393
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
200,United Kingdom,162,WESTERN EUROPE,60609153,244820,2476,508,219,516,27700.0,...,5435,2346,021,7633,3,1071,1013,0005,0237,0758
201,Moldova,5,C.W. OF IND. STATES,4466706,33843,1320,000,-026,4042,1800.0,...,2081,553,1079,3391,,157,1264,0213,0233,0555
202,Vietnam,24,ASIA (EX. NEAR EAST),84402966,329560,2561,105,-045,2595,2500.0,...,1877,1997,595,7408,2,1686,622,0209,041,0381
203,Netherlands,26,WESTERN EUROPE,16491461,41526,3971,109,291,504,28600.0,...,4608,2671,097,7232,3,109,868,0021,0244,0736


In [446]:
joined_anti = a.join(b, on=['Country'], how='leftanti')

In [447]:
joined_anti.toPandas()

Unnamed: 0,Country,Region,Population,Area (sq mi),Pop Density (per sq mi),Coastline (coast/area ratio),Net migration,Infant mortality (per 1000 births),GDP ($ per capita),Literacy (%),Phones (per 1000),Arable (%),Crops (%),Other (%),Climate,Birthrate,Deathrate,Agriculture,Industry,Service
0,Andorra,WESTERN EUROPE,71201,468,1521,000,66,405,19000,1000,4972,222,0,9778,3,871,625,,,
1,Antigua & Barbuda,LATIN AMER. & CARIB,69108,443,1560,3454,-615,1946,11000,890,5499,1818,455,7727,2,1693,537,0038,022,0743
2,"Bahamas, The",LATIN AMER. & CARIB,303770,13940,218,2541,-22,2521,16700,956,4606,08,04,988,2,1757,905,003,007,09
3,Bosnia & Herzegovina,EASTERN EUROPE,4498976,51129,880,004,031,2105,6100,,2154,136,296,8344,4,877,827,0142,0308,055
4,British Virgin Is.,LATIN AMER. & CARIB,23098,153,1510,5229,1001,1805,16000,978,5065,20,667,7333,2,1489,442,0018,0062,092
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
17,St Pierre & Miquelon,NORTHERN AMERICA,7026,242,290,4959,-486,754,6900,990,6832,1304,0,8696,,1352,683,,,
18,San Marino,WESTERN EUROPE,29251,61,4795,000,1098,573,34600,960,7043,1667,0,8333,,1002,817,,,
19,Sao Tome & Principe,SUB-SAHARAN AFRICA,193413,1001,1932,2088,-272,4311,1200,793,362,625,4896,4479,2,4025,647,0167,0148,0684
20,Trinidad & Tobago,LATIN AMER. & CARIB,1065842,5128,2079,706,-1083,2431,9500,986,3035,1462,916,7622,2,129,1057,0007,057,0423


In [438]:
sumAirports.toPandas().to_csv(r'sumAirports.csv', index = False)
joined.toPandas().to_csv(r'joined.csv', index = False)



In [434]:
# TODO wykrycie niepasujących nazw państwa i ujednolicenie wartości 

# Zadanie 3

In [435]:
# TODO

# Zadanie 4

In [436]:
# TODO