**Цель: изучить датасет по заболеваемости ковид-19 и получить навык работы с DataFrame API**

Задание:

Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

Формат выполнения работы: загрузите скрипт и результат выборки на гитхаб и пришлите ссылку на выполненную работу для проверки экспертом

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=7b60aecf054e26cee26d718efd2631974c0352d97d3477cbd05c4ae34bf6328d
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
# Данные за 2020 год

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round

# Создаем сессию Spark
spark = SparkSession.builder \
    .appName("COVID Data Analysis") \
    .getOrCreate()

# Чтение данных из CSV файла
df = (spark.read.option('header', True)
                .option('sep', ',')
                .option('inferSchema', True)
                .csv('covid-data.csv'))

# Выполнение запроса на фильтрацию и выборку данных
(df.where((col('date') == '2020-03-31')
          & (~col('iso_code').like('%OWID_%')))
   .select('iso_code',
           col('location').alias('страна'),
           (round((col('total_cases') - col('total_deaths')) / col('population') * 100, 3)).alias('процент переболевших'))
   .orderBy(col('процент переболевших').desc())
   .limit(15)
   .show())

# Останавливаем сессию Spark после завершения работы
spark.stop()


+--------+-----------+--------------------+
|iso_code|     страна|процент переболевших|
+--------+-----------+--------------------+
|     SMR| San Marino|               0.619|
|     AND|    Andorra|               0.471|
|     LUX| Luxembourg|               0.344|
|     ISL|    Iceland|               0.332|
|     ESP|      Spain|               0.187|
|     CHE|Switzerland|               0.187|
|     ITA|      Italy|               0.154|
|     MCO|     Monaco|                0.13|
|     AUT|    Austria|               0.112|
|     BEL|    Belgium|               0.104|
|     NOR|     Norway|               0.085|
|     DEU|    Germany|               0.085|
|     FRA|     France|               0.072|
|     PRT|   Portugal|               0.071|
|     NLD|Netherlands|               0.068|
+--------+-----------+--------------------+



In [3]:
# Данные за 2021 год

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round

# Создаем сессию Spark
spark = SparkSession.builder \
    .appName("COVID Data Analysis") \
    .getOrCreate()

# Чтение данных из CSV файла
df = (spark.read.option('header', True)
                .option('sep', ',')
                .option('inferSchema', True)
                .csv('covid-data.csv'))

# Выполнение запроса на фильтрацию и выборку данных
(df.where((col('date') == '2021-03-31')
          & (~col('iso_code').like('%OWID_%')))
   .select('iso_code',
           col('location').alias('страна'),
           (round((col('total_cases') - col('total_deaths')) / col('population') * 100, 3)).alias('процент переболевших'))
   .orderBy(col('процент переболевших').desc())
   .limit(15)
   .show())

# Останавливаем сессию Spark после завершения работы
spark.stop()

+--------+-------------+--------------------+
|iso_code|       страна|процент переболевших|
+--------+-------------+--------------------+
|     AND|      Andorra|              15.395|
|     MNE|   Montenegro|              14.321|
|     CZE|      Czechia|              14.062|
|     SMR|   San Marino|               13.69|
|     SVN|     Slovenia|              10.176|
|     LUX|   Luxembourg|               9.728|
|     ISR|       Israel|               9.553|
|     USA|United States|               9.036|
|     SRB|       Serbia|               8.748|
|     BHR|      Bahrain|               8.458|
|     PAN|       Panama|               8.087|
|     EST|      Estonia|               7.955|
|     PRT|     Portugal|               7.893|
|     SWE|       Sweden|               7.836|
|     LTU|    Lithuania|               7.808|
+--------+-------------+--------------------+



Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию (в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, first, date_format

# Создаем или возобновляем сессию Spark
spark = SparkSession.builder \
    .appName("COVID Data Analysis") \
    .getOrCreate()

# Чтение данных из CSV файла
df = (spark.read.option('header', True)
                .option('sep', ',')
                .option('inferSchema', True)
                .csv('covid-data.csv'))

# Выполнение запроса на фильтрацию и выборку данных
(df.where((col('date').between('2021-03-24', '2021-03-31'))
          & (~col('iso_code').like('%OWID_%')))
   .orderBy(col('location'), col('new_cases').desc())
   .groupBy(col('location'))
   .agg(first('new_cases').alias('new_cases'), first('date').alias('date'))
   .orderBy(col('new_cases').desc())
   .select(date_format('date', 'yyyy-MM-dd').alias('число'),
           col('location').alias('страна'),
           col('new_cases').alias('кол-во новых случаев'))
   .limit(10)
   .show())

# Останавливаем сессию Spark после завершения работы (если необходимо)
# spark.stop()

+----------+-------------+--------------------+
|     число|       страна|кол-во новых случаев|
+----------+-------------+--------------------+
|2021-03-25|       Brazil|            100158.0|
|2021-03-24|United States|             86960.0|
|2021-03-31|        India|             72330.0|
|2021-03-24|       France|             65392.0|
|2021-03-31|       Turkey|             39302.0|
|2021-03-26|       Poland|             35145.0|
|2021-03-31|      Germany|             25014.0|
|2021-03-26|        Italy|             24076.0|
|2021-03-25|         Peru|             19206.0|
|2021-03-26|      Ukraine|             18226.0|
+----------+-------------+--------------------+



Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [8]:
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, date_format

(df.where((col('date').between('2021-03-28', '2021-03-31'))
          & (col('iso_code') == 'RUS'))
   .withColumn("кол-во новых случаев вчера",lag('new_cases').over(Window.orderBy("date")))
   .select(date_format('date', 'yyyy-MM-dd').alias('число'),
           col('new_cases').alias('кол-во новых случаев сегодня'),
           'кол-во новых случаев вчера',
           (col('new_cases') - col('кол-во новых случаев вчера')).alias('дельта'))
   .where(col('date') != '2021-03-28')
   .show())

+----------+----------------------------+--------------------------+------+
|     число|кол-во новых случаев сегодня|кол-во новых случаев вчера|дельта|
+----------+----------------------------+--------------------------+------+
|2021-03-29|                      8589.0|                    8979.0|-390.0|
|2021-03-30|                      8162.0|                    8589.0|-427.0|
|2021-03-31|                      8156.0|                    8162.0|  -6.0|
+----------+----------------------------+--------------------------+------+

