In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    min, max, sum, sum_distinct, avg, struct,
    count, countDistinct,
    stddev, variance, skewness, kurtosis,
    collect_list, collect_set,
    first, last,
    corr, year, month, desc, asc, col, row_number, concat_ws,
    udf, concat, lit, date_format, when
)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, StructType, StructField, BooleanType
from datetime import datetime

spark = SparkSession.builder.appName("sesja").master("local[*]").config("spark.hadoop.fs.defaultFS", "file:///").getOrCreate()
print("Spark version:", spark.version)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/06 01:48:30 WARN Utils: Your hostname, DESKTOP-332M713, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/06/06 01:48:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/06 01:48:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.0


In [2]:
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data/combined.csv")

df.printSchema()


root
 |-- location_key: string (nullable = true)
 |-- date: date (nullable = true)
 |-- iso_3166_1_alpha_3: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- new_confirmed: integer (nullable = true)
 |-- cumulative_confirmed: double (nullable = true)
 |-- new_tested: integer (nullable = true)
 |-- cumulative_tested: double (nullable = true)
 |-- new_recovered: integer (nullable = true)
 |-- cumulative_recovered: double (nullable = true)
 |-- new_deceased: integer (nullable = true)
 |-- cumulative_deceased: double (nullable = true)
 |-- new_persons_vaccinated: integer (nullable = true)
 |-- cumulative_persons_vaccinated: double (nullable = true)
 |-- new_persons_fully_vaccinated: integer (nullable = true)
 |-- cumulative_persons_fully_vaccinated: double (nullable = true)
 |-- new_vaccine_doses_administered: integer (nullable = true)
 |-- cumulative_vaccine_doses_administered: double (nullable = true)
 |-- smoking_prevalence: double (nullable = true)
 |-- diabetes

                                                                                

In [3]:
df.select(
    col("location_key"),
    col("date"),
    col("country_name"),
    col("new_confirmed"),
    col("new_deceased"),
    col("new_tested"),
    col("Population"),
    col("hospital_beds_per_1000"),
    col("population_age_80_and_older"),
    col("GDP")
).show(5, truncate=False)


+------------+----------+------------+-------------+------------+----------+----------+----------------------+---------------------------+------------------+
|location_key|date      |country_name|new_confirmed|new_deceased|new_tested|Population|hospital_beds_per_1000|population_age_80_and_older|GDP               |
+------------+----------+------------+-------------+------------+----------+----------+----------------------+---------------------------+------------------+
|AD          |2020-01-01|Andorra     |0            |0           |0         |77700.0   |2.5                   |4881                       |2.85851769911504E9|
|AD          |2020-01-02|Andorra     |0            |0           |180       |77700.0   |2.5                   |4881                       |2.85851769911504E9|
|AD          |2020-01-03|Andorra     |0            |0           |180       |77700.0   |2.5                   |4881                       |2.85851769911504E9|
|AD          |2020-01-04|Andorra     |0            |

In [4]:
df = df.withColumn(
    "population_density",
    when(col("Area (km²)") != 0,
         col("Population") / col("Area (km²)")
    ).otherwise(lit(0))
)

df = df.withColumn(
    "gdp_per_capita",
    when(col("Population") != 0,
        col("GDP") / col("Population")
    ).otherwise(lit(0))
)

df = df.withColumn(
    "per_capita_doses_vaccinated",
    when(col("Population") != 0,
         col("cumulative_vaccine_doses_administered") / col("Population")
    ).otherwise(lit(0))
)

df.select("country_name", "Population", "Area (km²)", "population_density", "GDP", "gdp_per_capita", "per_capita_doses_vaccinated").show(10, truncate=False)


+------------+----------+----------+------------------+------------------+-----------------+---------------------------+
|country_name|Population|Area (km²)|population_density|GDP               |gdp_per_capita   |per_capita_doses_vaccinated|
+------------+----------+----------+------------------+------------------+-----------------+---------------------------+
|Andorra     |77700.0   |468.0     |166.02564102564102|2.85851769911504E9|36789.15957677014|0.0                        |
|Andorra     |77700.0   |468.0     |166.02564102564102|2.85851769911504E9|36789.15957677014|1.287001287001287E-5       |
|Andorra     |77700.0   |468.0     |166.02564102564102|2.85851769911504E9|36789.15957677014|2.574002574002574E-5       |
|Andorra     |77700.0   |468.0     |166.02564102564102|2.85851769911504E9|36789.15957677014|3.861003861003861E-5       |
|Andorra     |77700.0   |468.0     |166.02564102564102|2.85851769911504E9|36789.15957677014|5.148005148005148E-5       |
|Andorra     |77700.0   |468.0  

In [5]:
df = df.withColumn(
    "country_iso",
    concat_ws(" - ", col("iso_3166_1_alpha_3"), col("country_name"))
)

df = df.withColumn(
    "new_cases_desc",
    concat(
        date_format(col("date"), "yyyy-MM-dd"),
        lit(": "),
        col("new_confirmed"),
        lit(" new cases")
    )
)

