In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("CovidAnalysis").getOrCreate()

covid_df = spark.read.csv("/home/jovyan/work/covid-data.csv", header=True, inferSchema=True)

In [2]:
#Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)
from pyspark.sql.functions import max, lit

march_31_df = covid_df.filter(col("date") == "2021-03-31")
march_31_df = march_31_df.withColumn("perc_recovered", (col("total_cases") / col("population")) * 100)
top_15_countries = march_31_df.select("iso_code", "location", "perc_recovered") \
                               .orderBy(col("perc_recovered").desc()) \
                               .limit(15)
top_15_countries.write.mode("overwrite").csv("/home/jovyan/top_15_countries.csv", header=True)

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

non_country_list = ["World", "Asia", "Africa", "Europe", "North America", "South America", "European Union", "Oceania"]
valid_countries_df = covid_df.filter(~covid_df['location'].isin(non_country_list))

filtered_df = valid_countries_df.filter((F.col('date') >= '2021-03-24') & (F.col('date') <= '2021-03-31'))

window_spec = Window.partitionBy("location").orderBy(F.desc("new_cases"))

max_cases_df = filtered_df.withColumn("max_new_cases", F.max("new_cases").over(window_spec)) \
    .withColumn("rank", F.rank().over(window_spec)) \
    .filter(F.col("rank") == 1) \
    .drop("rank") \
    .select("location", "date", "new_cases")

top_10_countries = max_cases_df.orderBy(F.desc("new_cases")).limit(10)

top_10_countries.show()

# Сохраняем результат в файл
top_10_countries.write.mode("overwrite").csv("/home/jovyan/top_10_countries.csv", header=True)

+-------------+----------+---------+
|     location|      date|new_cases|
+-------------+----------+---------+
|       Brazil|2021-03-25| 100158.0|
|United States|2021-03-24|  86960.0|
|        India|2021-03-31|  72330.0|
|       France|2021-03-24|  65392.0|
|       Turkey|2021-03-31|  39302.0|
|       Poland|2021-03-26|  35145.0|
|      Germany|2021-03-31|  25014.0|
|        Italy|2021-03-26|  24076.0|
|         Peru|2021-03-25|  19206.0|
|      Ukraine|2021-03-26|  18226.0|
+-------------+----------+---------+



In [5]:
#Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. 
#(например: в россии вчера было 9150 , сегодня 8763, итог: -387) 
#(в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)
from pyspark.sql.window import Window

window_spec = Window.partitionBy("location").orderBy("date")

russia_df = covid_df.filter(covid_df['location'] == "Russia") \
    .withColumn("daily_new_cases", F.col("new_cases") - F.lag("new_cases", 1).over(window_spec)) \
    .na.fill(0, subset=["daily_new_cases"])

russia_df = russia_df.select("date", "daily_new_cases").filter(F.col("date") >= '2021-03-31')

russia_df.show()
russia_df.write.mode("overwrite").csv("/home/jovyan/russia_delta_cases.csv", header=True)

+----------+---------------+
|      date|daily_new_cases|
+----------+---------------+
|2021-03-24|          400.0|
|2021-03-25|          359.0|
|2021-03-26|          -55.0|
|2021-03-27|         -290.0|
|2021-03-28|          196.0|
|2021-03-29|         -390.0|
|2021-03-30|         -427.0|
|2021-03-31|           -6.0|
|2021-04-01|          901.0|
|2021-04-02|         -375.0|
|2021-04-03|          224.0|
|2021-04-04|         -209.0|
|2021-04-05|         -172.0|
|2021-04-06|         -319.0|
|2021-04-07|          -32.0|
|2021-04-08|          371.0|
|2021-04-09|          482.0|
|2021-04-10|         -447.0|
|2021-04-11|           -4.0|
|2021-04-12|         -385.0|
+----------+---------------+
only showing top 20 rows

