In [82]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window



In [37]:
# Create (or reuse) the Spark session first: every statement below needs `spark`,
# so it has to exist before the CSV read.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .config("spark.submit.deployMode", "client")
    .config("spark.driver.host", "jupyter")
    .appName("FixedSpark")
    # hadoop-aws and the AWS SDK bundle provide the S3A filesystem classes.
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262",
    )
    .getOrCreate()
)

# Configure the S3A connector before the first s3a:// access.
# NOTE(review): the access/secret key are hard-coded here; consider loading
# them from the environment instead of keeping them in the notebook.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")
hadoop_conf.set("fs.s3a.access.key", "minio")
hadoop_conf.set("fs.s3a.secret.key", "minio123")
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Load the CSV with a header row, letting Spark infer column types.
df = spark.read.csv("s3a://my-bucket/covid-data.csv", header=True, inferSchema=True)

In [38]:
# Largest total_cases_per_million per (iso_code, location) on 2020-03-04,
# sorted from highest to lowest; only the first 15 rows are displayed.
top_15_location = (
    df.filter(f.col('date') == '2020-03-04')
    .select('iso_code', 'location', 'total_cases_per_million')
    .groupBy('iso_code', 'location')
    .agg(
        f.max('total_cases_per_million').alias('max_total_cases_per_million')
    )
    .orderBy(f.desc('max_total_cases_per_million'))
)
top_15_location.show(15)

+--------+-------------+---------------------------+
|iso_code|     location|max_total_cases_per_million|
+--------+-------------+---------------------------+
|     SMR|   San Marino|                    471.448|
|     KOR|  South Korea|                    109.637|
|     ISL|      Iceland|                      76.19|
|     CHN|        China|                      55.85|
|     ITA|        Italy|                      51.09|
|     IRN|         Iran|                     34.789|
|     BHR|      Bahrain|                      30.56|
|     LIE|Liechtenstein|                     26.221|
|     MCO|       Monaco|                     25.482|
|OWID_ASI|         Asia|                     19.354|
|     SGP|    Singapore|                     18.802|
|     KWT|       Kuwait|                     13.113|
|     AND|      Andorra|                     12.942|
|OWID_WRL|        World|                     12.223|
|     CHE|  Switzerland|                     10.399|
+--------+-------------+---------------------------+

In [63]:
# Per-location sums of total_cases and new_cases over 2021-03-24..2021-03-31
# (both bounds inclusive), sorted by the total_cases sum, largest first.
# NOTE(review): total_cases looks cumulative, so summing it across days may
# not be the intended metric - confirm against the task definition.
top_10_locatio = (
    df.filter(f.col('date').between('2021-03-24', '2021-03-31'))
    .select('total_cases', 'new_cases', 'location')
    .groupBy('location')
    .agg(
        f.sum('total_cases'),
        f.sum('new_cases'),
    )
    .orderBy(f.sum('total_cases').desc())
)
top_10_locatio.show(10)

+--------------+----------------+--------------+
|      location|sum(total_cases)|sum(new_cases)|
+--------------+----------------+--------------+
|         World|   1.015129809E9|     4690715.0|
|        Europe|      3.130306E8|     1729953.0|
| North America|    2.78949704E8|      643334.0|
| United States|    2.41931173E8|      535260.0|
|          Asia|    2.23464588E8|     1271046.0|
|European Union|    2.09661006E8|     1390356.0|
| South America|    1.65956486E8|      951391.0|
|        Brazil|     9.9950115E7|      618728.0|
|         India|     9.6021219E7|      487607.0|
|        France|     3.6591914E7|      331461.0|
+--------------+----------------+--------------+
only showing top 10 rows



In [123]:
# Day-over-day change in new_cases for Russia, 2021-03-24..2021-03-31.
# NOTE(review): `df2` is not defined in the cells shown here - confirm it is
# created earlier in the notebook (or whether `df` was intended).
rf = (
    # Restricting to one location before the window is safe because the
    # window is partitioned by location, and it avoids lagging every location.
    df2.where(f.col('location') == 'Russia')
    .select(
        'location',
        'date',
        'total_cases',
        'new_cases',
        # Value of new_cases from the previous row (by date) of the same location.
        f.lag('new_cases', 1)
        .over(Window.partitionBy('location').orderBy('date'))
        .alias('new_cases_1_day_ago'),
    )
    # The date filter must come AFTER the lag so the first day in the range
    # still sees the row that precedes the range.
    .where(f.col('date').between('2021-03-24', '2021-03-31'))
    .groupBy('location', 'date')
    .agg(
        f.sum('total_cases').alias('total_cases'),
        f.sum('new_cases').alias('new_cases'),
        f.sum('new_cases_1_day_ago').alias('new_cases_1_day_ago'),
        f.sum(
            (f.col('new_cases') - f.col('new_cases_1_day_ago')).cast('double')
        ).alias('sum_delta_vs_prev'),
    )
    # groupBy gives no ordering guarantee; sort so the rows read chronologically.
    .orderBy('date')
)

rf.select('total_cases', 'new_cases', 'new_cases_1_day_ago', 'sum_delta_vs_prev').show()


+-----------+---------+-------------------+-----------------+
|total_cases|new_cases|new_cases_1_day_ago|sum_delta_vs_prev|
+-----------+---------+-------------------+-----------------+
|  4433364.0|   8769.0|             8369.0|            400.0|
|  4442492.0|   9128.0|             8769.0|            359.0|
|  4451565.0|   9073.0|             9128.0|            -55.0|
|  4460348.0|   8783.0|             9073.0|           -290.0|
|  4469327.0|   8979.0|             8783.0|            196.0|
|  4477916.0|   8589.0|             8979.0|           -390.0|
|  4486078.0|   8162.0|             8589.0|           -427.0|
|  4494234.0|   8156.0|             8162.0|             -6.0|
+-----------+---------+-------------------+-----------------+