df.select("country_iso", "new_cases_desc").show(5, truncate=False)


+-------------+-----------------------+
|country_iso  |new_cases_desc         |
+-------------+-----------------------+
|AND - Andorra|2020-01-01: 0 new cases|
|AND - Andorra|2020-01-02: 0 new cases|
|AND - Andorra|2020-01-03: 0 new cases|
|AND - Andorra|2020-01-04: 0 new cases|
|AND - Andorra|2020-01-05: 0 new cases|
+-------------+-----------------------+
only showing top 5 rows


# 2.6 Filtering data

In [6]:
# 1) Kraje o hospital_beds_per_1000 > 8
filter1 = df.filter(col("hospital_beds_per_1000") > 8)

# 2) Wiersze, gdzie cumulative_confirmed > 1_000_000
filter2 = df.filter(col("cumulative_confirmed") > 1_000_000)

# 3) Wiersze, gdzie new_confirmed >= 30_000
filter3 = df.filter(col("new_confirmed") >= 30_000)

# 4) Kraje o population_density > 500
filter4 = df.filter(col("population_density") > 500)

# 5) population_age_80_and_older > 1_000_000
filter5 = df.filter(col("population_age_80_and_older") > 1_000_000)

# 6) Dni, w których new_deceased == 0
filter6 = df.filter(col("new_deceased") == 0)

# 7) Kraj, gdzie health_expenditure_usd < 1000
filter7 = df.filter(col("health_expenditure_usd") < 1000)

# 8) nurses_per_1000 < 5
filter8 = df.filter(col("nurses_per_1000") < 5)

# 9) stringency_index > 70
filter9 = df.filter(col("stringency_index") > 70)

# 10) new_tested < 1000
filter10 = df.filter(col("new_tested") < 1000)

# 11) country_name = Poland
filter11 = df.filter(col("country_name") == "Poland")

print("Przykład 1: hospital_beds_per_1000 > 8")
filter1.select("country_name", "hospital_beds_per_1000").show(10)

print("Przykład 2: cumulative_confirmed > 1_000_000")
filter2.select("country_name", "cumulative_confirmed").show(10)

print("Przykład 3: new_confirmed >= 30_000")
filter3.select("country_name", "date", "new_confirmed").show(10)

print("Przykład 4: population_density > 500")
filter4.select("country_name", "population_density").show(10)

print("Przykład 5: population_age_80_and_older > 1_000_000")
filter5.select("country_name", "population_age_80_and_older").show(10)

print("Przykład 6: new_deceased == 0")
filter6.select("country_name", "date", "new_deceased").show(10)

print("Przykład 7: health_expenditure_usd < 1000")
filter7.select("country_name", "health_expenditure_usd").show(10)

print("Przykład 8: nurses_per_1000 < 5")
filter8.select("country_name", "nurses_per_1000").show(10)

print("Przykład 9: stringency_index > 70")
filter9.select("country_name", "date", "stringency_index").show(10)

print("Przykład 10: new_tested < 1000")
filter10.select("country_name", "date", "new_tested").show(10)

print("Przykład 11: country_name == Poland")
filter11.select("country_name", "date", "new_confirmed").show(10)


Przykład 1: hospital_beds_per_1000 > 8
+------------+----------------------+
|country_name|hospital_beds_per_1000|
+------------+----------------------+
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
|     Belarus|                  9.69|
+------------+----------------------+
only showing top 10 rows
Przykład 2: cumulative_confirmed > 1_000_000
+--------------------+--------------------+
|        country_name|cumulative_confirmed|
+--------------------+--------------------+
|United Arab Emirates|           1000556.0|
|United Arab Emirates|           1001445.0|
|United Arab Emirates|           1002306.0|
|United Arab Emirates|           1003129.0|
|United Arab Emirates|           1003929.0|
|United

In [7]:
window_cases = Window.partitionBy("country_name").orderBy(col("new_confirmed").desc())
window_deaths = Window.partitionBy("country_name").orderBy(col("new_deceased").desc())

df_cases_ranked = df.withColumn(
    "rn_cases",
    row_number().over(window_cases)
)

df_max_cases_per_country = df_cases_ranked \
    .filter(col("rn_cases") == 1) \
    .select("country_name", "date", "new_confirmed") \
    .orderBy("country_name")

df_max_cases_per_country.show(truncate=False)

df_deaths_ranked = df.withColumn(
    "rn_deaths",
    row_number().over(window_deaths)
)

df_max_deaths_per_country = df_deaths_ranked \
    .filter(col("rn_deaths") == 1) \
    .select("country_name", "date", "new_deceased") \
    .orderBy("country_name")

df_max_deaths_per_country.show(truncate=False)


