# Część 2

## 2.1

In [226]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, expr, concat, lit, sum, to_date, min, max, avg, count, month, year

spark=SparkSession.builder.appName("BigDataEx5").getOrCreate()

main_df = spark.read.csv("final_data.csv", header=True, inferSchema=True)
main_df = main_df.withColumn("date", to_date(col("date"), "dd/MM/yyyy"))
main_df.show(5)
main_df.printSchema()


                                                                                

+------------+-------------+----------------+------------------+------------+----------+---------------------+------------------+----------------------+----------------------------+----------------------+------------+--------------+-----------------+--------------------+--------------------------+----+--------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+-----------------+-----------+---------------------------+---------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------

## 2.2 i 2.3


In [227]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Ręcznie definiujesz schemat tylko dla wybranych kolumn
schema_manual = StructType([
    StructField("Country_name", StringType(), True),
    StructField("3_letter_code", StringType(), True),
    StructField("population_urban", IntegerType(), True),
    StructField("gdp_per_capita_usd", DoubleType(), True),
    StructField("date", StringType(), True),
    StructField("Total_confirmed_cases", IntegerType(), True),
    StructField("Total_new_deceased", IntegerType(), True),
    StructField("Total_new_vaccinations", IntegerType(), True),
    StructField("Area (km²)", DoubleType(), True),
    StructField("Density (per km²)", DoubleType(), True),
    StructField("2022 Population", IntegerType(), True)
])

wanted_cols = [field.name for field in schema_manual.fields]

df_selected = main_df.select(wanted_cols)

# To nie działa bo spark idzie po kolei od lewej do prawej i bierze pierwsze 
#df_selected = spark.read.csv("final_data.csv", header=True, schema=schema_manual)

df_selected.printSchema()
df_selected.show(5)


root
 |-- Country_name: string (nullable = true)
 |-- 3_letter_code: string (nullable = true)
 |-- population_urban: integer (nullable = true)
 |-- gdp_per_capita_usd: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- Total_confirmed_cases: integer (nullable = true)
 |-- Total_new_deceased: integer (nullable = true)
 |-- Total_new_vaccinations: integer (nullable = true)
 |-- Area (km²): integer (nullable = true)
 |-- Density (per km²): double (nullable = true)
 |-- 2022 Population: integer (nullable = true)

+------------+-------------+----------------+------------------+----------+---------------------+------------------+----------------------+----------+-----------------+---------------+
|Country_name|3_letter_code|population_urban|gdp_per_capita_usd|      date|Total_confirmed_cases|Total_new_deceased|Total_new_vaccinations|Area (km²)|Density (per km²)|2022 Population|
+------------+-------------+----------------+------------------+----------+---------------------+----

## 2.4

In [228]:

df = df_selected.withColumn(
    "urban_population_ratio",
    expr("try_divide(population_urban, `2022 Population`)")
)
df.show(5)


+------------+-------------+----------------+------------------+----------+---------------------+------------------+----------------------+----------+-----------------+---------------+----------------------+
|Country_name|3_letter_code|population_urban|gdp_per_capita_usd|      date|Total_confirmed_cases|Total_new_deceased|Total_new_vaccinations|Area (km²)|Density (per km²)|2022 Population|urban_population_ratio|
+------------+-------------+----------------+------------------+----------+---------------------+------------------+----------------------+----------+-----------------+---------------+----------------------+
|     Finland|          FIN|         4716888|             50150|2021-01-17|                  236|                 1|                 62050|    338424|          16.3722|        5540745|    0.8513093455843934|
|     Finland|          FIN|         4716888|             50150|2021-01-24|                  246|                 5|                 53248|    338424|          16.3722|

## 2.5

In [229]:
df_with_concat = df_selected.withColumn(
    "country_info",
    concat(
        col("Country_name"),
        lit(" ("),
        col("3_letter_code"),
        lit(") - "),
        col("date")
    )
)

df_with_concat.show(5, truncate=False)

