In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql.functions import col, concat, lit, when, year, month, udf
from pyspark.sql.functions import (
    sum as spark_sum,
    min as spark_min,
    max as spark_max,
    avg as spark_avg,
    count as spark_count
)
spark=SparkSession.builder.appName("sesja").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/05/30 19:48:48 WARN Utils: Your hostname, Kuba, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/30 19:48:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/30 19:48:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
schema = StructType(
    [
    StructField("location_key", StringType(), True),
    StructField("date", DateType(), True),
    StructField("country_name", StringType(), True),
    StructField("new_confirmed", IntegerType(), True),
    StructField("cumulative_confirmed", IntegerType(), True),
    StructField("new_tested", IntegerType(), True),
    StructField("cumulative_tested", IntegerType(), True),
    StructField("new_deceased", IntegerType(), True),
    StructField("cumulative_deceased", IntegerType(), True),
    StructField("new_persons_vaccinated", IntegerType(), True),
    StructField("cumulative_persons_vaccinated", IntegerType(), True),
    StructField("new_persons_fully_vaccinated", IntegerType(), True),
    StructField("cumulative_persons_fully_vaccinated", IntegerType(), True),
    StructField("new_vaccine_doses_administered", IntegerType(), True),
    StructField("cumulative_vaccine_doses_administered", IntegerType(), True),
    StructField("population", IntegerType(), True),
    StructField("human_development_index", DoubleType(), True),
    StructField("continent", StringType(), True),
    StructField("area_sq_km", DoubleType(), True),
    StructField("nurses_per_1000", DoubleType(), True),
    StructField("physicians_per_1000", DoubleType(), True)
    ])
df = spark.read.csv("combined_data.csv", header=True, schema=schema)
df.printSchema()


root
 |-- location_key: string (nullable = true)
 |-- date: date (nullable = true)
 |-- country_name: string (nullable = true)
 |-- new_confirmed: integer (nullable = true)
 |-- cumulative_confirmed: integer (nullable = true)
 |-- new_tested: integer (nullable = true)
 |-- cumulative_tested: integer (nullable = true)
 |-- new_deceased: integer (nullable = true)
 |-- cumulative_deceased: integer (nullable = true)
 |-- new_persons_vaccinated: integer (nullable = true)
 |-- cumulative_persons_vaccinated: integer (nullable = true)
 |-- new_persons_fully_vaccinated: integer (nullable = true)
 |-- cumulative_persons_fully_vaccinated: integer (nullable = true)
 |-- new_vaccine_doses_administered: integer (nullable = true)
 |-- cumulative_vaccine_doses_administered: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- human_development_index: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- area_sq_km: double (nullable = true)
 |-- nurses_per_1000: do

In [3]:
df = df.withColumn("population_density", col("population") / col("area_sq_km"))
df.select("country_name", "population", "area_sq_km", "population_density").show(10, truncate=False)

+--------------+----------+----------+------------------+
|country_name  |population|area_sq_km|population_density|
+--------------+----------+----------+------------------+
|Czech Republic|10649800  |78870.0   |135.02979586661596|
|Finland       |5517919   |338450.0  |16.303498301078445|
|Bulgaria      |7000039   |111000.0  |63.06341441441442 |
|Switzerland   |8544527   |41290.0   |206.9393799951562 |
|Cyprus        |875899    |9250.0    |94.69178378378379 |
|Japan         |126476458 |377970.0  |334.62036140434424|
|Slovenia      |2080908   |20675.0   |100.64851269649336|
|Israel        |8655541   |22070.0   |392.18581785228815|
|Kuwait        |4270563   |17820.0   |239.65            |
|Argentina     |44938712  |2780400.0 |16.162678751258813|
+--------------+----------+----------+------------------+
only showing top 10 rows


In [4]:
df = df.withColumn(
    "tests_vs_vaccinations",
    concat(
        col("date").cast("string"), lit(": "),
        col("new_tested").cast("string"), lit(" tested / "),
        col("new_persons_vaccinated").cast("string"), lit(" vaccinated")
    )
)
df = df.withColumn(
    "confirmed_vs_deceased",
    concat(
        col("date").cast("string"), lit(": "),
        col("new_confirmed").cast("string"), lit(" confirmed / "),
        col("new_deceased").cast("string"), lit(" deceased")
    )
)

df.select("country_name", "population", "area_sq_km", "population_density", "tests_vs_vaccinations", "confirmed_vs_deceased").show(10, truncate=False)
df.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/initial_df")