+-------------------+----------+-------------+
|country_name       |date      |new_confirmed|
+-------------------+----------+-------------+
|Afghanistan        |2021-06-15|3243         |
|Albania            |2022-01-17|2832         |
|Algeria            |2022-01-24|2521         |
|American Samoa     |2022-03-24|955          |
|Andorra            |2022-01-24|1676         |
|Angola             |2021-12-31|5611         |
|Anguilla           |2022-05-09|196          |
|Antarctica         |2020-01-01|0            |
|Antigua and Barbuda|2022-02-09|468          |
|Argentina          |2022-01-10|174174       |
|Armenia            |2022-02-01|4388         |
|Aruba              |2022-01-09|2138         |
|Australia          |2022-01-13|150702       |
|Austria            |2022-03-15|63512        |
|Azerbaijan         |2022-02-08|7779         |
|Bahamas            |2022-01-08|1846         |
|Bahrain            |2022-01-31|8173         |
|Bangladesh         |2021-07-26|16230        |
|Barbados    

# 2.7 Aggregations

In [8]:
agg_per_country = df.groupBy("country_name").agg(
    max("new_deceased").alias("max_daily_deceases"),
    max("new_confirmed").alias("max_daily_cases"),
    sum("investment_in_vaccines").alias("sum_investment_in_vaccines"),
    sum("emergency_investment_in_healthcare").alias("sum_emergency_investment_in_healthcare"),
    count("*").alias("record_count"),
    countDistinct("date").alias("distinct_date_days"),
    stddev("new_confirmed").alias("stddev_daily_cases"),
    variance("new_confirmed").alias("var_daily_cases"),
    skewness("new_confirmed").alias("skew_daily_cases"),
    kurtosis("new_confirmed").alias("kurt_daily_cases"),
     sum(
        when(col("new_confirmed") > 0, 1).otherwise(0)
    ).alias("days_with_new_cases"),
)

agg_per_country.select(
    "country_name",
    "max_daily_deceases",
    "max_daily_cases",
    "sum_investment_in_vaccines",
    "sum_emergency_investment_in_healthcare",
    "record_count",
    "distinct_date_days",
    "stddev_daily_cases",
    "var_daily_cases",
    "skew_daily_cases",
    "kurt_daily_cases",
    "days_with_new_cases",
).show(10, truncate=False)




+---------------------------------+------------------+---------------+--------------------------+--------------------------------------+------------+------------------+------------------+--------------------+------------------+------------------+-------------------+
|country_name                     |max_daily_deceases|max_daily_cases|sum_investment_in_vaccines|sum_emergency_investment_in_healthcare|record_count|distinct_date_days|stddev_daily_cases|var_daily_cases     |skew_daily_cases  |kurt_daily_cases  |days_with_new_cases|
+---------------------------------+------------------+---------------+--------------------------+--------------------------------------+------------+------------------+------------------+--------------------+------------------+------------------+-------------------+
|Chad                             |12                |596            |0                         |75306751                              |991         |991               |31.479925569303404|990.98571384

                                                                                

In [9]:
df_country_max_vacc = df.groupBy("Continent", "country_name").agg(
    max("per_capita_doses_vaccinated").alias("max_per_capita_doses_vaccinated"),
    max("Population"),
    max("cumulative_persons_fully_vaccinated")
)

df_country_max_vacc.orderBy(col("max_per_capita_doses_vaccinated").desc()).show(truncate=False)

avg_of_max_per_continent = df_country_max_vacc.groupBy("Continent").agg(
    avg("max_per_capita_doses_vaccinated").alias("avg_of_per_capita_doses_vaccinated_per_country")
)

avg_of_max_per_continent.show(truncate=False)

+-------------+--------------+-------------------------------+---------------+----------------------------------------+
|Continent    |country_name  |max_per_capita_doses_vaccinated|max(Population)|max(cumulative_persons_fully_vaccinated)|
+-------------+--------------+-------------------------------+---------------+----------------------------------------+
|Asia         |Brunei        |5.073623280074476              |449002.0       |758782.0                                |
|Europe       |Portugal      |4.848028379303983              |1.0298192E7    |1.782873E7                              |
|Europe       |Guernsey      |4.625756307167343              |63301.0        |102135.0                                |
|Oceania      |Samoa         |4.614069484040974              |222382.0       |421685.0                                |
|North America|Costa Rica    |4.518187533307893              |5180829.0      |8383317.0                               |
|Europe       |Gibraltar     |4.50393580

In [10]:
beds_more_than_zero =  df.filter(col("hospital_beds_per_1000") > 0)
agg_per_continent = beds_more_than_zero.groupBy("Continent").agg(
    min(struct(col("hospital_beds_per_1000"), col("country_name"))).alias("min_hospital_beds_per_1000"),
    max(struct(col("hospital_beds_per_1000"), col("country_name"))).alias("max_hospital_beds_per_1000"),
)

result = agg_per_continent.select(
    col("Continent"),
    col("min_hospital_beds_per_1000.country_name").alias("country_with_min_beds"),
    col("min_hospital_beds_per_1000.hospital_beds_per_1000").alias("min_hospital_beds_per_1000"),
    col("max_hospital_beds_per_1000.country_name").alias("country_with_max_beds"),
    col("max_hospital_beds_per_1000.hospital_beds_per_1000").alias("max_hospital_beds_per_1000")
)

result.show(truncate=False)