+------------+-------------+----------------+------------------+----------+---------------------+------------------+----------------------+----------+-----------------+---------------+--------------------------+
|Country_name|3_letter_code|population_urban|gdp_per_capita_usd|date      |Total_confirmed_cases|Total_new_deceased|Total_new_vaccinations|Area (km²)|Density (per km²)|2022 Population|country_info              |
+------------+-------------+----------------+------------------+----------+---------------------+------------------+----------------------+----------+-----------------+---------------+--------------------------+
|Finland     |FIN          |4716888         |50150             |2021-01-17|236                  |1                 |62050                 |338424    |16.3722          |5540745        |Finland (FIN) - 2021-01-17|
|Finland     |FIN          |4716888         |50150             |2021-01-24|246                  |5                 |53248                 |338424    |16

## 2.6

In [230]:
# 1. Kraje z gdp_per_capita_usd większym niż 100000
df1 = main_df.filter(col("gdp_per_capita_usd") > 100000).select("Country_name", "gdp_per_capita_usd").distinct()
df1.show(5)
df1.write.csv("wyniki_2_6/high_gdp_countries", header=True, mode="overwrite")

+-------------+------------------+
| Country_name|gdp_per_capita_usd|
+-------------+------------------+
|   Luxembourg|            116348|
|Liechtenstein|            173356|
+-------------+------------------+



In [231]:
# 2. Kraje z populacją miejską większą niż 50 milionów
df2 = main_df.filter(col("population_urban") > 50000000).select("Country_name", "population_urban").distinct()
df2.show(5)
df2.write.csv("wyniki_2_6/large_urban_population", header=True, mode="overwrite")


+--------------+----------------+
|  Country_name|population_urban|
+--------------+----------------+
|        Mexico|       102626859|
|United Kingdom|        55908316|
|    Bangladesh|        60987417|
|        France|        54123364|
|   Philippines|        50975903|
+--------------+----------------+
only showing top 5 rows


In [232]:
# 3. Kraje z gęstością zaludnienia większą niż 1000 osób/km²
df3 = main_df.filter(col("Density (per km²)") > 1000).select("Country_name", "Density (per km²)").distinct()
df3.show(5)
df3.write.csv("wyniki_2_6/high_density_countries", header=True, mode="overwrite")

+------------+-----------------+
|Country_name|Density (per km²)|
+------------+-----------------+
|  Bangladesh|         1160.035|
|     Bahrain|        1924.4876|
|   Singapore|        8416.4634|
|     Bermuda|        1188.5926|
|    Maldives|        1745.9567|
+------------+-----------------+
only showing top 5 rows


In [233]:
# 4. Kraje, gdzie liczba nowych zgonów COVID jest większa niż 1000 w danym dniu
df4 = main_df.filter(col("Total_new_deceased") > 1000).select("Country_name", "date", "Total_new_deceased").distinct()
df4.show(5)
df4.write.csv("wyniki_2_6/high_covid_deaths", header=True, mode="overwrite")

+--------------+----------+------------------+
|  Country_name|      date|Total_new_deceased|
+--------------+----------+------------------+
|        Mexico|2021-01-08|              1197|
|        Mexico|2021-01-16|              1294|
|United Kingdom|2021-01-13|              1219|
|        Mexico|2021-01-17|              1402|
|        Mexico|2021-01-25|              1478|
+--------------+----------+------------------+
only showing top 5 rows


In [234]:
# 6. Kraje, gdzie łączna liczba nowych szczepień przekracza 100 000 000
df6 = main_df.groupBy("Country_name") \
    .agg(sum("Total_new_vaccinations").alias("total_vaccinations")) \
    .filter(col("total_vaccinations") > 100000000)

df6.show(5)
df6.write.csv("wyniki_2_6/high_total_vaccinations", header=True, mode="overwrite")

+--------------------+------------------+
|        Country_name|total_vaccinations|
+--------------------+------------------+
|          Bangladesh|         127358755|
|           Indonesia|         188810480|
|              Brazil|         141361044|
|               India|         980066849|
|United States of ...|         259495882|
+--------------------+------------------+
only showing top 5 rows


In [235]:
# 7. Dni, w których w Finlandi było powyżej 20 zgonów
df7 = main_df.filter(col("Total_new_deceased") > 20).filter(col("Country_name") == "Finland").select("Country_name", "date", "Total_new_deceased").orderBy(col("Total_new_deceased").desc())
df7.show(5)
df7.write.csv("wyniki_2_6/date_when_deceased_gt_500_Finland", header=True, mode="overwrite", sep=",")