+--------------+----------+----------+------------------+--------------------------------------------+------------------------------------------+
|country_name  |population|area_sq_km|population_density|tests_vs_vaccinations                       |confirmed_vs_deceased                     |
+--------------+----------+----------+------------------+--------------------------------------------+------------------------------------------+
|Czech Republic|10649800  |78870.0   |135.02979586661596|2021-01-04: 35181 tested / 3605 vaccinated  |2021-01-04: 13029 confirmed / 152 deceased|
|Finland       |5517919   |338450.0  |16.303498301078445|2022-02-04: 11757 tested / 38135 vaccinated |2022-02-04: 6952 confirmed / 22 deceased  |
|Bulgaria      |7000039   |111000.0  |63.06341441441442 |2021-01-30: 4532 tested / 101 vaccinated    |2021-01-30: 130 confirmed / 17 deceased   |
|Switzerland   |8544527   |41290.0   |206.9393799951562 |2021-10-23: 19506 tested / 4260 vaccinated  |2021-10-23: 954 confir

                                                                                

In [5]:
#  1. dni w ktorych zachorowalo wiecej niz 400 000 osob w danym kraju
result1 = df.filter(col("new_confirmed") > 400000) \
  .select("date", "country_name", "new_confirmed") \
  .orderBy(col("new_confirmed").desc())

result1.show(20, truncate=False)
result1.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result1")

+----------+------------+-------------+
|date      |country_name|new_confirmed|
+----------+------------+-------------+
|2022-01-25|France      |501635       |
|2022-01-18|France      |464769       |
|2022-01-19|France      |436167       |
|2022-01-26|France      |428008       |
|2022-01-20|France      |425183       |
|2022-02-01|France      |416896       |
|2021-05-05|India       |414188       |
|2021-05-04|India       |412262       |
|2021-05-07|India       |403738       |
|2021-04-29|India       |401993       |
|2021-05-06|India       |401078       |
|2022-01-21|France      |400851       |
+----------+------------+-------------+



In [6]:
#  2. kraje z liczba pielegniarek przekraczajaca 100 000
result2 = df.withColumn("nurses", col("nurses_per_1000") * col("population") / 1000) \
    .filter(col("nurses") > 100000) \
    .select("country_name", "nurses", "nurses_per_1000") \
    .distinct() \
    .orderBy((col("nurses")).desc())
result2.show(30, truncate=False)
result2.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result2")

+--------------+------------------+---------------+
|country_name  |nurses            |nurses_per_1000|
+--------------+------------------+---------------+
|India         |2383405.5733335   |1.7271         |
|Brazil        |2150888.659671    |10.119         |
|Japan         |1537081.0417198   |12.1531        |
|France        |768684.6770281    |11.4707        |
|Indonesia     |660532.1923528999 |2.4149         |
|United Kingdom|544660.1933976    |8.1723         |
|Philippines   |498342.95823530003|4.9351         |
|Canada        |375300.4607766    |9.9438         |
|Italy         |346469.8299946    |5.7401         |
|Australia     |320043.9064548    |12.5508        |
|Mexico        |265947.81858330005|2.3961         |
|Poland        |261731.4039912    |6.8926         |
|Belgium       |222940.4374666    |19.4614        |
|Turkey        |222287.9229374    |2.7107         |
|Netherlands   |193281.98277570002|11.1839        |
|Thailand      |192599.0792954    |2.7593         |
|Saudi Arabi

In [7]:
#  3. kraje z liczba lekarzy mniejsza niz 5000
result3 = df.withColumn("physicians", col("physicians_per_1000") * col("population") / 1000) \
    .filter(col("physicians") < 5000) \
    .select("country_name", "physicians", "physicians_per_1000") \
    .distinct() \
    .orderBy((col("physicians")).asc())
result3.show(30, truncate=False)
result3.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result3")

+-----------------+------------------+-------------------+
|country_name     |physicians        |physicians_per_1000|
+-----------------+------------------+-------------------+
|Dominica         |80.55072990000001 |1.1189             |
|Bhutan           |327.31781040000004|0.4242             |
|Belize           |446.4886209       |1.1229             |
|Equatorial Guinea|563.5790744999999 |0.4017             |
|Togo             |640.7741664       |0.0774             |
|Malawi           |684.852389        |0.0358             |
|Suriname         |709.8858034       |1.2101             |
|Barbados         |713.9157753000001 |2.4843             |
|Fiji             |770.94184         |0.86               |
|Namibia          |1062.6110712000002|0.4182             |
|Senegal          |1157.005563       |0.0691             |
|Malta            |1411.4800282      |2.8598             |
|Iceland          |1455.7378998      |4.0778             |
|Gabon            |1517.7239232      |0.6819            