+-------------+---------------------+--------------------------+----------------------------+--------------------------+
|Continent    |country_with_min_beds|min_hospital_beds_per_1000|country_with_max_beds       |max_hospital_beds_per_1000|
+-------------+---------------------+--------------------------+----------------------------+--------------------------+
|Africa       |Burkina Faso         |0.2                       |Mauritius                   |3.71                      |
|Asia         |Nepal                |0.28                      |North Korea                 |13.2                      |
|Europe       |Isle of Man          |1.584279384               |Monaco                      |22.02                     |
|North America|Guatemala            |0.43                      |United States Virgin Islands|18.68000031               |
|Oceania      |Papua New Guinea     |0.17                      |Nauru                       |4.88                      |
|South America|Venezuela        

In [11]:
beds_more_than_zero = beds_more_than_zero.groupBy("Continent", "country_name").agg(
    avg("hospital_beds_per_1000").alias("hospital_beds_per_1000")
)

window_low = Window.partitionBy("Continent").orderBy(asc("hospital_beds_per_1000"))

df_with_rank_low = beds_more_than_zero.withColumn(
    "rank_low",
    row_number().over(window_low)
)

top5_low_beds = df_with_rank_low.filter(col("rank_low") <= 5) \
    .select(
        col("Continent"),
        col("country_name"),
        col("hospital_beds_per_1000"),
        col("rank_low")
    ).orderBy("Continent", "rank_low")

print("Top 5 krajów o NAJNIŻSZEJ liczbie łóżek na 1000 mieszkańców, per kontynent")
top5_low_beds.show(30, truncate=False)

window_high = Window.partitionBy("Continent").orderBy(desc("hospital_beds_per_1000"))

df_with_rank_high = beds_more_than_zero.withColumn(
    "rank_high",
    row_number().over(window_high)
)

top5_high_beds = df_with_rank_high.filter(col("rank_high") <= 5) \
    .select(
        col("Continent"),
        col("country_name"),
        col("hospital_beds_per_1000"),
        col("rank_high")
    ).orderBy("Continent", "rank_high")

print("Top 5 krajów o NAJWYŻSZEJ liczbie łóżek na 1000 mieszkańców, per kontynent")
top5_high_beds.show(30, truncate=False)


Top 5 krajów o NAJNIŻSZEJ liczbie łóżek na 1000 mieszkańców, per kontynent
+-------------+----------------------+----------------------+--------+
|Continent    |country_name          |hospital_beds_per_1000|rank_low|
+-------------+----------------------+----------------------+--------+
|Africa       |Burkina Faso          |0.19999999999999726   |1       |
|Africa       |Mali                  |0.25                  |2       |
|Africa       |Guinea                |0.3000000000000056    |3       |
|Africa       |Madagascar            |0.3199999999999946    |4       |
|Africa       |Ivory Coast           |0.3999999999999945    |5       |
|Asia         |Nepal                 |0.3493743693239079    |1       |
|Asia         |Pakistan              |0.5099999999999938    |2       |
|Asia         |Yemen                 |0.7100000000000033    |3       |
|Asia         |Philippines           |0.9710797174571298    |4       |
|Asia         |Myanmar               |1.0599999999999732    |5       |
|E

In [12]:
df_positive = df.filter(col("new_confirmed") > 0)

first_last_positive = df_positive.groupBy("country_name").agg(
    min("date").alias("first_positive_day"),
    max("date").alias("last_positive_day")
)

first_last_positive.show(truncate=False)


+----------------------+------------------+-----------------+
|country_name          |first_positive_day|last_positive_day|
+----------------------+------------------+-----------------+
|Anguilla              |2020-03-25        |2022-09-13       |
|Afghanistan           |2020-02-22        |2022-09-13       |
|Argentina             |2020-01-01        |2022-09-09       |
|Angola                |2020-01-02        |2022-09-04       |
|Albania               |2020-03-07        |2022-09-13       |
|American Samoa        |2021-09-17        |2022-09-04       |
|Andorra               |2020-03-01        |2022-09-13       |
|Barbados              |2020-03-16        |2022-09-13       |
|Aruba                 |2020-03-15        |2022-09-05       |
|Azerbaijan            |2020-02-27        |2022-09-13       |
|Armenia               |2020-02-29        |2022-09-10       |
|United Arab Emirates  |2020-01-27        |2022-09-13       |
|Antigua and Barbuda   |2020-03-13        |2022-09-13       |
|Bosnia 

# 2.8 Date functions

In [13]:
df_months = df.withColumn("year", year(col("date"))).withColumn("month", month(col("date")))

df_monthly_avg = df_months.groupBy("country_name", "year", "month").agg(avg("new_confirmed").alias("avg_new_confirmed"))

window_high = Window.partitionBy("country_name").orderBy(desc("avg_new_confirmed"))

window_low = Window.partitionBy("country_name").orderBy(asc("avg_new_confirmed"))

df_ranked = df_monthly_avg.withColumn("rank_high", row_number().over(window_high)).withColumn("rank_low", row_number().over(window_low))

