In [None]:
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window as W

# init SparkSession
ss = SparkSession.builder.appName("SparkByExamples").getOrCreate()

In [15]:
df = ss.read.option("header", True).option("delimiter", ",") \
.format("csv").load("./owid-covid-data.csv")

                                                                                

Задание:

Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию (в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [6]:
# Описание атрибутов
# https://github.com/owid/covid-19-data/blob/master/public/data/README.md

In [17]:
# exec_1

# Выберите 15 стран с наибольшим процентом 
# переболевших на 31 марта (в выходящем датасете необходимы колонки: 
# iso_code, страна, процент переболевших)

df \
.filter(F.col("date") == "2021-03-31") \
.select('iso_code', 'location',
       F.lit(
           F.round((F.col('total_cases') / F.col('population') * 100), 3) 
       ).alias("pers_infected")) \
.orderBy(F.col("pers_infected").desc()) \
.show(15, False)

# # 'population', 'total_cases', 'date',

+--------+-------------+-------------+
|iso_code|location     |pers_infected|
+--------+-------------+-------------+
|AND     |Andorra      |15.544       |
|MNE     |Montenegro   |14.524       |
|CZE     |Czechia      |14.309       |
|SMR     |San Marino   |13.937       |
|SVN     |Slovenia     |10.371       |
|LUX     |Luxembourg   |9.847        |
|ISR     |Israel       |9.625        |
|USA     |United States|9.203        |
|SRB     |Serbia       |8.826        |
|BHR     |Bahrain      |8.489        |
|PAN     |Panama       |8.229        |
|PRT     |Portugal     |8.059        |
|EST     |Estonia      |8.023        |
|SWE     |Sweden       |7.97         |
|LTU     |Lithuania    |7.939        |
+--------+-------------+-------------+
only showing top 15 rows



In [22]:
# exec_2

# Top 10 стран с максимальным зафиксированным 
# кол-вом новых случаев за последнюю неделю марта 2021 в 
# отсортированном порядке по убыванию
# (в выходящем датасете необходимы колонки: число, 
#  страна, кол-во новых случаев)


# судя по комментариям к задаче нужно: 
# а) разбить через окно по условиям "location", "date" 
#    для подсчета внутри страны за конкретную дату
# б) Найти максимальное кол-во новых случаев для каждой страны в периоде
# в) Вывести только максимальное кол-во для каждой страны в формате :
#    |date      |location     |max_new_cases|


# подготовка ДатаФрейма для работы (для возможности использовать SQL)
df.createOrReplaceTempView("table_1")
# тянем нужные атрибуты из всего отношения
new_df = ss.sql(
"""SELECT location, date , new_cases
FROM table_1 
WHERE date BETWEEN '2021-03-22' AND '2021-03-28'
""")

# Задаем правила группировки
windowSpecAgg = W.partitionBy("location", "date")
windowSpecAgg_2 = W.partitionBy("location") 

# ищем топ 10 стран
pre_result = new_df \
.withColumn("sum_new_cases", F.sum(F.col("new_cases")).over(windowSpecAgg)) \
.withColumn("max_new_cases", F.max(F.col("sum_new_cases")).over(windowSpecAgg_2)) \
.select("date", "location", "max_new_cases") \
.orderBy(F.col("max_new_cases").desc()) \
.filter(F.col("max_new_cases") == F.col("sum_new_cases"))
# .show(10, False)

# Исключаем статистику по континентам и полит-блокам
# т.о. получаем результирующий набор
pre_result \
.filter(~F.col("location") \
        .rlike("Africa|Asia|World|Europe|European\ Union|South America|North America")) \
.show(10, False)

+----------+-------------+-------------+
|date      |location     |max_new_cases|
+----------+-------------+-------------+
|2021-03-25|Brazil       |100158.0     |
|2021-03-24|United States|86960.0      |
|2021-03-28|India        |68020.0      |
|2021-03-24|France       |65392.0      |
|2021-03-26|Poland       |35145.0      |
|2021-03-27|Turkey       |30021.0      |
|2021-03-22|Italy        |24501.0      |
|2021-03-24|Germany      |23757.0      |
|2021-03-25|Peru         |19206.0      |
|2021-03-26|Ukraine      |18226.0      |
+----------+-------------+-------------+
only showing top 10 rows



In [43]:
# exec_3

# Посчитайте изменение случаев относительно предыдущего дня в
# России за последнюю неделю марта 2021. (например: в России вчера 
# было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: 
# число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

# Подготовка новой фактуру по измененным условиям

# добавл один день для вывода дельты за всю неделю

df_for_exec_3 = ss.read.options(header=True, delimiter=",") \
.format("csv").load("./owid-covid-data.csv") \
.withColumn("date_d", F.to_date(F.col("date"))) \
.filter(
    (F.col("date_d").between("2021-03-21", "2021-03-28") & \
     (F.col("location") == "Russia"))) \
.select("location", "date_d" , "new_cases") \
.orderBy("date_d")

"""Правка в атрибуте дельты (параметры расчета - поменял местам) """

windowSpec  = W.partitionBy("location").orderBy("date_d")

df_for_exec_3 \
.withColumn("lag", F.lag("new_cases", 1).over(windowSpec)) \
.withColumn("qwe", F.col("lag").cast(IntegerType())) \
.withColumn("rty", F.col("new_cases").cast(IntegerType())) \
.select("date_d",
        F.col("qwe").alias("yesterday_cases"),
        F.col("rty").alias("today_cases"),
        F.lit(F.col("rty") - F.col("qwe")).alias("delta")) \
.show(10, False)

+----------+---------------+-----------+-----+
|date_d    |yesterday_cases|today_cases|delta|
+----------+---------------+-----------+-----+
|2021-03-21|null           |9215       |null |
|2021-03-22|9215           |9195       |-20  |
|2021-03-23|9195           |8369       |-826 |
|2021-03-24|8369           |8769       |400  |
|2021-03-25|8769           |9128       |359  |
|2021-03-26|9128           |9073       |-55  |
|2021-03-27|9073           |8783       |-290 |
|2021-03-28|8783           |8979       |196  |
+----------+---------------+-----------+-----+



In [None]:
ss.stop()