<a href="https://colab.research.google.com/github/Benji8bit/dpro_spark/blob/main/spark_hw_02.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

###Шаг 1: Импорт необходимых библиотек и создание Spark Session

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=8fa83eb20df0a793bd74cdd771856122962438c0c921b2514439a775a63847b9
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, datediff, to_date, when

# Создаем Spark Session
spark = SparkSession.builder.appName("COVID-19 Analysis").getOrCreate()

### Шаг 2: Загрузка данных
У нас есть CSV-файл с данными о COVID-19. Загрузим его в DataFrame:

In [3]:
# Файл называется 'covid-data.csv' и находится в той же директории
df = spark.read.csv('covid-data.csv', header=True, inferSchema=True)

In [4]:
df.show(5)

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+--

In [5]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

###Шаг 3: Подготовка данных
Отфильтруем датасет по дате

In [6]:
df.count()

82289

In [7]:
filtered_df = df.filter(col("date") <= lit("2020-03-31"))
filtered_df.count()

6252

###Шаг 4: Выбор 15 стран с наибольшим процентом переболевших

In [8]:
# Группируем данные по стране и дате
grouped_df = filtered_df.groupBy("iso_code", "location", "population")

In [9]:
# Подсчитываем общее количество случаев заболевания
total_cases_df = grouped_df.agg(sum("total_cases").alias("total_cases_by_country"))

total_cases_df = total_cases_df.sort("total_cases_by_country")

In [10]:
total_cases_df.sort(total_cases_df.total_cases_by_country.desc()).show(10)

+--------+--------------+-------------+----------------------+
|iso_code|      location|   population|total_cases_by_country|
+--------+--------------+-------------+----------------------+
|OWID_WRL|         World|7.794798729E9|           1.0767888E7|
|OWID_ASI|          Asia|4.639847425E9|             5287364.0|
|     CHN|         China|1.439323774E9|             4186795.0|
|OWID_EUR|        Europe| 7.48680069E8|             4067030.0|
|OWID_EUN|European Union|  4.4491906E8|             3523793.0|
|     ITA|         Italy|  6.0461828E7|             1213740.0|
|OWID_NAM| North America| 5.92072204E8|             1201658.0|
|     USA| United States| 3.31002647E8|             1120565.0|
|     ESP|         Spain|  4.6754783E7|              742697.0|
|     DEU|       Germany|  8.3783945E7|              588758.0|
+--------+--------------+-------------+----------------------+
only showing top 10 rows



In [11]:
cases_percent_df = total_cases_df.withColumn("cases_percent", (total_cases_df.total_cases_by_country * 100)/total_cases_df.population)

In [12]:
cases_percent_df = cases_percent_df.select("iso_code", "location", "cases_percent").orderBy(cases_percent_df.cases_percent.desc()).show(15)

+--------+--------------+------------------+
|iso_code|      location|     cases_percent|
+--------+--------------+------------------+
|     SMR|    San Marino| 10.20684778124816|
|     VAT|       Vatican| 7.169344870210136|
|     AND|       Andorra|3.6070665890118425|
|     ISL|       Iceland| 3.311355311355311|
|     LUX|    Luxembourg|  2.74930668268432|
|     ITA|         Italy| 2.007448401990095|
|     LIE| Liechtenstein|1.8197550934787738|
|     CHE|   Switzerland|1.7925343440923678|
|     ESP|         Spain| 1.588494165399078|
|     MCO|        Monaco|1.1059015390887779|
|     NOR|        Norway|0.9494318829522829|
|     AUT|       Austria|0.9417858411796056|
|OWID_EUN|European Union|0.7920076519086415|
|     BEL|       Belgium|0.7682308024700732|
|     DEU|       Germany|0.7027098091406414|
+--------+--------------+------------------+
only showing top 15 rows



###Шаг 5: Выбор 10 стран с максимальным зафиксированным количеством новых случаев за последнюю неделю марта 2021