top5_high = df_ranked.filter(col("rank_high") <= 5).select("country_name", "year", "month", "avg_new_confirmed", "rank_high").orderBy("country_name", "rank_high")

top5_low = df_ranked.filter(col("rank_low") <= 5).select("country_name", "year", "month", "avg_new_confirmed", "rank_low").orderBy("country_name", "rank_low")

print("Top 5 miesięcy o NAJWYŻSZEJ średniej zakażeń na kraj")
top5_high.show(100, truncate=False)

print("Top 5 miesięcy o NAJNIŻSZEJ średniej zakażeń na kraj")
top5_low.show(100, truncate=False)


Top 5 miesięcy o NAJWYŻSZEJ średniej zakażeń na kraj
+-------------------+----+-----+------------------+---------+
|country_name       |year|month|avg_new_confirmed |rank_high|
+-------------------+----+-----+------------------+---------+
|Afghanistan        |2021|6    |1604.3333333333333|1        |
|Afghanistan        |2021|7    |833.1935483870968 |2        |
|Afghanistan        |2020|6    |517.6666666666666 |3        |
|Afghanistan        |2020|5    |455.1290322580645 |4        |
|Afghanistan        |2021|5    |448.51612903225805|5        |
|Albania            |2022|1    |1592.5806451612902|1        |
|Albania            |2021|2    |1033.5357142857142|2        |
|Albania            |2022|7    |1007.9354838709677|3        |
|Albania            |2021|9    |780.3             |4        |
|Albania            |2021|1    |653.8387096774194 |5        |
|Algeria            |2022|1    |1119.4193548387098|1        |
|Algeria            |2021|7    |1048.032258064516 |2        |
|Algeria         

In [14]:
df_time = df.withColumn("month", month(col("date"))).withColumn("year", year(col("date")))

pivot_per_month = df_time.groupBy("year", "month").pivot("Continent").sum("new_confirmed").orderBy("year", "month")

pivot_per_month.show(50, truncate=False)


+----+-----+-----+-------+--------+--------+-------------+-------+-------------+
|year|month|0    |Africa |Asia    |Europe  |North America|Oceania|South America|
+----+-----+-----+-------+--------+--------+-------------+-------+-------------+
|2020|1    |0    |63     |14568   |32      |11           |9      |9572         |
|2020|2    |0    |16     |71323   |1835    |81           |18     |2867         |
|2020|3    |131  |6891   |106016  |511277  |166040       |5423   |15281        |
|2020|4    |682  |35107  |344795  |831867  |804401       |3095   |174257       |
|2020|5    |273  |114166 |636941  |594561  |837453       |531    |714567       |
|2020|6    |1779 |270445 |1210617 |449807  |1088541      |800    |1330961      |
|2020|7    |5466 |524656 |2058977 |469269  |2305439      |9653   |1925904      |
|2020|8    |5137 |315930 |3036239 |787173  |1793679      |11731  |2209972      |
|2020|9    |2182 |234342 |3773132 |1514273 |1496375      |3997   |1790423      |
|2020|10   |4245 |306815 |32

# 2.9 joiny

In [15]:
filter1_small = filter1.select("country_name", "hospital_beds_per_1000")

agg_small = agg_per_country.select("country_name", "max_daily_cases", "max_daily_deceases")

inner_join_df = filter1_small.join(
    agg_small,
    on="country_name",
    how="inner"
)

print("INNER JOIN - Wpływ ilości łóżek szpitalnych na 1000 os. na śmiertelność")
inner_join_df.orderBy(col("hospital_beds_per_1000").asc()).show(10, truncate=False)


INNER JOIN - Wpływ ilości łóżek szpitalnych na 1000 os. na śmiertelność
+------------+----------------------+---------------+------------------+
|country_name|hospital_beds_per_1000|max_daily_cases|max_daily_deceases|
+------------+----------------------+---------------+------------------+
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          |19                |
|Mongolia    |8.22                  |24556          

In [16]:
pop_vacc_by_country = df_country_max_vacc.select("country_name", "max_per_capita_doses_vaccinated")
agg_small = agg_per_country.select("country_name", "sum_investment_in_vaccines")

left_join_df = pop_vacc_by_country.join(
    agg_small,
    on="country_name",
    how="left"
)

print("LEFT JOIN - Sumaryczne wydatki na szczepienia według państwa")
left_join_df.sort(col("max_per_capita_doses_vaccinated").desc()).show(10, truncate=False)
 

LEFT JOIN - Sumaryczne wydatki na szczepienia według państwa
+------------+-------------------------------+--------------------------+
|country_name|max_per_capita_doses_vaccinated|sum_investment_in_vaccines|
+------------+-------------------------------+--------------------------+
|Brunei      |5.073623280074476              |0                         |
|Portugal    |4.848028379303983              |242461000                 |
|Guernsey    |4.625756307167343              |0                         |
|Samoa       |4.614069484040974              |0                         |
|Costa Rica  |4.518187533307893              |40000000                  |
|Gibraltar   |4.503935802015375              |0                         |
|Cook Islands|4.4458291693610015             |0                         |
|Finland     |4.3861726175812095             |5543828                   |
|Sweden      |4.354826038047663              |9900000                   |
|Niue        |4.280248190279214              |0    