+------------+----------+------------------+
|Country_name|      date|Total_new_deceased|
+------------+----------+------------------+
|     Finland|2022-03-23|                50|
|     Finland|2022-03-07|                40|
|     Finland|2022-03-06|                35|
|     Finland|2022-04-05|                34|
|     Finland|2022-03-08|                30|
+------------+----------+------------------+
only showing top 5 rows


In [236]:
# 8. Kiedy i w jakich krajach osoby były w pełni zaszczepione
df8 = main_df.filter(col("New_persons_fully_vaccinated") > 0).groupBy("Country_name").agg(min("date").alias("New_persons_fully_vaccinated")).orderBy("New_persons_fully_vaccinated")
df8.show(5)
df8.write.csv("wyniki_2_6/countries_and_date_fully_vaccinated", header=True, mode="overwrite", sep=",")

+--------------------+----------------------------+
|        Country_name|New_persons_fully_vaccinated|
+--------------------+----------------------------+
|United States of ...|                  2020-12-14|
|         Switzerland|                  2020-12-21|
|              France|                  2020-12-27|
|             Estonia|                  2020-12-28|
|      Czech Republic|                  2020-12-29|
+--------------------+----------------------------+
only showing top 5 rows


In [237]:
# 9. Pierwsze szczepienia kiedy i gdzie
df9 = main_df.filter(col("Total_new_vaccinations") > 0).select("Country_name", "date", "Total_new_vaccinations").orderBy("date")
df9.show(5)
df9.write.csv("wyniki_2_6/countries_and_date_first_vaccinated", header=True, mode="overwrite", sep=",")

+------------+----------+----------------------+
|Country_name|      date|Total_new_vaccinations|
+------------+----------+----------------------+
|      Latvia|2020-12-07|                     1|
|      Norway|2020-12-08|                     5|
|      Latvia|2020-12-09|                     1|
|      Norway|2020-12-09|                     1|
|      Norway|2020-12-10|                     1|
+------------+----------+----------------------+
only showing top 5 rows


In [238]:
# 10. Kraje z PKB per capita między 20000 a 50000 USD
df10 = main_df.filter((col("gdp_per_capita_usd") >= 20000) & (col("gdp_per_capita_usd") <= 50000)) \
    .select("Country_name", "gdp_per_capita_usd").distinct()
df10.show(5)
df10.write.csv("wyniki_2_6/gdp_midrange", header=True, mode="overwrite")

+--------------+------------------+
|  Country_name|gdp_per_capita_usd|
+--------------+------------------+
|  Saudi Arabia|             23139|
|United Kingdom|             43070|
|        France|             41300|
|         Aruba|             29007|
|        Greece|             20296|
+--------------+------------------+
only showing top 5 rows


## 2.7

In [239]:
# 1. Liczba dni, w których liczba nowych testów była większa niż 100000
df11 = main_df.filter(col("Total_tested") > 100000).groupBy("Country_name").agg(count("date").alias("days_with_more_than_100000_tests")).orderBy("Country_name")
df11.show(5)
df11.write.csv("wyniki_2_7/days_with_tested_gt_100000", header=True, mode="overwrite", sep=",")

+------------+--------------------------------+
|Country_name|days_with_more_than_100000_tests|
+------------+--------------------------------+
|   Argentina|                              25|
|   Australia|                             271|
|     Austria|                             508|
|     Belgium|                              38|
|      Brazil|                             259|
+------------+--------------------------------+
only showing top 5 rows


In [240]:
# 2. Suma wszystkich nowych szczepień dla każdego kraju
df12 = main_df.groupBy("Country_name").agg(sum("Total_new_vaccinations").alias("total_new_vaccinations")).orderBy("Country_name")
df12.show(5)
df12.write.csv("wyniki_2_7/total_vaccinations_per_country", header=True, mode="overwrite", sep=",")


+------------+----------------------+
|Country_name|total_new_vaccinations|
+------------+----------------------+
|     Albania|               1216854|
|   Argentina|              17660829|
|     Armenia|               1129104|
|       Aruba|                   332|
|   Australia|              41514738|
+------------+----------------------+
only showing top 5 rows