In [8]:
#  4. kraj o HDI mniejszym niz 0.5
result4 = df.filter(col("human_development_index") < 0.5) \
    .select("country_name", "human_development_index") \
    .distinct() \
    .orderBy(col("human_development_index").asc())
result4.show(30, truncate=False)
result4.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result4")

+------------+-----------------------+
|country_name|human_development_index|
+------------+-----------------------+
|Burkina Faso|0.423                  |
|Mozambique  |0.437                  |
|Malawi      |0.477                  |
|Haiti       |0.498                  |
+------------+-----------------------+



In [9]:
#  5. dzien w ktorym bylo najmniej zachorowan w Polsce
result5 = df.filter(col("country_name") == "Poland") \
    .select("date", "new_confirmed") \
    .orderBy(col("new_confirmed").desc())
result5.show(30, truncate=False)
result5.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result5")

+----------+-------------+
|date      |new_confirmed|
+----------+-------------+
|2022-01-26|57659        |
|2022-02-02|54477        |
|2022-02-09|35777        |
|2021-03-31|35251        |
|2021-03-24|34151        |
|2022-01-19|32835        |
|2021-11-24|28128        |
|2021-04-07|27887        |
|2021-12-08|27458        |
|2021-12-01|27356        |
|2021-03-17|27278        |
|2021-11-17|24882        |
|2022-02-16|23990        |
|2021-12-15|22097        |
|2021-04-14|21130        |
|2021-03-10|21045        |
|2021-11-10|19074        |
|2021-12-22|17156        |
|2022-01-12|16878        |
|2022-02-23|16724        |
|2022-01-05|16576        |
|2021-11-03|15515        |
|2021-03-03|15250        |
|2021-12-29|14325        |
|2021-04-21|12762        |
|2022-03-02|12483        |
|2021-02-24|12142        |
|2022-03-16|11658        |
|2022-03-09|11637        |
|2021-02-17|9073         |
+----------+-------------+
only showing top 30 rows


In [10]:
#  6. kraj z najwieksza liczba zgonow w Europie
result6 = df.filter(col("continent") == "Europe") \
    .groupBy(col("country_name")) \
    .agg(spark_sum("new_deceased") \
    .alias("total_deceased")) \
    .orderBy(col("total_deceased").desc())
result6.show(30, truncate=False)
result6.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result6")

+----------------------+--------------+
|country_name          |total_deceased|
+----------------------+--------------+
|Italy                 |104777        |
|United Kingdom        |92381         |
|France                |82304         |
|Czech Republic        |29565         |
|Hungary               |19383         |
|Poland                |17214         |
|Greece                |14651         |
|Belgium               |12275         |
|Austria               |9844          |
|Bulgaria              |7310          |
|Switzerland           |6869          |
|Lithuania             |6597          |
|Serbia                |5931          |
|Latvia                |5229          |
|Denmark               |5148          |
|Slovenia              |3983          |
|Croatia               |3203          |
|Norway                |2814          |
|Estonia               |2445          |
|Portugal              |2273          |
|Netherlands           |1221          |
|Sweden                |1010          |


In [11]:
#  7. dni w Polsce w ktorych wskaznik smiertelnosci przekroczyl 5%
result7 = df.withColumn("mortality_rate", col("new_deceased") / col("new_confirmed")) \
    .filter((col("country_name") == "Poland") & (col("mortality_rate") > 0.05)) \
    .select("date", "mortality_rate", "new_confirmed", "new_deceased") \
    .orderBy(col("mortality_rate").desc())
result7.show(30, truncate=False)
result7.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result7")