In [14]:
# Фильтруем данные за последнюю неделю марта 2021
df_march_21 = df.filter((col("date") >= "2021-03-23") & (col("date") <= "2021-03-31"))

In [17]:
# Группируем данные по стране и подсчитываем новые случаи
new_cases = df_march_21.groupBy("iso_code", "location").agg(sum("new_cases").alias("total_new_cases"))

In [19]:
# Находим страны с максимальным количеством новых случаев
top_10_countries = new_cases.orderBy(new_cases.total_new_cases.desc()).limit(10)

In [20]:
# Выводим результат
top_10_countries.select("location", "total_new_cases").show()

+--------------+---------------+
|      location|total_new_cases|
+--------------+---------------+
|         World|      5204960.0|
|        Europe|      1909681.0|
|European Union|      1529997.0|
|          Asia|      1405330.0|
| South America|      1074115.0|
| North America|       711840.0|
|        Brazil|       701221.0|
| United States|       588863.0|
|         India|       534869.0|
|        France|       346158.0|
+--------------+---------------+



###Шаг 6: Изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021

In [34]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

In [50]:
# Фильтруем данные за последнюю неделю марта 2021
df_march_21 = df.filter((col("date") >= "2021-03-23") & (col("date") <= "2021-03-31")).filter(col("iso_code") == "RUS")
df_march_21.show(5)

+--------+---------+--------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+------------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+---------------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+

In [51]:
# Группируем данные по дате и подсчитываем новые случаи
daily_cases = df_march_21.groupBy("iso_code", "date").agg(sum("new_cases").alias("cases"))
daily_cases.show(5)

+--------+----------+------+
|iso_code|      date| cases|
+--------+----------+------+
|     RUS|2021-03-24|8769.0|
|     RUS|2021-03-28|8979.0|
|     RUS|2021-03-27|8783.0|
|     RUS|2021-03-30|8162.0|
|     RUS|2021-03-25|9128.0|
+--------+----------+------+
only showing top 5 rows



In [52]:
# Преобразуем данные в DataFrame с колонками для каждого дня
daily_cases_df = daily_cases.withColumn("prev_day", lag(col("cases"), 1).over(Window.orderBy("date")))
daily_cases_df.show(5)

+--------+----------+------+--------+
|iso_code|      date| cases|prev_day|
+--------+----------+------+--------+
|     RUS|2021-03-23|8369.0|    NULL|
|     RUS|2021-03-24|8769.0|  8369.0|
|     RUS|2021-03-25|9128.0|  8769.0|
|     RUS|2021-03-26|9073.0|  9128.0|
|     RUS|2021-03-27|8783.0|  9073.0|
+--------+----------+------+--------+
only showing top 5 rows



In [55]:
# Вычисляем изменение случаев
delta_cases_rus = daily_cases_df.withColumn("delta", col("cases") - col("prev_day"))
delta_cases_rus.show(5)

+--------+----------+------+--------+------+
|iso_code|      date| cases|prev_day| delta|
+--------+----------+------+--------+------+
|     RUS|2021-03-23|8369.0|    NULL|  NULL|
|     RUS|2021-03-24|8769.0|  8369.0| 400.0|
|     RUS|2021-03-25|9128.0|  8769.0| 359.0|
|     RUS|2021-03-26|9073.0|  9128.0| -55.0|
|     RUS|2021-03-27|8783.0|  9073.0|-290.0|
+--------+----------+------+--------+------+
only showing top 5 rows



In [56]:
# Выводим результат
delta_cases_rus.select("date", "cases", "delta").orderBy("date").show()

+----------+------+------+
|      date| cases| delta|
+----------+------+------+
|2021-03-23|8369.0|  NULL|
|2021-03-24|8769.0| 400.0|
|2021-03-25|9128.0| 359.0|
|2021-03-26|9073.0| -55.0|
|2021-03-27|8783.0|-290.0|
|2021-03-28|8979.0| 196.0|
|2021-03-29|8589.0|-390.0|
|2021-03-30|8162.0|-427.0|
|2021-03-31|8156.0|  -6.0|
+----------+------+------+