In [241]:
# 3. Maksymalna liczba potwierdzonych przypadków w jednym dniu dla każdego kraju
df13 = main_df.groupBy("Country_name").agg(max("Total_confirmed_cases").alias("max_confirmed_cases")).orderBy("Country_name")
df13.show(5)
df13.write.csv("wyniki_2_7/max_confirmed_cases_per_day", header=True, mode="overwrite", sep=",")

+------------+-------------------+
|Country_name|max_confirmed_cases|
+------------+-------------------+
|     Albania|               2177|
|   Argentina|             174174|
|     Armenia|               3127|
|       Aruba|                  3|
|   Australia|             150702|
+------------+-------------------+
only showing top 5 rows


In [242]:
# 4. Średnia liczba zmarłych dla kraju
df14 = main_df.groupBy("Country_name").agg(avg("Total_new_deceased").alias("avg_new_deceased")).orderBy("Country_name")
df14.show(5)
df14.write.csv("wyniki_2_7/average_new_confirmed", header=True, mode="overwrite", sep=",")

+------------+------------------+
|Country_name|  avg_new_deceased|
+------------+------------------+
|     Albania| 4.675438596491228|
|   Argentina|158.65774378585087|
|     Armenia|13.708333333333334|
|       Aruba|               0.0|
|   Australia|15.663419913419913|
+------------+------------------+
only showing top 5 rows


In [243]:
# 5. Nowi w pełni zaszczepieni w kazdym dniu
df15 = main_df.groupBy("date").agg(sum("New_persons_fully_vaccinated").alias("global_fully_vaccinated"))
df15.show(5)
df15.write.csv("wyniki_2_7/global_deceased", header=True, mode="overwrite", sep=",")

+----------+-----------------------+
|      date|global_fully_vaccinated|
+----------+-----------------------+
|2021-06-22|                6257511|
|2021-08-27|               12374078|
|2021-01-27|                 942969|
|2021-10-11|                7787152|
|2021-11-13|                8025234|
+----------+-----------------------+
only showing top 5 rows


## 2.8


In [244]:
main_df = main_df.withColumn("date_parsed", to_date(col("date"), "yyyy-MM-dd"))

main_df = main_df.withColumn("month", month(col("date_parsed")))
main_df = main_df.withColumn("year", year(col("date_parsed")))


monthly_covid_cases = main_df.groupBy("year", "month") \
    .agg(sum("Total_confirmed_cases").alias("monthly_confirmed_cases")) \
    .orderBy("year", "month")

monthly_covid_cases.show(10)
monthly_covid_cases.write.csv("wyniki_2_8/monthly_covid_cases", header=True, mode="overwrite", sep=",")


+----+-----+-----------------------+
|year|month|monthly_confirmed_cases|
+----+-----+-----------------------+
|2020|    2|                      2|
|2020|    3|                  10220|
|2020|    4|                  81909|
|2020|    5|                 429295|
|2020|    6|                 888503|
|2020|    7|                1269888|
|2020|    8|                1255338|
|2020|    9|                 904111|
|2020|   10|                 725208|
|2020|   11|                 800592|
+----+-----+-----------------------+
only showing top 10 rows


## 2.9


In [253]:
inner_join_df = df3.join(df2, on="Country_name", how="inner")
inner_join_df.show(5)
inner_join_df.write.csv("wyniki_2_9/inner_join", header=True, mode="overwrite", sep=",")
# Tylko kraje spełniające oba warunki: wysokie GDP i duża populacja miejska

+------------+-----------------+----------------+
|Country_name|Density (per km²)|population_urban|
+------------+-----------------+----------------+
|  Bangladesh|         1160.035|        60987417|
+------------+-----------------+----------------+



In [254]:
left_join_df = df1.join(df2, on="Country_name", how="left")
left_join_df.show()
left_join_df.write.csv("wyniki_2_9/left_join", header=True, mode="overwrite", sep=",")
# Kraje z wysokim GDP + NULL bo nie mają populacji > 50 mln

+-------------+------------------+----------------+
| Country_name|gdp_per_capita_usd|population_urban|
+-------------+------------------+----------------+
|   Luxembourg|            116348|            NULL|
|Liechtenstein|            173356|            NULL|
+-------------+------------------+----------------+



