In [1]:
pip install --quiet pyspark

In [2]:
#импорт нужных библиотек и типов данных
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DateType
from pyspark import SparkFiles
#запуск сессии
spark = SparkSession.builder.appName("Practice").getOrCreate()

In [3]:
#открытие файла, его чтение
covid_data_file_url = "https://raw.githubusercontent.com/glincow/netology-spark-sql/main/data/covid-data.csv"
spark.sparkContext.addFile(covid_data_file_url)
file_path  = 'file://' + SparkFiles.get('covid-data.csv')
df = spark.read.option('inferSchema', 'true').option('header', 'true').csv(file_path)

In [4]:
#изучим колонки в наличии, их типы
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

In [5]:
#посмотрим на первые 20 строк
df.show()

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+--

In [6]:
df.columns

['iso_code',
 'continent',
 'location',
 'date',
 'total_cases',
 'new_cases',
 'new_cases_smoothed',
 'total_deaths',
 'new_deaths',
 'new_deaths_smoothed',
 'total_cases_per_million',
 'new_cases_per_million',
 'new_cases_smoothed_per_million',
 'total_deaths_per_million',
 'new_deaths_per_million',
 'new_deaths_smoothed_per_million',
 'reproduction_rate',
 'icu_patients',
 'icu_patients_per_million',
 'hosp_patients',
 'hosp_patients_per_million',
 'weekly_icu_admissions',
 'weekly_icu_admissions_per_million',
 'weekly_hosp_admissions',
 'weekly_hosp_admissions_per_million',
 'new_tests',
 'total_tests',
 'total_tests_per_thousand',
 'new_tests_per_thousand',
 'new_tests_smoothed',
 'new_tests_smoothed_per_thousand',
 'positive_rate',
 'tests_per_case',
 'tests_units',
 'total_vaccinations',
 'people_vaccinated',
 'people_fully_vaccinated',
 'new_vaccinations',
 'new_vaccinations_smoothed',
 'total_vaccinations_per_hundred',
 'people_vaccinated_per_hundred',
 'people_fully_vaccinate

**Задача 1**

*Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)*

In [7]:
'''
Сперва фильтруем по нужной дате, затем выбор колонок,
с которыми проведем манипуляции.
Далее для удобства присваиваем имя производной колонке,
сортируем ее значения, снова селектим колонки, но уже с точными названиями.
'''
top_15_infected = df.filter(f.col('date') == '2021-03-31')\
.select('iso_code', 'location','total_cases','location','population')\
.withColumn('percent', \
             f.col('total_cases') / f.col('population') * 100)\
.sort(f.col('percent').desc()).select('iso_code', 'location',
                                      'total_cases','percent').limit(15).show()
#repartition(1).write.csv("task1.1", header = True)

+--------+-------------+-----------+------------------+
|iso_code|     location|total_cases|           percent|
+--------+-------------+-----------+------------------+
|     AND|      Andorra|    12010.0|15.543907331909661|
|     MNE|   Montenegro|    91218.0|14.523725364693293|
|     CZE|      Czechia|  1532332.0|14.308848404077997|
|     SMR|   San Marino|     4730.0|13.937179562732041|
|     SVN|     Slovenia|   215602.0|10.370805779121204|
|     LUX|   Luxembourg|    61642.0| 9.847342390123583|
|     ISR|       Israel|   833105.0| 9.625106044786802|
|     USA|United States| 3.046221E7| 9.203010995860707|
|     SRB|       Serbia|   600596.0| 8.826328557933492|
|     BHR|      Bahrain|   144445.0| 8.488860079114566|
|     PAN|       Panama|   355051.0| 8.228739065460761|
|     PRT|     Portugal|   821722.0| 8.058699735120369|
|     EST|      Estonia|   106424.0| 8.022681579659551|
|     SWE|       Sweden|   804886.0| 7.969744347858805|
|     LTU|    Lithuania|   216119.0| 7.938864728

**Задание 2**

Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [8]:
#для нахождения максимального числа случаев, понадобится агрегация
'''
*nc - new cases. Чтобы получить все данные, сперва отфильтруем их,
проведем группировку, отсортируем и будем работать с ними.
'''
top_nc = df.filter('date between "2021-03-24" and "2021-03-31" \
and location not in ("World", "Europe", "Asia", "European Union", \
"South America",  "Africa", "North America")')

top_nc.groupBy('location').agg(f.max("new_cases").alias("new_cases")).\
sort(f.col('new_cases').desc()).\
limit(10).select('location','new_cases').\
join(top_nc, on = ['location', 'new_cases']).\
sort(f.col("new_cases").desc()).\
select('date','location','new_cases').show()

#.repartition(1).write.csv("task1.2", header = True)


+----------+-------------+---------+
|      date|     location|new_cases|
+----------+-------------+---------+
|2021-03-25|       Brazil| 100158.0|
|2021-03-24|United States|  86960.0|
|2021-03-31|        India|  72330.0|
|2021-03-24|       France|  65392.0|
|2021-03-31|       Turkey|  39302.0|
|2021-03-26|       Poland|  35145.0|
|2021-03-31|      Germany|  25014.0|
|2021-03-26|        Italy|  24076.0|
|2021-03-25|         Peru|  19206.0|
|2021-03-26|      Ukraine|  18226.0|
+----------+-------------+---------+



**Задача 3**

Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [9]:
#Здесь необходима фильтрация по стране и датам
df.filter('iso_code = "RUS"').filter('date between "2021-03-24" and "2021-03-31"').\
withColumn('day_before', f.lag(f.col('new_cases')).over(Window.orderBy('date'))).\
withColumn('delta', f.col('new_cases') - f.col ('day_before')).\
select('date','day_before','new_cases','delta').show()
#.repartition(1).write.csv("task1.3", header = True)

+----------+----------+---------+------+
|      date|day_before|new_cases| delta|
+----------+----------+---------+------+
|2021-03-24|      NULL|   8769.0|  NULL|
|2021-03-25|    8769.0|   9128.0| 359.0|
|2021-03-26|    9128.0|   9073.0| -55.0|
|2021-03-27|    9073.0|   8783.0|-290.0|
|2021-03-28|    8783.0|   8979.0| 196.0|
|2021-03-29|    8979.0|   8589.0|-390.0|
|2021-03-30|    8589.0|   8162.0|-427.0|
|2021-03-31|    8162.0|   8156.0|  -6.0|
+----------+----------+---------+------+