+----------+--------------------+-------------+------------+
|date      |mortality_rate      |new_confirmed|new_deceased|
+----------+--------------------+-------------+------------+
|2021-06-30|0.23469387755102042 |98           |23          |
|2021-06-09|0.2198952879581152  |382          |84          |
|2021-06-16|0.21100917431192662 |218          |46          |
|2021-07-07|0.20430107526881722 |93           |19          |
|2021-06-23|0.16326530612244897 |147          |24          |
|2021-06-02|0.1590909090909091  |572          |91          |
|2021-05-19|0.11984659635666348 |2086         |250         |
|2021-07-14|0.11428571428571428 |105          |12          |
|2021-05-26|0.10975609756097561 |1230         |135         |
|2021-05-12|0.09168900804289544 |3730         |342         |
|2021-05-05|0.07930337428082725 |6431         |510         |
|2021-07-21|0.07142857142857142 |126          |9           |
|2021-02-03|0.06834975369458128 |6496         |444         |
|2021-02-10|0.0650684931

In [12]:
#  8. dni w ktorych odsetek przypdakow przekroczyl 0.5% populacji
result8 = df.withColumn("new_confirmed / population", col("new_confirmed") / col("population")) \
    .filter(col("new_confirmed / population") > 0.005) \
    .select("country_name", "date", "new_confirmed / population", "new_confirmed", "population") \
    .orderBy(col("new_confirmed / population").desc())
result8.show(30, truncate=False)
result8.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result8")

+------------+----------+--------------------------+-------------+----------+
|country_name|date      |new_confirmed / population|new_confirmed|population|
+------------+----------+--------------------------+-------------+----------+
|Finland     |2022-02-03|0.02479267999403398       |136804       |5517919   |
|Slovenia    |2022-02-01|0.011260949547024664      |23433        |2080908   |
|Israel      |2022-01-22|0.00985230154879978       |85277        |8655541   |
|Israel      |2022-01-23|0.009677500227888701      |83764        |8655541   |
|Denmark     |2022-02-08|0.009493494837567716      |55120        |5806081   |
|Denmark     |2022-02-01|0.009472999084924926      |55001        |5806081   |
|Denmark     |2022-02-09|0.00925701863270595       |53747        |5806081   |
|Denmark     |2022-01-27|0.009241173176881274      |53655        |5806081   |
|Israel      |2022-01-24|0.008825907011473922      |76393        |8655541   |
|Denmark     |2022-01-26|0.008789577685877962      |51033       

In [13]:
#  9. dzien w ktorym w Polsce ponad 10% populacji przeszlo chorobe
result9 = df.withColumn("cumulative_confirmed / population", col("cumulative_confirmed") / col("population")) \
    .filter((col("country_name") == "Poland") & (col("cumulative_confirmed / population") > 0.1)) \
    .select("country_name", "date") \
    .orderBy(col("cumulative_confirmed").asc())
result9.show(30, truncate=False)
result9.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result9")

+------------+----------+
|country_name|date      |
+------------+----------+
|Poland      |2021-12-15|
|Poland      |2021-12-22|
|Poland      |2021-12-29|
|Poland      |2022-01-05|
|Poland      |2022-01-12|
|Poland      |2022-01-19|
|Poland      |2022-01-26|
|Poland      |2022-02-02|
|Poland      |2022-02-09|
|Poland      |2022-02-16|
|Poland      |2022-02-23|
|Poland      |2022-03-02|
|Poland      |2022-03-09|
|Poland      |2022-03-16|
|Poland      |2022-03-23|
|Poland      |2022-03-30|
|Poland      |2022-04-06|
|Poland      |2022-04-13|
|Poland      |2022-04-20|
|Poland      |2022-04-27|
|Poland      |2022-05-04|
|Poland      |2022-05-11|
|Poland      |2022-05-18|
|Poland      |2022-05-25|
|Poland      |2022-06-01|
|Poland      |2022-06-08|
+------------+----------+



In [14]:
# 10. dni w Polsce w ktorych wskaznik pozytywnych testow przekroczyl 20%
result10 = df.withColumn("new_confirmed / new_tested", col("new_confirmed") / col("new_tested")) \
    .filter((col("new_confirmed / new_tested") > 0.2) & (col("country_name") == "Poland")) \
    .select("date", "new_confirmed / new_tested", "new_tested", "new_confirmed") \
    .orderBy(col("new_confirmed / new_tested").desc())
result10.show(30, truncate=False)
result10.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result10")

