In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
spark = SparkSession.builder.appName('HomeWork').master('local').getOrCreate()
df = spark.read.option('header', True).option('inferSchema', True).csv('c:\degp6\owid-covid-data.csv')

In [2]:
df1 = df.select('iso_code', 'location', F.col('total_cases') / F.col('population') * 100 ) \
        .where(F.col('date')=='2021-03-31')                                                \
        .where(F.col('iso_code').contains('OWID_')==False)                                 \
        .withColumnRenamed('((total_cases / population) * 100)', 'cases_percent')          \
        .orderBy(F.col('cases_percent').desc())
df1.show(15)

+--------+-------------+------------------+
|iso_code|     location|     cases_percent|
+--------+-------------+------------------+
|     AND|      Andorra|15.543907331909661|
|     MNE|   Montenegro|14.523725364693293|
|     CZE|      Czechia|14.308848404077997|
|     SMR|   San Marino|13.937179562732041|
|     SVN|     Slovenia|10.370805779121204|
|     LUX|   Luxembourg| 9.847342390123583|
|     ISR|       Israel| 9.625106044786802|
|     USA|United States| 9.203010995860707|
|     SRB|       Serbia| 8.826328557933492|
|     BHR|      Bahrain| 8.488860079114566|
|     PAN|       Panama| 8.228739065460761|
|     PRT|     Portugal| 8.058699735120369|
|     EST|      Estonia| 8.022681579659551|
|     SWE|       Sweden| 7.969744347858805|
|     LTU|    Lithuania| 7.938864728274825|
+--------+-------------+------------------+
only showing top 15 rows



In [3]:
df2 = df.select('date', 'iso_code', 'location', 'new_cases')                                                      \
        .where(F.col('date').between('2021-03-25', '2021-03-31'))                                                 \
        .where(F.col('iso_code').contains('OWID_')==False)                                                        \
        .withColumn('rn', F.row_number().over(Window.partitionBy('location').orderBy(F.col('new_cases').desc()))) \
        .where(F.col('rn') == 1)                                                                                  \
        .orderBy(F.col('new_cases').desc())                                                                       \
        .select('date', 'iso_code', 'location', 'new_cases')
df2.show(10)

+----------+--------+-------------+---------+
|      date|iso_code|     location|new_cases|
+----------+--------+-------------+---------+
|2021-03-25|     BRA|       Brazil| 100158.0|
|2021-03-26|     USA|United States|  77321.0|
|2021-03-31|     IND|        India|  72330.0|
|2021-03-31|     FRA|       France|  59054.0|
|2021-03-31|     TUR|       Turkey|  39302.0|
|2021-03-26|     POL|       Poland|  35145.0|
|2021-03-31|     DEU|      Germany|  25014.0|
|2021-03-26|     ITA|        Italy|  24076.0|
|2021-03-25|     PER|         Peru|  19206.0|
|2021-03-26|     UKR|      Ukraine|  18226.0|
+----------+--------+-------------+---------+
only showing top 10 rows



In [4]:
df3 = df.select('date', 'iso_code', 'location', 'new_cases')                                         \
        .where(F.col('location')=='Russia')                                                          \
        .where(F.col('date').between('2021-03-24', '2021-03-31'))                                    \
        .withColumn('new_cases_ystd', F.lag('new_cases').over(Window.orderBy(F.col('date'))))        \
        .select('date', 'new_cases_ystd', 'new_cases', F.col('new_cases') - F.col('new_cases_ystd')) \
        .where(F.col('date').between('2021-03-25', '2021-03-31'))
df3.show()

+----------+--------------+---------+----------------------------+
|      date|new_cases_ystd|new_cases|(new_cases - new_cases_ystd)|
+----------+--------------+---------+----------------------------+
|2021-03-25|        8769.0|   9128.0|                       359.0|
|2021-03-26|        9128.0|   9073.0|                       -55.0|
|2021-03-27|        9073.0|   8783.0|                      -290.0|
|2021-03-28|        8783.0|   8979.0|                       196.0|
|2021-03-29|        8979.0|   8589.0|                      -390.0|
|2021-03-30|        8589.0|   8162.0|                      -427.0|
|2021-03-31|        8162.0|   8156.0|                        -6.0|
+----------+--------------+---------+----------------------------+

