<a href="https://colab.research.google.com/github/etherest18/netology/blob/main/Homework_PySpark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install --quiet pyspark

In [3]:
from pyspark.sql import SparkSession

# Obtain the SparkSession for this notebook: reuse an active one if present,
# otherwise build a new session named "Practice".
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)

In [4]:
from pyspark import SparkFiles

# Fetch the COVID dataset via SparkContext.addFile, then read the local copy
# as CSV using the header row for column names and an inferred schema.
covid_data_file_url = "https://raw.githubusercontent.com/glincow/netology-spark-sql/main/data/covid-data.csv"
spark.sparkContext.addFile(covid_data_file_url)

# SparkFiles.get returns a local filesystem path; prefix it with the file:// scheme.
file_path = f"file://{SparkFiles.get('covid-data.csv')}"

df = (
    spark.read
    .options(inferSchema='true', header='true')
    .csv(file_path)
)

In [8]:
from pyspark.sql import functions as F

In [6]:
# Show the schema Spark inferred from the CSV (header names + inferred column types).
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

Задание 1. 15 стран с наибольшим процентом переболевших (total_cases / population) на 31 марта 2021 г.

In [63]:
# Task 1: the 15 countries with the highest percentage of the population that
# has had COVID-19 (total_cases / population * 100) as of 2021-03-31.
#
# Rows are filtered BEFORE projecting: the filter needs `date` and `iso_code`,
# and filtering on a column already dropped by `select` only works because
# Spark re-resolves missing references behind the scenes.
# iso_codes starting with "OWID" are presumably OWID's regional/world
# aggregates rather than countries (the same exclusion Task 2 applies).
df_1 = (
    df
    .where((F.col("date") == '2021-03-31') & ~F.col("iso_code").startswith("OWID"))
    .select(
        "iso_code",
        "location",
        "population",
        "total_cases",
        ((F.col("total_cases") / F.col("population")) * 100).alias("% переболевших"),
    )
    # Descending sort places rows with a null percentage (missing data) last.
    .orderBy(F.col("% переболевших").desc())
    .limit(15)
)

df_1.show()

+--------+-------------+------------+-----------+------------------+
|iso_code|     location|  population|total_cases|    % переболевших|
+--------+-------------+------------+-----------+------------------+
|     AND|      Andorra|     77265.0|    12010.0|15.543907331909661|
|     MNE|   Montenegro|    628062.0|    91218.0|14.523725364693293|
|     CZE|      Czechia| 1.0708982E7|  1532332.0|14.308848404077997|
|     SMR|   San Marino|     33938.0|     4730.0|13.937179562732041|
|     SVN|     Slovenia|   2078932.0|   215602.0|10.370805779121204|
|     LUX|   Luxembourg|    625976.0|    61642.0| 9.847342390123583|
|     ISR|       Israel|   8655541.0|   833105.0| 9.625106044786802|
|     USA|United States|3.31002647E8| 3.046221E7| 9.203010995860707|
|     SRB|       Serbia|   6804596.0|   600596.0| 8.826328557933492|
|     BHR|      Bahrain|   1701583.0|   144445.0| 8.488860079114566|
|     PAN|       Panama|   4314768.0|   355051.0| 8.228739065460761|
|     PRT|     Portugal| 1.0196707

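Задание 2. Топ-10 стран по максимальному суточному числу новых случаев за последнюю неделю марта 2021 г. (25–31 марта) с датой этого максимума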

In [72]:
from pyspark.sql import Window

# Task 2: top-10 countries by their largest single-day number of new cases in
# the last week of March 2021 (25-31 March), together with the date of that peak.
window = Window.partitionBy("iso_code", "location")

# Restrict to the target week and drop "OWID*" rows (presumably aggregates such
# as World/continents, not countries) BEFORE windowing, so they are never ranked.
week_cases = (
    df
    .where(F.col("date").between('2021-03-25', '2021-03-31'))
    .where(~F.col("iso_code").startswith("OWID"))
    .select("iso_code", "location", "new_cases", "date")
)

# Rank each country's days by new_cases (highest first, earliest date on ties).
# Taking row_number == 1 yields exactly one peak row per country; comparing
# against max() would emit one row per tied day and duplicate the country.
peak_window = window.orderBy(F.col("new_cases").desc_nulls_last(), F.col("date"))

df_2 = (
    week_cases
    .withColumn("rn", F.row_number().over(peak_window))
    # A country whose week is entirely null has no meaningful peak.
    .where((F.col("rn") == 1) & F.col("new_cases").isNotNull())
    .select("date", "location", F.col("new_cases").alias("max_cases"))
    .orderBy(F.col("max_cases").desc())
    .limit(10)
)

df_2.show()

+----------+-------------+---------+
|      date|     location|max_cases|
+----------+-------------+---------+
|2021-03-25|       Brazil| 100158.0|
|2021-03-26|United States|  77321.0|
|2021-03-31|        India|  72330.0|
|2021-03-31|       France|  59054.0|
|2021-03-31|       Turkey|  39302.0|
|2021-03-26|       Poland|  35145.0|
|2021-03-31|      Germany|  25014.0|
|2021-03-26|        Italy|  24076.0|
|2021-03-25|         Peru|  19206.0|
|2021-03-26|      Ukraine|  18226.0|
+----------+-------------+---------+



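Задание 3. Изменение числа новых случаев относительно предыдущего дня в России за последнюю неделю марта 2021 г.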

In [87]:
# Task 3: day-over-day change in new cases in Russia over the last week of
# March 2021 (25-31 March). 2021-03-24 is loaded only so that 2021-03-25 has a
# "previous day" value for lag().
#
# Partitioning by iso_code avoids Spark moving all rows into a single partition
# (which an un-partitioned window does) and keeps the lag within one country.
w = Window.partitionBy("iso_code").orderBy("date")

df_3 = (
    df
    .where((F.col("iso_code") == 'RUS') & F.col("date").between('2021-03-24', '2021-03-31'))
    .select("iso_code", "date", "new_cases")
    .withColumn("yesterday_cases", F.lag("new_cases").over(w))
    # Drop the helper day by its date, not by `delta IS NOT NULL`: a null in
    # new_cases must show up as a null delta instead of silently removing a day.
    .where(F.col("date") >= '2021-03-25')
    .select(
        "date",
        "yesterday_cases",
        "new_cases",
        (F.col("new_cases") - F.col("yesterday_cases")).alias("delta"),
    )
    # Explicit order; window output order is not a guarantee.
    .orderBy("date")
)

df_3.show()

+----------+---------------+---------+------+
|      date|yesterday_cases|new_cases| delta|
+----------+---------------+---------+------+
|2021-03-25|         8769.0|   9128.0| 359.0|
|2021-03-26|         9128.0|   9073.0| -55.0|
|2021-03-27|         9073.0|   8783.0|-290.0|
|2021-03-28|         8783.0|   8979.0| 196.0|
|2021-03-29|         8979.0|   8589.0|-390.0|
|2021-03-30|         8589.0|   8162.0|-427.0|
|2021-03-31|         8162.0|   8156.0|  -6.0|
+----------+---------------+---------+------+

