### Домашнее задание по теме «Spark SQL»

In [1]:
!head -2 owid-covid-data.csv

iso_code,continent,location,date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,total_cases_per_million,new_cases_per_million,new_cases_smoothed_per_million,total_deaths_per_million,new_deaths_per_million,new_deaths_smoothed_per_million,reproduction_rate,icu_patients,icu_patients_per_million,hosp_patients,hosp_patients_per_million,weekly_icu_admissions,weekly_icu_admissions_per_million,weekly_hosp_admissions,weekly_hosp_admissions_per_million,new_tests,total_tests,total_tests_per_thousand,new_tests_per_thousand,new_tests_smoothed,new_tests_smoothed_per_thousand,positive_rate,tests_per_case,tests_units,total_vaccinations,people_vaccinated,people_fully_vaccinated,new_vaccinations,new_vaccinations_smoothed,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,new_vaccinations_smoothed_per_million,stringency_index,population,population_density,median_age,aged_65_older,aged_70_older,gdp_per_capita,extreme_pover

In [1]:
from pyspark.sql.functions import col, format_number, lag
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv("owid-covid-data.csv", inferSchema=True, header=True)

                                                                                

In [4]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_mill

### Задание 1
Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

In [6]:
df.select("iso_code", "location", (100*(df.total_cases / df.population)) \
            .alias("cases_percentage")) \
            .where(df.date == "2021-03-31") \
            .orderBy(col("cases_percentage").desc()) \
            .show(15)

+--------+-------------+------------------+
|iso_code|     location|  cases_percentage|
+--------+-------------+------------------+
|     AND|      Andorra|15.543907331909661|
|     MNE|   Montenegro|14.523725364693293|
|     CZE|      Czechia|14.308848404077997|
|     SMR|   San Marino|13.937179562732041|
|     SVN|     Slovenia|10.370805779121204|
|     LUX|   Luxembourg| 9.847342390123583|
|     ISR|       Israel| 9.625106044786802|
|     USA|United States| 9.203010995860707|
|     SRB|       Serbia| 8.826328557933492|
|     BHR|      Bahrain| 8.488860079114566|
|     PAN|       Panama| 8.228739065460761|
|     PRT|     Portugal| 8.058699735120369|
|     EST|      Estonia| 8.022681579659551|
|     SWE|       Sweden| 7.969744347858805|
|     LTU|    Lithuania| 7.938864728274825|
+--------+-------------+------------------+
only showing top 15 rows



### Задание 2
Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [5]:
df_range = df.select("date", "location", "new_cases") \
    .where(df.date.between("2021-03-25", "2021-03-31"))

In [6]:
df_max_new_cases = df_range.groupBy("location").max("new_cases")

In [14]:
exclude_list = ['World', 'Europe', 'European Union', 
                'Asia', 'South America', 'North America']

df_max_new_cases \
        .withColumnRenamed("location", "location_") \
        .join(df_range, 
                      (df_max_new_cases["max(new_cases)"] == df_range.new_cases) 
                    & (df_max_new_cases.location == df_range.location)) \
        .orderBy(col("new_cases").desc()) \
        .select("date", "location", "new_cases") \
        .where(~col("location").isin(exclude_list)) \
        .show(10)

+-------------------+-------------+---------+
|               date|     location|new_cases|
+-------------------+-------------+---------+
|2021-03-25 00:00:00|       Brazil| 100158.0|
|2021-03-26 00:00:00|United States|  77321.0|
|2021-03-31 00:00:00|        India|  72330.0|
|2021-03-31 00:00:00|       France|  59054.0|
|2021-03-31 00:00:00|       Turkey|  39302.0|
|2021-03-26 00:00:00|       Poland|  35145.0|
|2021-03-31 00:00:00|      Germany|  25014.0|
|2021-03-26 00:00:00|        Italy|  24076.0|
|2021-03-25 00:00:00|         Peru|  19206.0|
|2021-03-26 00:00:00|       Africa|  18544.0|
+-------------------+-------------+---------+
only showing top 10 rows



### Задание 3
Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)


In [17]:
df_range = df.select("date", "location", "new_cases") \
    .where(df.date.between("2021-03-24", "2021-03-31"))

window = Window().partitionBy(df.location).orderBy(df.date)

df_range.select("date", "new_cases", lag(col("new_cases")).over(window).alias("new_cases_prev")) \
    .where(df.location == "Russia") \
    .select("date", "new_cases_prev", "new_cases", col("new_cases") - col("new_cases_prev")) \
    .withColumnRenamed("(new_cases - new_cases_prev)", "delta") \
    .where(df.date.between("2021-03-25", "2021-03-31")) \
    .show()

+-------------------+--------------+---------+------+
|               date|new_cases_prev|new_cases| delta|
+-------------------+--------------+---------+------+
|2021-03-25 00:00:00|        8769.0|   9128.0| 359.0|
|2021-03-26 00:00:00|        9128.0|   9073.0| -55.0|
|2021-03-27 00:00:00|        9073.0|   8783.0|-290.0|
|2021-03-28 00:00:00|        8783.0|   8979.0| 196.0|
|2021-03-29 00:00:00|        8979.0|   8589.0|-390.0|
|2021-03-30 00:00:00|        8589.0|   8162.0|-427.0|
|2021-03-31 00:00:00|        8162.0|   8156.0|  -6.0|
+-------------------+--------------+---------+------+