+----------+--------------------------+----------+-------------+
|date      |new_confirmed / new_tested|new_tested|new_confirmed|
+----------+--------------------------+----------+-------------+
|2021-03-24|0.33292389280457013       |102579    |34151        |
|2021-03-31|0.3221328703280636        |109430    |35251        |
|2022-01-26|0.3212236348037304        |179498    |57659        |
|2021-03-17|0.31630333951762524       |86240     |27278        |
|2022-02-02|0.3111122532894737        |175104    |54477        |
|2021-03-10|0.286982490590738         |73332     |21045        |
|2022-01-19|0.2625077948865544        |125082    |32835        |
|2021-11-24|0.2535996032998242        |110915    |28128        |
|2021-12-01|0.2509471521222629        |109011    |27356        |
|2021-11-17|0.25005276010732913       |99507     |24882        |
|2021-04-07|0.2500381059974357        |111531    |27887        |
|2022-02-09|0.24928232998885172       |143520    |35777        |
|2021-12-08|0.24603060822

In [15]:
# 1. suma smierci na kontynentach
result11 = df.groupBy("continent") \
    .agg(spark_sum("new_deceased").alias("total_deceased")) \
    .orderBy(col("total_deceased").desc())
result11.show(30, truncate=False)
result11.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result11")

+-------------+--------------+
|continent    |total_deceased|
+-------------+--------------+
|Asia         |677141        |
|South America|582317        |
|Europe       |438363        |
|North America|179345        |
|Africa       |46906         |
|Oceania      |13669         |
+-------------+--------------+



In [16]:
# 2. srednia liczba dziennych zachorowan w krajach w Europie
result12 = df.filter(col("continent") == "Europe") \
    .groupBy("country_name") \
    .agg(spark_avg("new_confirmed").alias("confirmed_daily_avg")) \
    .orderBy(col("confirmed_daily_avg").desc())
result12.show(30, truncate=False)
result12.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result12")

+----------------------+-------------------+
|country_name          |confirmed_daily_avg|
+----------------------+-------------------+
|France                |51856.41263940521  |
|United Kingdom        |38387.620689655174 |
|Italy                 |31506.55023923445  |
|Netherlands           |12596.555555555555 |
|Poland                |10967.180555555555 |
|Portugal              |8996.835616438357  |
|Belgium               |6608.810408921933  |
|Switzerland           |5769.412044374009  |
|Denmark               |5552.113172541744  |
|Greece                |5540.322784810127  |
|Czech Republic        |5418.357256778309  |
|Finland               |4370.2             |
|Austria               |3844.2581453634084 |
|Hungary               |3797.71875         |
|Serbia                |3650.1805555555557 |
|Norway                |2617.5377358490564 |
|Sweden                |1977.3972602739725 |
|Lithuania             |1910.396887159533  |
|Slovenia              |1734.446153846154  |
|Belarus  

In [17]:
# 3. kraje ktore raportowaly najwieksza liczbe razy
result13 = df.groupBy(col("country_name")) \
    .agg(spark_count("*").alias("number_of_daily_reports")) \
    .orderBy(col("number_of_daily_reports").desc())
result13.show(30, truncate=False)
result13.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result13")

+-------------------+-----------------------+
|country_name       |number_of_daily_reports|
+-------------------+-----------------------+
|Switzerland        |631                    |
|Italy              |627                    |
|Czech Republic     |627                    |
|Estonia            |623                    |
|Malaysia           |568                    |
|Australia          |568                    |
|Israel             |549                    |
|Latvia             |540                    |
|Denmark            |539                    |
|Belgium            |538                    |
|France             |538                    |
|Norway             |530                    |
|Argentina          |523                    |
|Slovenia           |520                    |
|Lithuania          |514                    |
|Malta              |503                    |
|United Kingdom     |493                    |
|Canada             |487                    |
|Ecuador            |479          

In [18]:
# 4. kraj z najmniejszym odsetkiem zgonow w stosunku do osob zakazonych
result274 = df.groupBy(col("country_name")) \
    .agg(spark_sum(col("new_confirmed")).alias("total_confirmed"), spark_sum(col("new_deceased")).alias("total_deceased")) \
    .withColumn("mortality_rate", when(col("total_confirmed") == 0, 0) \
    .otherwise(col("total_deceased") / col("total_confirmed"))) \
    .filter(col("mortality_rate") > 0) \
    .select("country_name", "mortality_rate", "total_confirmed", "total_deceased") \
    .orderBy(col("mortality_rate").asc())
result274.show(100, truncate=False)
result274.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result274")