In [17]:
filter2_small = filter2.select("country_name", "cumulative_confirmed", "date")
filter4_small = filter4.select("country_name", "population_density", "date")

right_join_df = filter2_small.join(
    filter4_small,
    on=["country_name", "date"],
    how="right"
)

print("RIGHT JOIN - Wpływ gęstości zaludnienia na przekroczenie bariery miliona zachorowań")
right_join_df.sort(["date", "country_name"], ascending=True).show(10, truncate=False)


RIGHT JOIN - Wpływ gęstości zaludnienia na przekroczenie bariery miliona zachorowań
+------------+----------+--------------------+------------------+
|country_name|date      |cumulative_confirmed|population_density|
+------------+----------+--------------------+------------------+
|Aruba       |2020-01-01|NULL                |592.1388888888889 |
|Bahrain     |2020-01-01|NULL                |1931.332026143791 |
|Bangladesh  |2020-01-01|NULL                |1134.5188791759842|
|Barbados    |2020-01-01|NULL                |652.7744186046511 |
|Bermuda     |2020-01-01|NULL                |1185.7592592592594|
|Gibraltar   |2020-01-01|NULL                |5451.5            |
|Guernsey    |2020-01-01|NULL                |805.0512820512821 |
|Hong Kong   |2020-01-01|NULL                |6794.346014492754 |
|Jersey      |2020-01-01|NULL                |933.7844827586207 |
|Lebanon     |2020-01-01|NULL                |541.802812858783  |
+------------+----------+--------------------+------------

In [18]:
filter3_small = filter3.select("country_name", "date")
filter9_small = filter9.select("country_name", "stringency_index", "date", "new_confirmed")

outer_join_df = filter3_small.join(
    filter9_small,
    on=["country_name", "date"],
    how="outer"
)

print("OUTER JOIN - Wpływ restrykcji na ilość nowych zachorowań")
outer_join_df.sort(["new_confirmed"], ascending=False).show(50, truncate=False)


OUTER JOIN - Wpływ restrykcji na ilość nowych zachorowań
+------------------------+----------+----------------+-------------+
|country_name            |date      |stringency_index|new_confirmed|
+------------------------+----------+----------------+-------------+
|France                  |2022-01-25|72.22           |501635       |
|China                   |2022-05-19|79.17           |436310       |
|France                  |2022-01-26|72.22           |428008       |
|France                  |2022-02-01|72.22           |416896       |
|India                   |2021-05-05|73.61           |414188       |
|India                   |2021-05-04|73.61           |412262       |
|India                   |2021-05-07|73.61           |403738       |
|India                   |2021-04-29|73.61           |401993       |
|India                   |2021-05-06|73.61           |401078       |
|India                   |2021-04-30|73.61           |392488       |
|France                  |2022-01-27|72.22    

In [19]:
filter9_small = filter9.select("country_name", "stringency_index", "date")
filter11_small = filter11.select("country_name", "date", "new_confirmed")

left_semi_join_df = filter11_small.join(
    filter9_small,
    on=["country_name", "date"],
    how="left_semi"
)

print("LEFT SEMI JOIN - Wpływ restrykcji w Polsce na poziom zachorowań")
left_semi_join_df.sort(["date"], ascending=True).show(50, truncate=False)


LEFT SEMI JOIN - Wpływ restrykcji w Polsce na poziom zachorowań
+------------+----------+-------------+
|country_name|date      |new_confirmed|
+------------+----------+-------------+
|Poland      |2020-03-31|243          |
|Poland      |2020-04-01|392          |
|Poland      |2020-04-02|437          |
|Poland      |2020-04-03|244          |
|Poland      |2020-04-04|475          |
|Poland      |2020-04-05|311          |
|Poland      |2020-04-06|435          |
|Poland      |2020-04-07|357          |
|Poland      |2020-04-08|370          |
|Poland      |2020-04-09|380          |
|Poland      |2020-04-10|401          |
|Poland      |2020-04-11|318          |
|Poland      |2020-04-12|260          |
|Poland      |2020-04-13|268          |
|Poland      |2020-04-14|380          |
|Poland      |2020-04-15|336          |
|Poland      |2020-04-16|461          |
|Poland      |2020-04-17|363          |
|Poland      |2020-04-18|545          |
|Poland      |2020-04-19|306          |
|Poland      |20

In [20]:
filter1_small = filter1.select("country_name", "date", "hospital_beds_per_1000")
filter7_small = filter7.select("country_name", "date", "health_expenditure_usd", "new_confirmed", "new_deceased")

left_anti_join_df = filter7_small.join(
    filter1_small,
    on=["country_name", "date"],
    how="left_anti"
)

print("LEFT ANTI JOIN - Wpływ małej pojemności szpitali i niskich wydatków, na ilość zachorowań i śmierci")
left_anti_join_df.sort(["country_name", "date"], ascending=True).show(50, truncate=False)


