In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.appName("CreateSession").master('local').getOrCreate()

22/04/11 18:53:00 WARN Utils: Your hostname, MacBook-Air-Valentin.local resolves to a loopback address: 127.0.0.1; using 172.20.10.2 instead (on interface en0)
22/04/11 18:53:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/11 18:53:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.option('header', True).option('sep', ',')\
    .option('inferSchema', True)\
    .csv('owid-covid-data.csv')


                                                                                

In [7]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million

## 1. Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

In [64]:
df.select('iso_code', 'location', \
         (F.col('total_cases').cast("int") * 100/F.col('population').cast("int")).alias("Процент переболевших")
         ).where(F.col('date') == '2021-03-31').orderBy(F.col("Процент переболевших").desc()).show(15)

+--------+----------+--------------------+
|iso_code|  location|Процент переболевших|
+--------+----------+--------------------+
|     AND|   Andorra|  15.543907331909661|
|     MNE|Montenegro|  14.523725364693295|
|     CZE|   Czechia|  14.308848404077997|
|     SMR|San Marino|  13.937179562732041|
|     SVN|  Slovenia|  10.370805779121202|
|     LUX|Luxembourg|   9.847342390123583|
|     ISR|    Israel|   9.625106044786802|
|     SRB|    Serbia|   8.826328557933492|
|     BHR|   Bahrain|   8.488860079114566|
|     PAN|    Panama|   8.228739065460761|
|     PRT|  Portugal|   8.058699735120367|
|     EST|   Estonia|   8.022681579659551|
|     SWE|    Sweden|   7.969744347858805|
|     LTU| Lithuania|   7.938864728274824|
|     BEL|   Belgium|  7.6141694427149265|
+--------+----------+--------------------+
only showing top 15 rows



## 2. Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [56]:
w = Window.partitionBy("location")
df.withColumn('число', F.first('date').over(w.orderBy(F.col('new_cases').desc())))\
    .withColumn('кол-во новых случаев', F.max('new_cases').over(w))\
    .where(~F.col("iso_code").like('OWID_%'))\
    .select('число','location','кол-во новых случаев').dropDuplicates()\
    .where(F.col('число').between('2021-03-25','2021-03-31'))\
    .orderBy(F.col('кол-во новых случаев').desc()).show()
    

[Stage 75:>                                                         (0 + 1) / 1]

+----------+--------------------+--------------------+
|     число|            location|кол-во новых случаев|
+----------+--------------------+--------------------+
|2021-03-25|              Brazil|            100158.0|
|2021-03-26|             Hungary|             11265.0|
|2021-03-30|              Serbia|              9983.0|
|2021-03-30|              Greece|              4322.0|
|2021-03-29|Bosnia and Herzeg...|              3755.0|
|2021-03-26|               Kenya|              2008.0|
|2021-03-31|     North Macedonia|              1511.0|
|2021-03-25|       Cote d'Ivoire|               767.0|
|2021-03-26|    Papua New Guinea|               551.0|
|2021-03-28|           Mauritius|                62.0|
+----------+--------------------+--------------------+



                                                                                

## 3. Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [90]:
w = Window.partitionBy("iso_code").orderBy(F.col("date").desc())
df.withColumn('yeastarday',F.lag('new_cases',-1).over(w))\
    .withColumn('delta',F.col('new_cases') - F.col('yeastarday'))\
    .where(F.col('date').between('2021-03-25','2021-03-31'))\
    .where(F.col('iso_code') == 'RUS')\
    .select('date','new_cases','yeastarday', 'delta').show()

[Stage 99:>                                                         (0 + 1) / 1]

+----------+---------+----------+------+
|      date|new_cases|yeastarday| delta|
+----------+---------+----------+------+
|2021-03-31|   8156.0|    8162.0|  -6.0|
|2021-03-30|   8162.0|    8589.0|-427.0|
|2021-03-29|   8589.0|    8979.0|-390.0|
|2021-03-28|   8979.0|    8783.0| 196.0|
|2021-03-27|   8783.0|    9073.0|-290.0|
|2021-03-26|   9073.0|    9128.0| -55.0|
|2021-03-25|   9128.0|    8769.0| 359.0|
+----------+---------+----------+------+



                                                                                

w = Window.partitionBy("iso_code").orderBy(F.col("date").desc())