+----------------------+---------------------+---------------+--------------+
|country_name          |mortality_rate       |total_confirmed|total_deceased|
+----------------------+---------------------+---------------+--------------+
|New Zealand           |2.4270920581740654E-4|210128         |51            |
|Qatar                 |2.878526194588371E-4 |10422          |3             |
|Iceland               |3.8061060816137887E-4|36783          |14            |
|Kuwait                |9.002716336825766E-4 |64425          |58            |
|Australia             |0.001339602121519039 |10089563       |13516         |
|Netherlands           |0.0013462674981696936|906952         |1221          |
|Maldives              |0.0016977637536013171|116624         |198           |
|Denmark               |0.0017202495899035918|2992589        |5148          |
|Switzerland           |0.0018868292506054802|3640499        |6869          |
|Israel                |0.0020020348972294745|3895037        |77

In [19]:
# 5. najwieksza liczba dziennych zakazen dla kazdego kraju w Azji
result275 = df.filter(col("continent") == "Asia") \
    .groupBy(col("continent"), col("country_name")) \
    .agg(spark_max(col("new_confirmed")).alias("max_daily_confirmed")) \
    .select("country_name", "max_daily_confirmed") \
    .orderBy(col("max_daily_confirmed").desc())
result275.show(100, truncate=False)
result275.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result275")

+--------------------+-------------------+
|country_name        |max_daily_confirmed|
+--------------------+-------------------+
|India               |414188             |
|Vietnam             |263376             |
|Turkey              |111157             |
|Japan               |104497             |
|Israel              |85277              |
|Indonesia           |64718              |
|Malaysia            |33406              |
|Thailand            |28379              |
|Philippines         |22734              |
|Jordan              |21626              |
|Pakistan            |18785              |
|Bangladesh          |16230              |
|Sri Lanka           |9962               |
|Mongolia            |9518               |
|Iraq                |9149               |
|Nepal               |8730               |
|Bahrain             |8064               |
|Kuwait              |6913               |
|Saudi Arabia        |5928               |
|United Arab Emirates|3102               |
|Myanmar   

In [20]:

result14 = df.withColumn("month", month("date")) \
    .withColumn("year", year("date")) \
    .groupBy("year", "month") \
    .agg(spark_sum(col("new_confirmed")).alias("monthly_confirmed"), spark_sum(col("new_deceased")).alias("monthly_deceased")) \
    .orderBy("year", "month")
result14.show(40, truncate=False)
result14.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/result14")

+----+-----+-----------------+----------------+
|year|month|monthly_confirmed|monthly_deceased|
+----+-----+-----------------+----------------+
|2020|12   |357271           |7861            |
|2021|1    |3817576          |111323          |
|2021|2    |4237525          |111535          |
|2021|3    |7840319          |147319          |
|2021|4    |14139572         |225023          |
|2021|5    |14029006         |254067          |
|2021|6    |8017857          |197300          |
|2021|7    |8897079          |170218          |
|2021|8    |9439359          |163373          |
|2021|9    |6172657          |95673           |
|2021|10   |4902271          |53447           |
|2021|11   |5916563          |49986           |
|2021|12   |11517596         |54384           |
|2022|1    |43862437         |86092           |
|2022|2    |22504896         |92174           |
|2022|3    |16682720         |51119           |
|2022|4    |10527271         |31050           |
|2022|5    |5389784          |15948     

In [21]:
# inner join
inner_join = result274.join(result275,result274.country_name ==  result275.country_name,"inner")
inner_join.show(40, truncate=False)
inner_join.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/inner_join")

25/05/30 19:49:06 WARN Column: Constructing trivially true equals predicate, 'country_name == country_name'. Perhaps you need to use aliases.


+--------------------+---------------------+---------------+--------------+--------------------+-------------------+
|country_name        |mortality_rate       |total_confirmed|total_deceased|country_name        |max_daily_confirmed|
+--------------------+---------------------+---------------+--------------+--------------------+-------------------+
|Philippines         |0.01683108038942077  |556827         |9372          |Philippines         |22734              |
|Malaysia            |0.007785592971896696 |4523098        |35215         |Malaysia            |33406              |
|Turkey              |0.005750856308512526 |12349813       |71022         |Turkey              |111157             |
|Iraq                |0.008292166549047283 |56680          |470           |Iraq                |9149               |
|Maldives            |0.0016977637536013171|116624         |198           |Maldives            |2813               |
|Jordan              |0.0075989795514772775|202264         |1537

In [22]:
# left join
left_join = result274.join(result275,result274.country_name ==  result275.country_name,"left")
left_join.show(100, truncate=False)
left_join.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/left_join")

