<a href="https://colab.research.google.com/github/finesun16/netology_dwh_final/blob/master/hw_spark_sql.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Установка PySpark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [3]:
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [33]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Загрузка данных из CSV

In [7]:
df = spark.read.csv('owid-covid-data.csv', inferSchema=True, header=True)

# Базовый обзор данных

In [8]:
df.count()

82289

In [9]:
df.dtypes

[('iso_code', 'string'),
 ('continent', 'string'),
 ('location', 'string'),
 ('date', 'string'),
 ('total_cases', 'double'),
 ('new_cases', 'double'),
 ('new_cases_smoothed', 'double'),
 ('total_deaths', 'double'),
 ('new_deaths', 'double'),
 ('new_deaths_smoothed', 'double'),
 ('total_cases_per_million', 'double'),
 ('new_cases_per_million', 'double'),
 ('new_cases_smoothed_per_million', 'double'),
 ('total_deaths_per_million', 'double'),
 ('new_deaths_per_million', 'double'),
 ('new_deaths_smoothed_per_million', 'double'),
 ('reproduction_rate', 'double'),
 ('icu_patients', 'double'),
 ('icu_patients_per_million', 'double'),
 ('hosp_patients', 'double'),
 ('hosp_patients_per_million', 'double'),
 ('weekly_icu_admissions', 'double'),
 ('weekly_icu_admissions_per_million', 'double'),
 ('weekly_hosp_admissions', 'double'),
 ('weekly_hosp_admissions_per_million', 'double'),
 ('new_tests', 'double'),
 ('total_tests', 'double'),
 ('total_tests_per_thousand', 'double'),
 ('new_tests_per_tho

In [23]:
df.take(2)

[Row(iso_code='AFG', continent='Asia', location='Afghanistan', date='2020-02-24', total_cases=1.0, new_cases=1.0, new_cases_smoothed=None, total_deaths=None, new_deaths=None, new_deaths_smoothed=None, total_cases_per_million=0.026, new_cases_per_million=0.026, new_cases_smoothed_per_million=None, total_deaths_per_million=None, new_deaths_per_million=None, new_deaths_smoothed_per_million=None, reproduction_rate=None, icu_patients=None, icu_patients_per_million=None, hosp_patients=None, hosp_patients_per_million=None, weekly_icu_admissions=None, weekly_icu_admissions_per_million=None, weekly_hosp_admissions=None, weekly_hosp_admissions_per_million=None, new_tests=None, total_tests=None, total_tests_per_thousand=None, new_tests_per_thousand=None, new_tests_smoothed=None, new_tests_smoothed_per_thousand=None, positive_rate=None, tests_per_case=None, tests_units=None, total_vaccinations=None, people_vaccinated=None, people_fully_vaccinated=None, new_vaccinations=None, new_vaccinations_smoot

## Выбрать 15 стран с наибольшим процентом переболевших на 31 марта

In [25]:
df.where((df['date'] == '2021-03-31') & (df['continent'] != "null")).orderBy('total_cases_per_million', ascending=False).select('iso_code', 'location', 'total_cases_per_million').show(15)

+--------+-------------+-----------------------+
|iso_code|     location|total_cases_per_million|
+--------+-------------+-----------------------+
|     AND|      Andorra|             155439.073|
|     MNE|   Montenegro|             145237.254|
|     CZE|      Czechia|             143088.484|
|     SMR|   San Marino|             139371.796|
|     SVN|     Slovenia|             103708.058|
|     LUX|   Luxembourg|              98473.424|
|     ISR|       Israel|               96251.06|
|     USA|United States|               92030.11|
|     SRB|       Serbia|              88263.286|
|     BHR|      Bahrain|              84888.601|
|     PAN|       Panama|              82287.391|
|     PRT|     Portugal|              80586.997|
|     EST|      Estonia|              80226.816|
|     SWE|       Sweden|              79697.443|
|     LTU|    Lithuania|              79388.647|
+--------+-------------+-----------------------+
only showing top 15 rows



## Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию

In [23]:
df.where((df['date'] <= '2021-03-31') & (df['date'] >= '2021-03-22') & (df['continent'] != "null"))\
             .groupBy(['iso_code', 'location']).max('new_cases')\
             .orderBy('max(new_cases)', ascending=False).show(10)


+--------+-------------+--------------+
|iso_code|     location|max(new_cases)|
+--------+-------------+--------------+
|     BRA|       Brazil|      100158.0|
|     USA|United States|       86960.0|
|     IND|        India|       72330.0|
|     FRA|       France|       65392.0|
|     TUR|       Turkey|       39302.0|
|     POL|       Poland|       35145.0|
|     DEU|      Germany|       25014.0|
|     ITA|        Italy|       24501.0|
|     PER|         Peru|       19206.0|
|     UKR|      Ukraine|       18226.0|
+--------+-------------+--------------+
only showing top 10 rows



## Посчитать изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021.

In [36]:
window = Window().partitionBy("iso_code").orderBy('date')

In [44]:
df \
    .withColumn('before_cases', F.lag('new_cases').over(window)).\
    select('iso_code', 'location', 'date', 'before_cases', 'new_cases', (F.col('before_cases') - F.col('new_cases')).alias('delta')).\
    where((df['date'].between('2021-03-24', '2021-03-31')) & (df['iso_code'] == "RUS"))\
    .show()

+--------+--------+----------+------------+---------+------+
|iso_code|location|      date|before_cases|new_cases| delta|
+--------+--------+----------+------------+---------+------+
|     RUS|  Russia|2021-03-24|      8369.0|   8769.0|-400.0|
|     RUS|  Russia|2021-03-25|      8769.0|   9128.0|-359.0|
|     RUS|  Russia|2021-03-26|      9128.0|   9073.0|  55.0|
|     RUS|  Russia|2021-03-27|      9073.0|   8783.0| 290.0|
|     RUS|  Russia|2021-03-28|      8783.0|   8979.0|-196.0|
|     RUS|  Russia|2021-03-29|      8979.0|   8589.0| 390.0|
|     RUS|  Russia|2021-03-30|      8589.0|   8162.0| 427.0|
|     RUS|  Russia|2021-03-31|      8162.0|   8156.0|   6.0|
+--------+--------+----------+------------+---------+------+