In [256]:
right_join_df = df1.join(df2, on="Country_name", how="right")
right_join_df.show()
right_join_df.write.csv("wyniki_2_9/right_join", header=True, mode="overwrite", sep=",")
# Kraje z wysokim GDP + NULL bo nie mają populacji > 50 mln

+--------------------+------------------+----------------+
|        Country_name|gdp_per_capita_usd|population_urban|
+--------------------+------------------+----------------+
|              Mexico|              NULL|       102626859|
|      United Kingdom|              NULL|        55908316|
|          Bangladesh|              NULL|        60987417|
|              France|              NULL|        54123364|
|         Philippines|              NULL|        50975903|
|           Indonesia|              NULL|       151509724|
|                Iran|              NULL|        62509623|
|              Brazil|              NULL|       183241641|
|              Turkey|              NULL|        63097818|
|United States of ...|              NULL|       270663028|
|               India|              NULL|       471031528|
|               Japan|              NULL|       115782416|
|            Pakistan|              NULL|        79927762|
|              Russia|              NULL|       10768388

In [257]:
outer_join_df = df1.join(df2, on="Country_name", how="outer")
outer_join_df.show()
outer_join_df.write.csv("wyniki_2_9/outer_join_df", header=True, mode="overwrite", sep=",")
# Wszystkie kraje: bogate, miejskie i pozostałe – nawet jeśli nie pasują

+--------------------+------------------+----------------+
|        Country_name|gdp_per_capita_usd|population_urban|
+--------------------+------------------+----------------+
|          Bangladesh|              NULL|        60987417|
|              Brazil|              NULL|       183241641|
|              France|              NULL|        54123364|
|               India|              NULL|       471031528|
|           Indonesia|              NULL|       151509724|
|                Iran|              NULL|        62509623|
|               Japan|              NULL|       115782416|
|       Liechtenstein|            173356|            NULL|
|          Luxembourg|            116348|            NULL|
|              Mexico|              NULL|       102626859|
|            Pakistan|              NULL|        79927762|
|         Philippines|              NULL|        50975903|
|              Russia|              NULL|       107683889|
|              Turkey|              NULL|        6309781

In [None]:
left_semi_df = df3.join(df6, on="Country_name", how="left_semi")
left_semi_df.show(5)
left_semi_df.write.csv("wyniki_2_9/left_semi", header=True, mode="overwrite", sep=",")
# Pokaż tylko te kraje z dużą gęstością zaludnienia, które mają łącznie ponad 100 milionów szczepień.

+------------+-----------------+
|Country_name|Density (per km²)|
+------------+-----------------+
|  Bangladesh|         1160.035|
+------------+-----------------+



In [261]:
left_anti_df = df10.join(df4, on="Country_name", how="left_anti")
left_anti_df.show(5)
left_anti_df.write.csv("wyniki_2_9/left_anti", header=True, mode="overwrite", sep=",")
# Pokaż kraje z PKB między 20k–50k USD, które nie miały pojedynczego dnia z >1000 zgonów.

+--------------------+------------------+
|        Country_name|gdp_per_capita_usd|
+--------------------+------------------+
|        Saudi Arabia|             23139|
|              France|             41300|
|               Aruba|             29007|
|              Greece|             20296|
|United Arab Emirates|             43103|
+--------------------+------------------+
only showing top 5 rows


In [264]:
cross_join_df = df3.crossJoin(df7)
cross_join_df.show(5)
cross_join_df.write.csv("wyniki_2_9/cross_join", header=True, mode="overwrite", sep=",")
# Stworzy wszystkie możliwe pary między krajami o dużej gęstości i dniami w Finlandii z dużą liczbą zgonów – może służyć do generowania scenariuszy lub testów korelacji.

+------------+-----------------+------------+----------+------------------+
|Country_name|Density (per km²)|Country_name|      date|Total_new_deceased|
+------------+-----------------+------------+----------+------------------+
|  Bangladesh|         1160.035|     Finland|2022-01-03|                23|
|  Bangladesh|         1160.035|     Finland|2022-01-25|                21|
|  Bangladesh|         1160.035|     Finland|2022-01-26|                21|
|  Bangladesh|         1160.035|     Finland|2022-01-30|                26|
|  Bangladesh|         1160.035|     Finland|2022-02-01|                29|
+------------+-----------------+------------+----------+------------------+
only showing top 5 rows