25/05/30 19:49:06 WARN Column: Constructing trivially true equals predicate, 'country_name == country_name'. Perhaps you need to use aliases.


+----------------------+---------------------+---------------+--------------+--------------------+-------------------+
|country_name          |mortality_rate       |total_confirmed|total_deceased|country_name        |max_daily_confirmed|
+----------------------+---------------------+---------------+--------------+--------------------+-------------------+
|Paraguay              |0.04091783088750869  |320887         |13130         |NULL                |NULL               |
|Senegal               |0.02793391245103049  |5871           |164           |NULL                |NULL               |
|Sweden                |0.006996882577069622 |144350         |1010          |NULL                |NULL               |
|Philippines           |0.01683108038942077  |556827         |9372          |Philippines         |22734              |
|Malaysia              |0.007785592971896696 |4523098        |35215         |Malaysia            |33406              |
|Fiji                  |0.015473300970873786 |65

In [23]:
# right join
right_join = result274.join(result275,result274.country_name ==  result275.country_name,"right")
right_join.show(40, truncate=False)
right_join.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/right_join")

25/05/30 19:49:07 WARN Column: Constructing trivially true equals predicate, 'country_name == country_name'. Perhaps you need to use aliases.


+--------------------+---------------------+---------------+--------------+--------------------+-------------------+
|country_name        |mortality_rate       |total_confirmed|total_deceased|country_name        |max_daily_confirmed|
+--------------------+---------------------+---------------+--------------+--------------------+-------------------+
|Japan               |0.0026694066624059986|8424344        |22488         |Japan               |104497             |
|Pakistan            |0.014860507500350483 |328118         |4876          |Pakistan            |18785              |
|Jordan              |0.0075989795514772775|202264         |1537          |Jordan              |21626              |
|United Arab Emirates|0.0029675687133469937|84918          |252           |United Arab Emirates|3102               |
|Philippines         |0.01683108038942077  |556827         |9372          |Philippines         |22734              |
|Israel              |0.0020020348972294745|3895037        |7798

In [24]:
# left semi join
left_semi_join = result274.join(result275,result274.country_name ==  result275.country_name,"leftsemi")
left_semi_join.show(100, truncate=False)
left_semi_join.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/left_semi_join")

25/05/30 19:49:08 WARN Column: Constructing trivially true equals predicate, 'country_name == country_name'. Perhaps you need to use aliases.


+--------------------+---------------------+---------------+--------------+
|country_name        |mortality_rate       |total_confirmed|total_deceased|
+--------------------+---------------------+---------------+--------------+
|Qatar               |2.878526194588371E-4 |10422          |3             |
|Kuwait              |9.002716336825766E-4 |64425          |58            |
|Maldives            |0.0016977637536013171|116624         |198           |
|Israel              |0.0020020348972294745|3895037        |7798          |
|Japan               |0.0026694066624059986|8424344        |22488         |
|Mongolia            |0.002909068755306956 |262627         |764           |
|United Arab Emirates|0.0029675687133469937|84918          |252           |
|Vietnam             |0.0036674794507163978|1658087        |6081          |
|Saudi Arabia        |0.0048566264037106085|247291         |1201          |
|Bahrain             |0.005576100214912196 |154947         |864           |
|Turkey     

In [25]:
# left anti join
left_anti_join = result274.join(result275,result274.country_name ==  result275.country_name,"leftanti")
left_anti_join.show(100, truncate=False)
left_anti_join.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/left_anti_join")

25/05/30 19:49:08 WARN Column: Constructing trivially true equals predicate, 'country_name == country_name'. Perhaps you need to use aliases.


+----------------------+---------------------+---------------+--------------+
|country_name          |mortality_rate       |total_confirmed|total_deceased|
+----------------------+---------------------+---------------+--------------+
|New Zealand           |2.4270920581740654E-4|210128         |51            |
|Iceland               |3.8061060816137887E-4|36783          |14            |
|Australia             |0.001339602121519039 |10089563       |13516         |
|Netherlands           |0.0013462674981696936|906952         |1221          |
|Denmark               |0.0017202495899035918|2992589        |5148          |
|Switzerland           |0.0018868292506054802|3640499        |6869          |
|Norway                |0.002028407800792189 |1387295        |2814          |
|Finland               |0.0025427898036703128|349616         |889           |
|Cyprus                |0.0027326103814576384|81241          |222           |
|France                |0.002950096330480756 |27898750       |82