LEFT ANTI JOIN - Wpływ małej pojemności szpitali i niskich wydatków, na ilość zachorowań i śmierci
+------------+----------+----------------------+-------------+------------+
|country_name|date      |health_expenditure_usd|new_confirmed|new_deceased|
+------------+----------+----------------------+-------------+------------+
|Afghanistan |2020-01-01|0.0                   |0            |0           |
|Afghanistan |2020-01-02|0.0                   |0            |0           |
|Afghanistan |2020-01-03|0.0                   |0            |0           |
|Afghanistan |2020-01-04|0.0                   |0            |0           |
|Afghanistan |2020-01-05|0.0                   |0            |0           |
|Afghanistan |2020-01-06|0.0                   |0            |0           |
|Afghanistan |2020-01-07|0.0                   |0            |0           |
|Afghanistan |2020-01-08|0.0                   |0            |0           |
|Afghanistan |2020-01-09|0.0                   |0            |0  

# 3 udf

In [21]:
def classify_threat(cases):
    if cases is None:
        return "unknown"
    try:
        c = float(cases)
    except:
        return "unknown"
    if c < 1000:
        return "low"
    elif c <= 10000:
        return "medium"
    else:
        return "high"

threat_udf = udf(classify_threat, StringType())

df_with_threat = df.withColumn(
    "threat_level",
    threat_udf(col("new_confirmed"))
)

df_with_threat.select("country_name", "new_confirmed", "threat_level").show(20, truncate=False)

threat_counts = df_with_threat.groupBy("threat_level").count()
threat_counts.show(truncate=False)


+------------+-------------+------------+
|country_name|new_confirmed|threat_level|
+------------+-------------+------------+
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
|Andorra     |0            |low         |
+------------+-------------+------

In [22]:
def classify_threat_normalised(cases, population):
    if cases is None or population is None:
        return "unknown"
    try:
        c = float(cases)
        p = float(population)
    except:
        return "unknown"
    if p <= 0:
        return "unknown"
    
    per100k = (c / p) * 100000
    if per100k < 10:
        return "low"
    elif per100k <= 50:
        return "medium"
    else:
        return "high"

threat_population_udf = udf(classify_threat_normalised, StringType())

df_with_threat_population = df.withColumn(
    "threat_level_population",
    threat_population_udf(col("new_confirmed"), col("Population"))
)

df_with_threat_population.select(
    "country_name",
    "new_confirmed",
    "Population",
    "threat_level_population"
).show(20, truncate=False)

df_with_threat_population.groupBy("threat_level_population").count().show(truncate=False)


+------------+-------------+----------+-----------------------+
|country_name|new_confirmed|Population|threat_level_population|
+------------+-------------+----------+-----------------------+
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |low                    |
|Andorra     |0            |77700.0   |l

In [23]:
counts_by_threat = df_with_threat.groupBy("country_name").pivot("threat_level", ["low", "medium", "high", "unknown"]).count().na.fill(0)

print("Dni per country według surowej liczby new_confirmed")
counts_by_threat.orderBy("country_name").select(
    "country_name",
    col("low").alias("count_low"),
    col("medium").alias("count_medium"),
    col("high").alias("count_high"),
    col("unknown").alias("count_unknown")
).orderBy("country_name").show(truncate=False)

counts_by_threat_population = df_with_threat_population.groupBy("country_name").pivot("threat_level_population", ["low", "medium", "high", "unknown"]).count().na.fill(0)

print("Dni per country według liczby na 100k mieszkańców")
counts_by_threat_population.select(
    "country_name",
    col("low").alias("count_low_pc"),
    col("medium").alias("count_medium_pc"),
    col("high").alias("count_high_pc"),
    col("unknown").alias("count_unknown_pc")
).orderBy("country_name").show(200, truncate=False)


Dni per country według surowej liczby new_confirmed
+-------------------+---------+------------+----------+-------------+
|country_name       |count_low|count_medium|count_high|count_unknown|
+-------------------+---------+------------+----------+-------------+
|Afghanistan        |944      |47          |0         |0            |
|Albania            |926      |65          |0         |0            |
|Algeria            |938      |53          |0         |0            |
|American Samoa     |991      |0           |0         |0            |
|Andorra            |988      |3           |0         |0            |
|Angola             |984      |7           |0         |0            |
|Anguilla           |991      |0           |0         |0            |
|Antarctica         |991      |0           |0         |0            |
|Antigua and Barbuda|991      |0           |0         |0            |
|Argentina          |297      |419         |275       |0            |
|Armenia            |850      |141    

# 4 RDD & Map

