# ***Домашнее задание по теме «Spark SQL»***

    1. Выберите 15 стран с наибольшим процентом переболевших на 31 марта
    (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)
    2. Top 10 стран с максимальным зафиксированным кол-вом новых случаев за
    последнюю неделю марта 2021 в отсортированном порядке по убыванию
    (в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)
    3. Посчитайте изменение случаев относительно предыдущего дня в России за
    последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число,
    кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [30]:
pip install pyspark



In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as f
import os

In [32]:
spark = SparkSession.builder.appName('MyPySpark').getOrCreate()

In [None]:
spark

In [33]:
df = spark.read.load("owid-covid-data.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

In [34]:
windowSpec = Window()

In [35]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

In [None]:
df.select([
    'date',
    'iso_code',
    'location',
    'total_cases',
    'population']).show()

+----------+--------+-----------+-----------+-----------+
|      date|iso_code|   location|total_cases| population|
+----------+--------+-----------+-----------+-----------+
|2020-02-24|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-02-25|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-02-26|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-02-27|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-02-28|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-02-29|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-03-01|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-03-02|     AFG|Afghanistan|        1.0|3.8928341E7|
|2020-03-03|     AFG|Afghanistan|        2.0|3.8928341E7|
|2020-03-04|     AFG|Afghanistan|        4.0|3.8928341E7|
|2020-03-05|     AFG|Afghanistan|        4.0|3.8928341E7|
|2020-03-06|     AFG|Afghanistan|        4.0|3.8928341E7|
|2020-03-07|     AFG|Afghanistan|        4.0|3.8928341E7|
|2020-03-08|     AFG|Afghanistan|        5.0|3.8928341E7|
|2020-03-09|  

Проверим какие локации маркируются OWID

In [None]:
data = df.filter(f.col('iso_code').contains('OWID')).select([
    'iso_code',
    'location']).distinct().show()

+--------+--------+
|iso_code|location|
+--------+--------+
|OWID_ASI|    Asia|
|OWID_AFR|  Africa|
+--------+--------+



In [None]:
data_ = df.filter(
    (f.col('date') <= f.lit('2022-03-31')) &
    (~f.col('iso_code').contains('OWID'))).select([
    'iso_code',
    'location',
    'total_cases',
    'population'])

In [None]:
data_1 = data_.groupBy('iso_code', 'location').agg((f.max('total_cases')/ f.max('population') * 100).alias('percent')).orderBy(f.col(
    'percent').desc()).limit(15)
data_1 = data_1.withColumn('percent', f.concat(f.round(data_1['percent'], 2), f.lit('%')))

In [None]:
data_1.show()

+--------+--------------------+-------+
|iso_code|            location|percent|
+--------+--------------------+-------+
|     AND|             Andorra| 16.53%|
|     BHR|             Bahrain|  9.59%|
|     BEL|             Belgium|  8.17%|
|     ARM|             Armenia|  7.02%|
|     AUT|             Austria|  6.57%|
|     BRA|              Brazil|  6.54%|
|     ARG|           Argentina|  5.92%|
|     CHL|               Chile|  5.85%|
|     BIH|Bosnia and Herzeg...|  5.76%|
|     BGR|            Bulgaria|  5.55%|
|     COL|            Colombia|  5.18%|
|     ALB|             Albania|   4.5%|
|     BLR|             Belarus|  3.63%|
|     CPV|          Cape Verde|  3.59%|
|     BLZ|              Belize|  3.15%|
+--------+--------------------+-------+



In [None]:
date_2 = df.filter(
    ((f.col('date') >= f.lit('2021-03-25')) & (f.col('date') <= f.lit('2021-03-31'))) &
    ((~f.col('iso_code').contains('OWID')))
).select([
    'location',
    'new_cases',
    'date']).withColumn('date', f.col('date').cast('date'))

In [None]:
date_2.select(f.col('date').cast('date')).distinct().show(10)

+----------+
|      date|
+----------+
|2021-03-26|
|2021-03-25|
|2021-03-30|
|2021-03-27|
|2021-03-29|
|2021-03-28|
|2021-03-31|
+----------+



In [None]:
date_2.show()

+-----------+---------+----------+
|   location|new_cases|      date|
+-----------+---------+----------+
|Afghanistan|     34.0|2021-03-25|
|Afghanistan|     28.0|2021-03-26|
|Afghanistan|     36.0|2021-03-27|
|Afghanistan|      4.0|2021-03-28|
|Afghanistan|     28.0|2021-03-29|
|Afghanistan|     62.0|2021-03-30|
|Afghanistan|     70.0|2021-03-31|
|    Albania|    472.0|2021-03-25|
|    Albania|    449.0|2021-03-26|
|    Albania|    425.0|2021-03-27|
|    Albania|    493.0|2021-03-28|
|    Albania|    285.0|2021-03-29|
|    Albania|    304.0|2021-03-30|
|    Albania|    434.0|2021-03-31|
|    Algeria|    105.0|2021-03-25|
|    Algeria|    114.0|2021-03-26|
|    Algeria|     93.0|2021-03-27|
|    Algeria|     86.0|2021-03-28|
|    Algeria|    110.0|2021-03-29|
|    Algeria|    115.0|2021-03-30|
+-----------+---------+----------+
only showing top 20 rows



In [None]:
date_2_ = date_2.withColumn('row_number',f.row_number().over(
    windowSpec.partitionBy('location').orderBy(f.col('new_cases').desc())
)).filter(
    f.col('row_number') == f.lit('1')).select([
    'location',
    'new_cases',
    'date']).orderBy(f.col(
    'new_cases').desc()).limit(10)

In [None]:
date_2_.show()

+----------+---------+----------+
|  location|new_cases|      date|
+----------+---------+----------+
|    Brazil| 100158.0|2021-03-25|
| Argentina|  16056.0|2021-03-31|
|    Canada|   8728.0|2021-03-31|
|  Colombia|   8646.0|2021-03-31|
|     Chile|   7592.0|2021-03-27|
|  Cameroon|   7047.0|2021-03-26|
|   Belgium|   6123.0|2021-03-26|
|Bangladesh|   5358.0|2021-03-31|
|  Bulgaria|   5176.0|2021-03-30|
|   Austria|   3895.0|2021-03-26|
+----------+---------+----------+



In [62]:
date_3 = df.filter(
    (f.col('date') >= f.lit('2021-03-24')) &
    (f.col('date') <= f.lit('2021-03-31')) &
    (f.col('iso_code') == f.lit('RUS'))
    ).select([
    'date',
    'new_cases']).withColumn('date', f.col('date').cast('date'))

In [63]:
date_3.show()

+----------+---------+
|      date|new_cases|
+----------+---------+
|2021-03-24|   8769.0|
|2021-03-25|   9128.0|
|2021-03-26|   9073.0|
|2021-03-27|   8783.0|
|2021-03-28|   8979.0|
|2021-03-29|   8589.0|
|2021-03-30|   8162.0|
|2021-03-31|   8156.0|
+----------+---------+



In [85]:
date_3 = date_3.withColumn(
    'prev_day_cases', f.lag(f.col('new_cases'),1)\
    .over(windowSpec.partitionBy(f.lit(0)).orderBy('date')))\
.withColumn('prev_day_cases', f.coalesce('prev_day_cases', 'new_cases'))\
  .filter(
     (f.col('date') >= f.lit('2021-03-25')))

In [86]:
date_3_ = date_3.withColumn('delta', f.col('new_cases') - f.col('prev_day_cases'))

In [87]:
date_3_.show()

+----------+---------+--------------+------+
|      date|new_cases|prev_day_cases| delta|
+----------+---------+--------------+------+
|2021-03-25|   9128.0|        8769.0| 359.0|
|2021-03-26|   9073.0|        9128.0| -55.0|
|2021-03-27|   8783.0|        9073.0|-290.0|
|2021-03-28|   8979.0|        8783.0| 196.0|
|2021-03-29|   8589.0|        8979.0|-390.0|
|2021-03-30|   8162.0|        8589.0|-427.0|
|2021-03-31|   8156.0|        8162.0|  -6.0|
+----------+---------+--------------+------+