In [26]:
# full join
full_join = result274.join(result275,result274.country_name ==  result275.country_name,"full")
full_join.show(100, truncate=False)
full_join.coalesce(1) \
      .write \
      .option("header", "true") \
      .mode("overwrite") \
      .csv("dataframes/full_join")

25/05/30 19:49:09 WARN Column: Constructing trivially true equals predicate, 'country_name == country_name'. Perhaps you need to use aliases.


+----------------------+---------------------+---------------+--------------+--------------------+-------------------+
|country_name          |mortality_rate       |total_confirmed|total_deceased|country_name        |max_daily_confirmed|
+----------------------+---------------------+---------------+--------------+--------------------+-------------------+
|Argentina             |0.010887135289233732 |7621656        |82978         |NULL                |NULL               |
|Australia             |0.001339602121519039 |10089563       |13516         |NULL                |NULL               |
|Austria               |0.006417799810803992 |1533859        |9844          |NULL                |NULL               |
|Bahrain               |0.005576100214912196 |154947         |864           |Bahrain             |8064               |
|Bangladesh            |0.016627850072446982 |729499         |12130         |Bangladesh          |16230              |
|Belarus               |0.008774890313871077 |29

In [27]:
# UDF (czesc 3)
grouped_df = df.withColumn("month", month("date")) \
    .withColumn("year", year("date")) \
    .groupBy("year", "month", "country_name") \
    .agg(spark_sum("new_confirmed").alias("total_confirmed_monthly"))

@udf(returnType=StringType())
def hazard_level(new_confirmed):
    if new_confirmed > 10000:
        return "high"
    if new_confirmed > 1000:
        return "medium"
    return "low"

grouped_df = grouped_df.withColumn("hazard_level", hazard_level(col("total_confirmed_monthly")))
grouped_df.show()

                                                                                

+----+-----+------------------+-----------------------+------------+
|year|month|      country_name|total_confirmed_monthly|hazard_level|
+----+-----+------------------+-----------------------+------------+
|2022|    2|           Finland|                 189379|        high|
|2021|    6|            Canada|                      9|         low|
|2021|   12|         Guatemala|                   9923|      medium|
|2021|   12|            Norway|                 126270|        high|
|2021|    5|Dominican Republic|                   2583|      medium|
|2021|    6|    Czech Republic|                   5446|      medium|
|2021|    6|         Lithuania|                   3643|      medium|
|2021|    8|       Philippines|                  60358|        high|
|2021|   11|            Greece|                 130374|        high|
|2021|    5|         Guatemala|                  27141|        high|
|2021|    6|          Malaysia|                 179622|        high|
|2021|    7|          Colombia|   

In [28]:
grouped_df.groupBy("hazard_level") \
    .agg(spark_count("*").alias("num_of_occurrences")) \
    .show()

+------------+------------------+
|hazard_level|num_of_occurrences|
+------------+------------------+
|         low|               202|
|        high|               653|
|      medium|               313|
+------------+------------------+



In [29]:
# map (czesc 5)
schema = df.schema
def to_sq_mi(x):
    row_list = list(x)
    row_list[-6] /= 2.59
    return tuple(row_list)
df = df.rdd.map(to_sq_mi).toDF(schema=schema)
df = df.withColumnRenamed("area_sq_km", "area_sq_mi")
df.select("country_name", "area_sq_mi").distinct().show()

[Stage 238:>                                                        (0 + 1) / 1]

+-------------------+------------------+
|       country_name|        area_sq_mi|
+-------------------+------------------+
|     Czech Republic|30451.737451737452|
|               Iraq|167973.35907335908|
|         Costa Rica| 19729.72972972973|
|        Netherlands|16038.610038610039|
|            Croatia| 21849.42084942085|
|             Israel| 8521.235521235521|
|             Kuwait|  6880.30888030888|
|Trinidad and Tobago|1980.6949806949808|
|              Malta|123.55212355212356|
|              Gabon|103347.49034749035|
|             Bhutan|14823.552123552125|
|             France|212002.31660231663|
|         Mozambique|303621.62162162166|
|            Denmark|16571.428571428572|
|             Serbia| 34115.83011583012|
|             Mexico|  758445.945945946|
|           Thailand|198115.83011583012|
|            Belarus| 80154.44015444016|
|          Guatemala| 42042.47104247104|
|          Australia|2988888.0308880312|
+-------------------+------------------+
only showing top

                                                                                