<a href="https://colab.research.google.com/github/grant88/education/blob/main/spark_covid.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 64.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=4debda6836378cf5b38010fb78b27defdc846e609e658c157803b77189e28abc
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [97]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

In [10]:
spark = SparkSession.builder.master("local[*]").appName("PySpark").getOrCreate()

In [16]:
df = spark.read.csv("owid-covid-data.csv", header=True, inferSchema=True)

### Задание 1
5 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

In [38]:
(
    df
    .filter(f.col("date") == "2021-03-31")
    .select("iso_code", "location", f.round(f.col("total_cases") / f.col("population"), 3).alias("ill_percent"))
    .sort(f.col("ill_percent").desc())
    .show(15)
)

+--------+-------------+-----------+
|iso_code|     location|ill_percent|
+--------+-------------+-----------+
|     AND|      Andorra|      0.155|
|     MNE|   Montenegro|      0.145|
|     CZE|      Czechia|      0.143|
|     SMR|   San Marino|      0.139|
|     SVN|     Slovenia|      0.104|
|     LUX|   Luxembourg|      0.098|
|     ISR|       Israel|      0.096|
|     USA|United States|      0.092|
|     SRB|       Serbia|      0.088|
|     BHR|      Bahrain|      0.085|
|     PAN|       Panama|      0.082|
|     PRT|     Portugal|      0.081|
|     EST|      Estonia|       0.08|
|     SWE|       Sweden|       0.08|
|     LTU|    Lithuania|      0.079|
+--------+-------------+-----------+
only showing top 15 rows



### Задание 2
Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [96]:
(
    df
    .where(
        (df.date >= "2021-03-25") 
        & (df.date <= "2021-03-31")
        & (f.length(df.iso_code) == 3)
    )
    .groupBy("iso_code")
    .agg(
        f.max("date").alias("report_date"),
        f.sum("new_cases").alias("new_cases_for_week")
    )
    .select(
        "report_date",
        "iso_code",
        "new_cases_for_week"
    )
    .sort(f.col("new_cases_for_week").desc())
    .show(10)
)

+-----------+--------+------------------+
|report_date|iso_code|new_cases_for_week|
+-----------+--------+------------------+
| 2021-03-31|     BRA|          528736.0|
| 2021-03-31|     USA|          448300.0|
| 2021-03-31|     IND|          434131.0|
| 2021-03-31|     FRA|          266069.0|
| 2021-03-31|     TUR|          225900.0|
| 2021-03-31|     POL|          201046.0|
| 2021-03-31|     ITA|          144037.0|
| 2021-03-31|     DEU|          120656.0|
| 2021-03-31|     UKR|           95016.0|
| 2021-03-31|     ARG|           78944.0|
+-----------+--------+------------------+
only showing top 10 rows



### Задание 3
Изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021.

(например: в россии вчера было 9150 , сегодня 8763, итог: -387)

(в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [117]:
window = Window.partitionBy("iso_code").orderBy(f.col("date"))

(
    df
    .where(
        (df.date >= "2021-03-24") 
        & (df.date <= "2021-03-31")
        & (df.iso_code == 'RUS')
    )
    .withColumn("yesterday_new_cases", f.lag("new_cases", 1).over(window))
    .select("date", "yesterday_new_cases", "new_cases", (f.col("new_cases") - f.col("yesterday_new_cases")).alias("delta"))
    .sort(f.col("date").desc())
    .limit(7)
    .sort(f.col("date").asc())
    .show()
)

+----------+-------------------+---------+------+
|      date|yesterday_new_cases|new_cases| delta|
+----------+-------------------+---------+------+
|2021-03-25|             8769.0|   9128.0| 359.0|
|2021-03-26|             9128.0|   9073.0| -55.0|
|2021-03-27|             9073.0|   8783.0|-290.0|
|2021-03-28|             8783.0|   8979.0| 196.0|
|2021-03-29|             8979.0|   8589.0|-390.0|
|2021-03-30|             8589.0|   8162.0|-427.0|
|2021-03-31|             8162.0|   8156.0|  -6.0|
+----------+-------------------+---------+------+

