In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window


spark = SparkSession \
    .builder \
    .getOrCreate()

df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("owid-covid-data.csv")

# df.printSchema()


#ЗАДАНИЕ 1
#Выберите 15 стран с наибольшим процентом переболевших на 31 марта (
#в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

# Предположение что количество людей, которые болели повторно, равно разнице между общим числом случаев заболевания и количеством полностью привитых людей

df_1_res = df.fillna({'people_fully_vaccinated': 0})\
.select('iso_code', 'location', F.round(((F.col('total_cases') - F.col('people_fully_vaccinated')) / F.col('population')) * 100, 2).alias('percentage_of_recovered')) \
.where((F.col('date') == '2021-03-31')).orderBy(F.col('percentage_of_recovered').desc()).limit(15)

df_1_res.show()

# +--------+---------------+-----------------------+
# |iso_code|       location|percentage_of_recovered|
# +--------+---------------+-----------------------+
# |     AND|        Andorra|                  15.54|
# |     MNE|     Montenegro|                   14.0|
# |     CZE|        Czechia|                    9.5|
# |     PAN|         Panama|                   8.23|
# |     SWE|         Sweden|                   7.97|
# |     NLD|    Netherlands|                   7.54|
# |     GEO|        Georgia|                   7.06|
# |     ARM|        Armenia|                    6.5|
# |     LUX|     Luxembourg|                   6.25|
# |     QAT|          Qatar|                   6.25|
# |     MKD|North Macedonia|                   6.24|
# |     JOR|         Jordan|                   5.99|
# |     MCO|         Monaco|                   5.83|
# |     MDA|        Moldova|                   5.71|
# |     LBN|        Lebanon|                   5.65|
# +--------+---------------+-----------------------+




+--------+---------------+-----------------------+
|iso_code|       location|percentage_of_recovered|
+--------+---------------+-----------------------+
|     AND|        Andorra|                  15.54|
|     MNE|     Montenegro|                   14.0|
|     CZE|        Czechia|                    9.5|
|     PAN|         Panama|                   8.23|
|     SWE|         Sweden|                   7.97|
|     NLD|    Netherlands|                   7.54|
|     GEO|        Georgia|                   7.06|
|     ARM|        Armenia|                    6.5|
|     LUX|     Luxembourg|                   6.25|
|     QAT|          Qatar|                   6.25|
|     MKD|North Macedonia|                   6.24|
|     JOR|         Jordan|                   5.99|
|     MCO|         Monaco|                   5.83|
|     MDA|        Moldova|                   5.71|
|     LBN|        Lebanon|                   5.65|
+--------+---------------+-----------------------+



                                                                                

In [22]:

#ЗАДАНИЕ 2
# Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
# (в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

list_of_regions = [x.location for x in df.select('location').where('continent is null').distinct().collect()]

window_2 = Window.partitionBy('location')

df_with_max_cases = df.select('location', 'date', 'new_cases').where(\
                                                                     (F.col('date').between('2021-03-25', '2021-03-31'))\
                                                                     & (~F.col('location').isin(list_of_regions))\
                                                                    )\
                      .withColumn('max_new_cases', F.max('new_cases')\
                      .over(window_2))\
                      .orderBy(F.col('max_new_cases').desc())

df_2_res = df_with_max_cases.where(F.col('new_cases') == F.col('max_new_cases')).select('date', 'location', 'new_cases').limit(10)

df_2_res.show()


# +----------+-------------+---------+
# |      date|     location|new_cases|
# +----------+-------------+---------+
# |2021-03-25|       Brazil| 100158.0|
# |2021-03-26|United States|  77321.0|
# |2021-03-31|        India|  72330.0|
# |2021-03-31|       France|  59054.0|
# |2021-03-31|       Turkey|  39302.0|
# |2021-03-26|       Poland|  35145.0|
# |2021-03-31|      Germany|  25014.0|
# |2021-03-26|        Italy|  24076.0|
# |2021-03-25|         Peru|  19206.0|
# |2021-03-26|      Ukraine|  18226.0|
# +----------+-------------+---------+




+----------+-------------+---------+
|      date|     location|new_cases|
+----------+-------------+---------+
|2021-03-25|       Brazil| 100158.0|
|2021-03-26|United States|  77321.0|
|2021-03-31|        India|  72330.0|
|2021-03-31|       France|  59054.0|
|2021-03-31|       Turkey|  39302.0|
|2021-03-26|       Poland|  35145.0|
|2021-03-31|      Germany|  25014.0|
|2021-03-26|        Italy|  24076.0|
|2021-03-25|         Peru|  19206.0|
|2021-03-26|      Ukraine|  18226.0|
+----------+-------------+---------+



                                                                                

                                                                                

['International',
 'World',
 'Europe',
 'Africa',
 'North America',
 'European Union',
 'South America',
 'Oceania',
 'Asia']

In [146]:

#ЗАДАНИЕ 3
# Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. 
# (например: в россии вчера было 9150 , сегодня 8763, итог: -387) 
# (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

df_3 = df.select('location', 'new_cases', 'date').where((F.col('date').between('2021-03-24', '2021-03-31')) & (F.col('location') == 'Russia'))

window_3 = Window.orderBy('date')

df_3_res = df_3.withColumn('new_cases_yesterday', F.lag('new_cases').over(window_3))\
.where(F.col('date') != '2021-03-24')\
.withColumn('delta', F.col('new_cases') - F.col('new_cases_yesterday'))\
.select('date', 'new_cases_yesterday', 'new_cases', 'delta')

df_3_res.show()


# +----------+-------------------+---------+------+
# |      date|new_cases_yesterday|new_cases| delta|
# +----------+-------------------+---------+------+
# |2021-03-25|             8769.0|   9128.0| 359.0|
# |2021-03-26|             9128.0|   9073.0| -55.0|
# |2021-03-27|             9073.0|   8783.0|-290.0|
# |2021-03-28|             8783.0|   8979.0| 196.0|
# |2021-03-29|             8979.0|   8589.0|-390.0|
# |2021-03-30|             8589.0|   8162.0|-427.0|
# |2021-03-31|             8162.0|   8156.0|  -6.0|
# +----------+-------------------+---------+------+


23/11/07 17:32:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-------------------+---------+------+
|      date|new_cases_yesterday|new_cases| delta|
+----------+-------------------+---------+------+
|2021-03-25|             8769.0|   9128.0| 359.0|
|2021-03-26|             9128.0|   9073.0| -55.0|
|2021-03-27|             9073.0|   8783.0|-290.0|
|2021-03-28|             8783.0|   8979.0| 196.0|
|2021-03-29|             8979.0|   8589.0|-390.0|
|2021-03-30|             8589.0|   8162.0|-427.0|
|2021-03-31|             8162.0|   8156.0|  -6.0|
+----------+-------------------+---------+------+



23/11/07 19:59:56 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1400326 ms exceeds timeout 120000 ms
23/11/07 19:59:56 WARN SparkContext: Killing executors is not supported by current scheduler.
