https://github.com/owid/covid-19-data/blob/master/public/data/README.md

In [1]:
!pip install pyspark
from pyspark.sql import SparkSession



In [2]:
spark=SparkSession.builder.appName('data_processing').getOrCreate()

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

### Чтение данных из файла и их анализ                                                       

In [4]:
df=spark.read.csv("/content/drive/MyDrive/Netology/owid-covid-data.csv",header=True,inferSchema=True)

In [5]:
 # посмотрим схему 
 df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million

In [6]:
df.show(15)

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+--

### Очистка данных

In [7]:
# Удаляем все строки с OWID, так как это, по всей видимости, агрегированные данные по регионам
df_clean = df.filter(~df['iso_code'].startswith('OWID'))

# изменяем null на 0
df_clean = df_clean.fillna({'total_cases': 0, 'new_cases': 0})
df_clean.show(5)

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+--

### Задние 1. Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших) 

In [8]:
# Считаем процент заболевших относительно популяции
df_clean = df_clean.withColumn("recover_per_population", 100*(df_clean['total_cases'] - df_clean['total_deaths']) /df_clean['population']) 
df_clean.show()

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+--

In [9]:
df_clean.select(['iso_code','location','date', 'recover_per_population']).filter(df['date']=='2021-03-31').orderBy('recover_per_population', ascending=False).show(15)

+--------+-------------+----------+----------------------+
|iso_code|     location|      date|recover_per_population|
+--------+-------------+----------+----------------------+
|     AND|      Andorra|2021-03-31|    15.395068918656571|
|     MNE|   Montenegro|2021-03-31|    14.320879148873837|
|     CZE|      Czechia|2021-03-31|    14.062130275314685|
|     SMR|   San Marino|2021-03-31|    13.689669397135955|
|     SVN|     Slovenia|2021-03-31|     10.17613851727714|
|     LUX|   Luxembourg|2021-03-31|     9.728168492082764|
|     ISR|       Israel|2021-03-31|      9.55337164944398|
|     USA|United States|2021-03-31|     9.036138614323528|
|     SRB|       Serbia|2021-03-31|     8.748322457350884|
|     BHR|      Bahrain|2021-03-31|      8.45824153156208|
|     PAN|       Panama|2021-03-31|     8.087039673975518|
|     EST|      Estonia|2021-03-31|     7.954685086529683|
|     PRT|     Portugal|2021-03-31|     7.893469921220645|
|     SWE|       Sweden|2021-03-31|     7.83641787970813

### Задние 2.Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [10]:
# Фильтруем записи по условию поподанию в требуемый временной промежуток
df_in_time_range = df_clean.select(
    ['iso_code',
    'location',
    'date',
    'new_cases']).filter( 
        (df['date'] > '2021-03-23') & (df['date'] < '2021-04-01'))

df_in_time_range.show()

+--------+-----------+----------+---------+
|iso_code|   location|      date|new_cases|
+--------+-----------+----------+---------+
|     AFG|Afghanistan|2021-03-24|     15.0|
|     AFG|Afghanistan|2021-03-25|     34.0|
|     AFG|Afghanistan|2021-03-26|     28.0|
|     AFG|Afghanistan|2021-03-27|     36.0|
|     AFG|Afghanistan|2021-03-28|      4.0|
|     AFG|Afghanistan|2021-03-29|     28.0|
|     AFG|Afghanistan|2021-03-30|     62.0|
|     AFG|Afghanistan|2021-03-31|     70.0|
|     ALB|    Albania|2021-03-24|    448.0|
|     ALB|    Albania|2021-03-25|    472.0|
|     ALB|    Albania|2021-03-26|    449.0|
|     ALB|    Albania|2021-03-27|    425.0|
|     ALB|    Albania|2021-03-28|    493.0|
|     ALB|    Albania|2021-03-29|    285.0|
|     ALB|    Albania|2021-03-30|    304.0|
|     ALB|    Albania|2021-03-31|    434.0|
|     DZA|    Algeria|2021-03-24|     89.0|
|     DZA|    Algeria|2021-03-25|    105.0|
|     DZA|    Algeria|2021-03-26|    114.0|
|     DZA|    Algeria|2021-03-27

In [11]:
# Группируем по странам и считаем сумму случаев 
df_sum_new_cases_per_time = df_in_time_range.groupby('iso_code').max()
df_sum_new_cases_per_time.show()

+--------+--------------+
|iso_code|max(new_cases)|
+--------+--------------+
|     HTI|          22.0|
|     BRB|          20.0|
|     LVA|         776.0|
|     JAM|         480.0|
|     BRA|      100158.0|
|     ARM|        1257.0|
|     CUB|        1051.0|
|     JOR|        9130.0|
|     FRA|       65392.0|
|     BRN|           3.0|
|     COD|          93.0|
|     FSM|           0.0|
|     BOL|        1072.0|
|     GIB|           0.0|
|     LBY|         912.0|
|     ETH|        2173.0|
|     ATG|          23.0|
|     GNQ|          71.0|
|     ITA|       24076.0|
|     CMR|        7047.0|
+--------+--------------+
only showing top 20 rows



# calculate using python

In [12]:
# Выводим TOP10 стран по количеству новых
df_sum_new_cases_per_time.orderBy('max(new_cases)', ascending=False).show(10)

+--------+--------------+
|iso_code|max(new_cases)|
+--------+--------------+
|     BRA|      100158.0|
|     USA|       86960.0|
|     IND|       72330.0|
|     FRA|       65392.0|
|     TUR|       39302.0|
|     POL|       35145.0|
|     DEU|       25014.0|
|     ITA|       24076.0|
|     PER|       19206.0|
|     UKR|       18226.0|
+--------+--------------+
only showing top 10 rows



