In [None]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
# Obtain the Spark entry points idempotently: a bare SparkContext() raises when a
# context is already running (e.g. when this cell is executed a second time in the
# same kernel), whereas getOrCreate() reuses the live one.
sc = SparkContext.getOrCreate()
# The builder attaches to the SparkContext obtained above instead of creating another.
spark = SparkSession.builder.getOrCreate()


In [None]:

import pyspark.sql.functions as F
from pyspark.sql.window import Window


In [None]:

# Load the CSV, treating its first row as the header, and keep only the rows
# whose iso_code is exactly three characters long.
# NOTE(review): presumably this drops non-country aggregate rows that carry
# longer codes - confirm against the dataset.
data = (
    spark.read
    .csv('file:///home/kulkovil/test/covid-data.csv', header=True)
    .filter(F.length(F.col('iso_code')) == 3)
)


In [None]:

# Top 15 rows by case_percents = 100 * total_cases / population on 2021-03-31.
# Columns are resolved by name with F.col() on the frame being transformed rather
# than through the parent DataFrame (`data.total_cases`), which only works while
# Spark can still trace the attribute back through the lineage.
data_1 = (
    data
    .filter(F.col('date') == '2021-03-31')
    .select('iso_code', 'location', 'population', 'total_cases')
    .withColumn(
        'case_percents',
        # Both operands are cast to double explicitly so the division is numeric.
        F.lit(100) * F.col('total_cases').cast('double') / F.col('population').cast('double'),
    )
    .select('iso_code', 'location', 'case_percents')
    .orderBy('case_percents', ascending=False)
)
data_1.show(15, False)


```
+--------+-------------+------------------+
|iso_code|location     |case_percents     |
+--------+-------------+------------------+
|AND     |Andorra      |15.543907331909661|
|MNE     |Montenegro   |14.523725364693295|
|CZE     |Czechia      |14.308848404077997|
|SMR     |San Marino   |13.937179562732041|
|SVN     |Slovenia     |10.370805779121202|
|LUX     |Luxembourg   |9.847342390123583 |
|ISR     |Israel       |9.625106044786802 |
|USA     |United States|9.203010995860707 |
|SRB     |Serbia       |8.826328557933492 |
|BHR     |Bahrain      |8.488860079114566 |
|PAN     |Panama       |8.228739065460761 |
|PRT     |Portugal     |8.058699735120367 |
|EST     |Estonia      |8.022681579659551 |
|SWE     |Sweden       |7.969744347858805 |
|LTU     |Lithuania    |7.938864728274824 |
+--------+-------------+------------------+
only showing top 15 rows
```


In [None]:

# Daily new_cases for 2021-03-25 .. 2021-03-31 (between() is inclusive on both ends).
# new_cases is cast to double so ranking and sorting compare numbers, not text.
data_2 = (
    data
    .filter(F.col('date').between('2021-03-25', '2021-03-31'))
    .select('iso_code', 'date', 'location', F.col('new_cases').cast('double').alias('new_cases'))
)

# For each country keep the single day with the highest new_cases.
# A window + row_number() replaces the previous "aggregate max, then join back"
# approach, which emitted one row per tied day and so could list the same country
# several times. Ties are now broken by the earliest date.
peak_window = (
    Window
    .partitionBy('iso_code', 'location')
    .orderBy(F.col('new_cases').desc(), F.col('date').asc())
)
data_m = (
    data_2
    # Rows without a value can never be the maximum; the old equality filter
    # dropped them too, so exclude them before ranking.
    .filter(F.col('new_cases').isNotNull())
    .withColumn('day_rank', F.row_number().over(peak_window))
    .filter(F.col('day_rank') == 1)
    .select('date', 'location', 'new_cases')
    .orderBy('new_cases', ascending=False)
)
data_m.show(10, False)


```
+----------+-------------+---------+
|date      |location     |new_cases|
+----------+-------------+---------+
|2021-03-25|Brazil       |100158.0 |
|2021-03-26|United States|77321.0  |
|2021-03-31|India        |72330.0  |
|2021-03-31|France       |59054.0  |
|2021-03-31|Turkey       |39302.0  |
|2021-03-26|Poland       |35145.0  |
|2021-03-31|Germany      |25014.0  |
|2021-03-26|Italy        |24076.0  |
|2021-03-25|Peru         |19206.0  |
|2021-03-26|Ukraine      |18226.0  |
+----------+-------------+---------+
only showing top 10 rows
```


In [None]:

# Day-over-day change in new_cases for iso_code 'RUS', 2021-03-24 .. 2021-03-31
# (between() is inclusive on both ends).
# new_cases is cast to double up front so the subtraction below is explicitly
# numeric instead of relying on implicit coercion of the raw column.
data_3 = (
    data
    .filter(F.col('date').between('2021-03-24', '2021-03-31') & (F.col('iso_code') == 'RUS'))
    .select('iso_code', 'date', F.col('new_cases').cast('double').alias('new_cases'))
)
# Order rows chronologically within each iso_code so lag() returns the previous day.
window = Window.partitionBy('iso_code').orderBy('date')
data_3 = (
    data_3
    # The first row of each partition has no predecessor, so prev_cases (and delta) is null there.
    .withColumn('prev_cases', F.lag('new_cases').over(window))
    .withColumn('delta', F.col('new_cases') - F.col('prev_cases'))
)
data_3.show()


```
+--------+----------+---------+----------+------+
|iso_code|      date|new_cases|prev_cases| delta|
+--------+----------+---------+----------+------+
|     RUS|2021-03-24|   8769.0|      null|  null|
|     RUS|2021-03-25|   9128.0|    8769.0| 359.0|
|     RUS|2021-03-26|   9073.0|    9128.0| -55.0|
|     RUS|2021-03-27|   8783.0|    9073.0|-290.0|
|     RUS|2021-03-28|   8979.0|    8783.0| 196.0|
|     RUS|2021-03-29|   8589.0|    8979.0|-390.0|
|     RUS|2021-03-30|   8162.0|    8589.0|-427.0|
|     RUS|2021-03-31|   8156.0|    8162.0|  -6.0|
+--------+----------+---------+----------+------+
```