In [24]:
def map_row(row):
    cn = row["country_name"]
    dt = row["date"]
    nc = row["new_confirmed"]
    nd = row["new_deceased"]
    nt = row["new_tested"]
    pop = row["Population"]
    area = row["Area (km²)"]
    gdp_pc = row["gdp_per_capita"]
    pop_80_plus = row["population_age_80_and_older"]
    beds_p1000 = row["hospital_beds_per_1000"]
    
    cn_lower = cn.lower() if cn is not None else None
    
    date_str = dt.strftime("%d-%m-%Y") if dt is not None else None
    
    if dt is not None:
        wd_index = dt.weekday()
        weekday_str = ["Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"][wd_index]
        is_weekend = (wd_index >= 5)
    else:
        weekday_str = None
        is_weekend = None
    
    try:
        cases_per_100k = (float(nc) / float(pop)) * 100000 if pop and pop > 0 else None
    except:
        cases_per_100k = None
    
    try:
        mortality_rate = (float(nd) / float(nc)) if nc and nc > 0 else None
    except:
        mortality_rate = None
    
    try:
        test_positivity = (float(nc) / float(nt)) if nt and nt > 0 else None
    except:
        test_positivity = None
    
    try:
        pop_density = (float(pop) / float(area)) if area and area > 0 else None
    except:
        pop_density = None
    
    if pop_density is None:
        density_bucket = None
    elif pop_density < 100:
        density_bucket = "low"
    elif pop_density < 500:
        density_bucket = "medium"
    else:
        density_bucket = "high"

    try:
        pop_80_plus_percent = ((float(pop_80_plus) / float(pop)) * 100) if pop > 0 else None
    except:
        pop_80_plus_percent = None
    
    if gdp_pc is None:
        gdp_pc_level = None
    elif gdp_pc < 10000:
        gdp_pc_level = "low"
    elif gdp_pc < 35000:
        gdp_pc_level = "medium"
    else:
        gdp_pc_level = "high"
    
    summary = f"{cn} ({date_str}): new={nc}, deaths={nd}, tested={nt}"

    if nc is None or nd is None or nc == 0:
        cf_rate = None
    else:
        try:
            cf_rate = (float(nd) / float(nc)) * 100.0
        except:
            cf_rate = None
    
    if beds_p1000 is None or pop is None or pop <= 0:
        total_beds = None
    else:
        try:
            total_beds = (float(beds_p1000) / 1000.0) * float(pop)
        except:
            total_beds = None
    
    if total_beds is None or total_beds == 0 or nc is None:
        bed_util = None
    else:
        try:
            bed_util = float(nc) / total_beds
        except:
            bed_util = None
    
    return Row(
        country_name=cn,
        country_name_lower=cn_lower,
        date=dt,
        date_string=date_str,
        weekday=weekday_str,
        is_weekend=is_weekend,
        new_confirmed=nc,
        new_deceased=nd,
        new_tested=nt,
        Population=pop,
        Area_km2=area,
        cases_per_100k=cases_per_100k,
        mortality_rate=mortality_rate,
        test_positivity=test_positivity,
        pop_80_plus_percent=pop_80_plus_percent,
        density_bucket=density_bucket,
        gdp_pc_level=gdp_pc_level,
        summary=summary,
        cf_rate=cf_rate,
        total_beds=total_beds,
        bed_util=bed_util,
    )

rdd = df.rdd
mapped_rdd = rdd.map(map_row)

schema = StructType([
    StructField("country_name",          StringType(), True),
    StructField("country_name_lower",    StringType(), True),
    StructField("date",                  DateType(),   True),
    StructField("date_string",           StringType(), True),
    StructField("weekday",               StringType(), True),
    StructField("is_weekend",            BooleanType(),True),
    StructField("new_confirmed",         IntegerType(),True),
    StructField("new_deceased",          IntegerType(),True),
    StructField("new_tested",            IntegerType(),True),
    StructField("Population",            DoubleType(), True),
    StructField("Area_km2",              DoubleType(), True),
    StructField("cases_per_100k",        DoubleType(), True),
    StructField("mortality_rate",        DoubleType(), True),
    StructField("test_positivity",       DoubleType(), True),
    StructField("pop_80_plus_percent",   DoubleType(), True),
    StructField("density_bucket",        StringType(), True),
    StructField("gdp_pc_level",          StringType(), True),
    StructField("summary",               StringType(), True),
    StructField("case_fatality_rate",    DoubleType(), True),
    StructField("total_hospital_beds",   DoubleType(), True),
    StructField("bed_utilization_rate",  DoubleType(), True),
])

df_mapped = spark.createDataFrame(mapped_rdd, schema=schema)

df_mapped.printSchema()
df_mapped.show(truncate=False)


root
 |-- country_name: string (nullable = true)
 |-- country_name_lower: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_string: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- new_confirmed: integer (nullable = true)
 |-- new_deceased: integer (nullable = true)
 |-- new_tested: integer (nullable = true)
 |-- Population: double (nullable = true)
 |-- Area_km2: double (nullable = true)
 |-- cases_per_100k: double (nullable = true)
 |-- mortality_rate: double (nullable = true)
 |-- test_positivity: double (nullable = true)
 |-- pop_80_plus_percent: double (nullable = true)
 |-- density_bucket: string (nullable = true)
 |-- gdp_pc_level: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- case_fatality_rate: double (nullable = true)
 |-- total_hospital_beds: double (nullable = true)
 |-- bed_utilization_rate: double (nullable = true)

+------------+------------------+----------+---------

In [25]:
spark.stop()