# calculate using sql 

In [13]:
df_in_time_range.registerTempTable("df_table")
df_sum_new_cases_per_time = spark.sql(
          "SELECT iso_code, max(new_cases) "
          "FROM df_table "
          "GROUP BY iso_code " 
          "ORDER BY max(new_cases) DESC "
          "LIMIT 10 " 
           )

df_sum_new_cases_per_time.show() 



+--------+--------------+
|iso_code|max(new_cases)|
+--------+--------------+
|     BRA|      100158.0|
|     USA|       86960.0|
|     IND|       72330.0|
|     FRA|       65392.0|
|     TUR|       39302.0|
|     POL|       35145.0|
|     DEU|       25014.0|
|     ITA|       24076.0|
|     PER|       19206.0|
|     UKR|       18226.0|
+--------+--------------+



# Не придумал, ничего лучше, кроме того как сделать join, чтобы получить дату

In [14]:
cond = [df_sum_new_cases_per_time["max(new_cases)"] == df_in_time_range["new_cases"], df_sum_new_cases_per_time["iso_code"] == df_in_time_range["iso_code"] ]
        
df_sum_new_cases_per_time_with_time = df_sum_new_cases_per_time.join(df_in_time_range, cond, "left_outer")
df_sum_new_cases_per_time_with_time.show()

+--------+--------------+--------+-------------+----------+---------+
|iso_code|max(new_cases)|iso_code|     location|      date|new_cases|
+--------+--------------+--------+-------------+----------+---------+
|     BRA|      100158.0|     BRA|       Brazil|2021-03-25| 100158.0|
|     USA|       86960.0|     USA|United States|2021-03-24|  86960.0|
|     IND|       72330.0|     IND|        India|2021-03-31|  72330.0|
|     FRA|       65392.0|     FRA|       France|2021-03-24|  65392.0|
|     TUR|       39302.0|     TUR|       Turkey|2021-03-31|  39302.0|
|     POL|       35145.0|     POL|       Poland|2021-03-26|  35145.0|
|     DEU|       25014.0|     DEU|      Germany|2021-03-31|  25014.0|
|     ITA|       24076.0|     ITA|        Italy|2021-03-26|  24076.0|
|     PER|       19206.0|     PER|         Peru|2021-03-25|  19206.0|
|     UKR|       18226.0|     UKR|      Ukraine|2021-03-26|  18226.0|
+--------+--------------+--------+-------------+----------+---------+



In [15]:
df = df_sum_new_cases_per_time_with_time.drop(df_in_time_range.new_cases)
df = df.drop(df_in_time_range.iso_code)
df.show()

+--------------+--------+-------------+----------+
|max(new_cases)|iso_code|     location|      date|
+--------------+--------+-------------+----------+
|      100158.0|     BRA|       Brazil|2021-03-25|
|       86960.0|     USA|United States|2021-03-24|
|       72330.0|     IND|        India|2021-03-31|
|       65392.0|     FRA|       France|2021-03-24|
|       39302.0|     TUR|       Turkey|2021-03-31|
|       35145.0|     POL|       Poland|2021-03-26|
|       25014.0|     DEU|      Germany|2021-03-31|
|       24076.0|     ITA|        Italy|2021-03-26|
|       19206.0|     PER|         Peru|2021-03-25|
|       18226.0|     UKR|      Ukraine|2021-03-26|
+--------------+--------+-------------+----------+



### Задние 3. Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [16]:
import pyspark.sql.functions as f
from pyspark.sql import Window

In [17]:
df_in_time_range_RUS = df_in_time_range.filter(df_in_time_range['iso_code'] == 'RUS')
df_in_time_range_RUS.show()

+--------+--------+----------+---------+
|iso_code|location|      date|new_cases|
+--------+--------+----------+---------+
|     RUS|  Russia|2021-03-24|   8769.0|
|     RUS|  Russia|2021-03-25|   9128.0|
|     RUS|  Russia|2021-03-26|   9073.0|
|     RUS|  Russia|2021-03-27|   8783.0|
|     RUS|  Russia|2021-03-28|   8979.0|
|     RUS|  Russia|2021-03-29|   8589.0|
|     RUS|  Russia|2021-03-30|   8162.0|
|     RUS|  Russia|2021-03-31|   8156.0|
+--------+--------+----------+---------+



In [18]:
w = Window.partitionBy('iso_code', 'location').orderBy('date') 
df_in_time_range_RUS.withColumn('lead', f.lag('new_cases', 1).over(w)) \
  .withColumn('new_cases_diff', f.when(f.col('lead').isNotNull(), f.col('new_cases') - f.col('lead')).otherwise(f.lit(None))) \
  .show(10, False)

+--------+--------+----------+---------+------+--------------+
|iso_code|location|date      |new_cases|lead  |new_cases_diff|
+--------+--------+----------+---------+------+--------------+
|RUS     |Russia  |2021-03-24|8769.0   |null  |null          |
|RUS     |Russia  |2021-03-25|9128.0   |8769.0|359.0         |
|RUS     |Russia  |2021-03-26|9073.0   |9128.0|-55.0         |
|RUS     |Russia  |2021-03-27|8783.0   |9073.0|-290.0        |
|RUS     |Russia  |2021-03-28|8979.0   |8783.0|196.0         |
|RUS     |Russia  |2021-03-29|8589.0   |8979.0|-390.0        |
|RUS     |Russia  |2021-03-30|8162.0   |8589.0|-427.0        |
|RUS     |Russia  |2021-03-31|8156.0   |8162.0|-6.0          |
+--------+--------+----------+---------+------+--------------+

