In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook runs on other machines; fall back to the original author's local
# Windows install path when SPARK_HOME is unset or empty.
SPARK_HOME_FALLBACK = 'C:\\Spark\\spark-2.4.7-bin-hadoop2.7'
findspark.init(os.environ.get("SPARK_HOME") or SPARK_HOME_FALLBACK)

In [2]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook: getOrCreate() returns an existing
# session if there is one, otherwise it builds a new one named "EDA".
spark = (
    SparkSession.builder
    .appName("EDA")
    .getOrCreate()
)

In [3]:
# Keep notebook output readable: only Spark log messages at ERROR level are shown.
spark.sparkContext.setLogLevel(logLevel="ERROR")

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, IntegerType, DoubleType, DateType

In [5]:
# Load the raw confirmed-cases CSV. The first line is treated as the header; no
# schema is given, so every column is read as a string.
df_confirmed = spark.read.csv('../data/covid19_paris/confirmed_data.csv', header=True)

In [6]:
# Location columns that are no longer needed once the France row has been selected.
columns_to_drop = [
    "Lat",
    "Long",
    "Province/State",
]

In [7]:
# Keep only the row whose coordinates are (46.2276, 2.2137) -- presumably the
# mainland-France entry; NOTE(review): confirm against the raw CSV.
# The CSV was read without a schema, so Lat/Long are compared as strings.
# NOTE: this reassigns df_confirmed and drops Lat/Long, so re-running this cell
# without first re-running the load cell fails (Lat no longer exists).
df_confirmed = (
    df_confirmed
    .filter((F.col("Lat") == "46.2276") & (F.col("Long") == "2.2137"))
    .drop(*columns_to_drop)
    .withColumnRenamed("Country/Region", "country")
)

In [9]:
# source https://stackoverflow.com/questions/37864222/transpose-column-to-row-with-spark
def to_long(df, by, key_name="key", value_name="val"):
    """Unpivot (melt) a wide Spark DataFrame into long format.

    Every column of ``df`` not listed in ``by`` is turned into rows: each input
    row fans out into one output row per such column, holding that column's
    name and its value.

    Args:
        df: Spark DataFrame to reshape.
        by: identifier column name(s) copied unchanged onto every output row.
            A single string or any iterable of names is accepted.
        key_name: name of the output column holding the former column names
            (default ``"key"``).
        value_name: name of the output column holding the former cell values
            (default ``"val"``).

    Returns:
        DataFrame with columns ``by + [key_name, value_name]``.

    Raises:
        ValueError: if every column of ``df`` is listed in ``by``, leaving
            nothing to unpivot.
        AssertionError: if the columns to unpivot do not all share one Spark
            type.
    """
    # Normalise `by`: a bare string would otherwise be iterated character by
    # character, and a tuple cannot be concatenated with the lists used below.
    by = [by] if isinstance(by, str) else list(by)

    # Filter dtypes and split into column names and type description
    value_cols = [(c, t) for (c, t) in df.dtypes if c not in by]
    if not value_cols:
        raise ValueError(
            "to_long: no columns left to unpivot after excluding %r" % (by,)
        )
    cols, dtypes = zip(*value_cols)

    # Spark SQL arrays are homogeneous, so all unpivoted columns must share one type
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs:
    # one struct per unpivoted column, so each input row yields len(cols) rows.
    kvs = F.explode(F.array([
        F.struct(F.lit(c).alias(key_name), F.col(c).alias(value_name)) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(
        by + ["kvs." + key_name, "kvs." + value_name]
    )

In [10]:
# Melt the per-date columns into (country, key, confirmed) rows.
# The CSV was loaded without a schema, so the values are strings; cast them to
# integers so the counts are stored as numbers (not text) in the parquet output.
df_confirmed = (
    to_long(df_confirmed, ["country"])
    .withColumnRenamed("val", "confirmed")
    .withColumn("confirmed", F.col("confirmed").cast("int"))
)

In [11]:
# Preview the melted confirmed-cases frame (PySpark's default 20 rows, truncated cells).
df_confirmed.show(n=20, truncate=True)

+-------+-------+---------+
|country|    key|confirmed|
+-------+-------+---------+
| France|1/22/20|        0|
| France|1/23/20|        0|
| France|1/24/20|        2|
| France|1/25/20|        3|
| France|1/26/20|        3|
| France|1/27/20|        3|
| France|1/28/20|        4|
| France|1/29/20|        5|
| France|1/30/20|        5|
| France|1/31/20|        5|
| France| 2/1/20|        6|
| France| 2/2/20|        6|
| France| 2/3/20|        6|
| France| 2/4/20|        6|
| France| 2/5/20|        6|
| France| 2/6/20|        6|
| France| 2/7/20|        6|
| France| 2/8/20|       11|
| France| 2/9/20|       11|
| France|2/10/20|       11|
+-------+-------+---------+
only showing top 20 rows



In [12]:
# Load the raw deaths CSV. The first line is treated as the header; no schema is
# given, so every column is read as a string.
df_deaths = spark.read.csv('../data/covid19_paris/deaths_data.csv', header=True)

In [13]:
# Keep only the row whose coordinates are (46.2276, 2.2137) -- presumably the
# mainland-France entry; NOTE(review): confirm against the raw CSV.
# Lat/Long are strings here (no schema on read), hence the string comparisons.
# NOTE: reassigns df_deaths and drops Lat/Long, so this cell is not re-runnable
# on its own.
df_deaths = (
    df_deaths
    .filter((F.col("Lat") == "46.2276") & (F.col("Long") == "2.2137"))
    .drop(*columns_to_drop)
    .withColumnRenamed("Country/Region", "country")
)

In [14]:
# Melt the per-date columns into (country, key, deaths) rows.
# The CSV was loaded without a schema, so the values are strings; cast them to
# integers so the counts are stored as numbers (not text) in the parquet output.
df_deaths = (
    to_long(df_deaths, ["country"])
    .withColumnRenamed("val", "deaths")
    .withColumn("deaths", F.col("deaths").cast("int"))
)

In [15]:
# Preview the melted deaths frame (PySpark's default 20 rows, truncated cells).
df_deaths.show(n=20, truncate=True)

+-------+-------+------+
|country|    key|deaths|
+-------+-------+------+
| France|1/22/20|     0|
| France|1/23/20|     0|
| France|1/24/20|     0|
| France|1/25/20|     0|
| France|1/26/20|     0|
| France|1/27/20|     0|
| France|1/28/20|     0|
| France|1/29/20|     0|
| France|1/30/20|     0|
| France|1/31/20|     0|
| France| 2/1/20|     0|
| France| 2/2/20|     0|
| France| 2/3/20|     0|
| France| 2/4/20|     0|
| France| 2/5/20|     0|
| France| 2/6/20|     0|
| France| 2/7/20|     0|
| France| 2/8/20|     0|
| France| 2/9/20|     0|
| France|2/10/20|     0|
+-------+-------+------+
only showing top 20 rows



In [16]:
# Load the raw recoveries CSV. The first line is treated as the header; no schema
# is given, so every column is read as a string.
df_recovered = spark.read.csv('../data/covid19_paris/recovered_data.csv', header=True)

In [17]:
# Keep only the row whose coordinates are (46.2276, 2.2137) -- presumably the
# mainland-France entry; NOTE(review): confirm against the raw CSV.
# Lat/Long are strings here (no schema on read), hence the string comparisons.
# NOTE: reassigns df_recovered and drops Lat/Long, so this cell is not
# re-runnable on its own.
df_recovered = (
    df_recovered
    .filter((F.col("Lat") == "46.2276") & (F.col("Long") == "2.2137"))
    .drop(*columns_to_drop)
    .withColumnRenamed("Country/Region", "country")
)

In [18]:
# Melt the per-date columns into (country, key, recovered) rows.
# The CSV was loaded without a schema, so the values are strings; cast them to
# integers so the counts are stored as numbers (not text) in the parquet output.
df_recovered = (
    to_long(df_recovered, ["country"])
    .withColumnRenamed("val", "recovered")
    .withColumn("recovered", F.col("recovered").cast("int"))
)

In [19]:
# Preview the melted recoveries frame (PySpark's default 20 rows, truncated cells).
df_recovered.show(n=20, truncate=True)

+-------+-------+---------+
|country|    key|recovered|
+-------+-------+---------+
| France|1/22/20|        0|
| France|1/23/20|        0|
| France|1/24/20|        0|
| France|1/25/20|        0|
| France|1/26/20|        0|
| France|1/27/20|        0|
| France|1/28/20|        0|
| France|1/29/20|        0|
| France|1/30/20|        0|
| France|1/31/20|        0|
| France| 2/1/20|        0|
| France| 2/2/20|        0|
| France| 2/3/20|        0|
| France| 2/4/20|        0|
| France| 2/5/20|        0|
| France| 2/6/20|        0|
| France| 2/7/20|        0|
| France| 2/8/20|        0|
| France| 2/9/20|        0|
| France|2/10/20|        0|
+-------+-------+---------+
only showing top 20 rows



In [20]:
# Combine the three series on (date label, country). These are inner joins, so a
# date missing from any one series is dropped. Then rename the date-label column
# and discard the now-constant country column.
join_keys = ["key", "country"]
df_covid = (
    df_confirmed
    .join(df_deaths, on=join_keys, how="inner")
    .join(df_recovered, on=join_keys, how="inner")
    .withColumnRenamed("key", "date")
    .drop("country")
)

In [22]:
# Split the "M/d/yy" date label (e.g. "1/22/20") into its parts:
# index 0 = month, 1 = day, 2 = two-digit year (all still unpadded strings).
split_col = F.split(F.col('date'), '/')
df_covid = (
    df_covid
    .withColumn('day', split_col.getItem(1))
    .withColumn('month', split_col.getItem(0))
    .withColumn('year', split_col.getItem(2))
)

In [23]:
# Turn the "M/d/yy" date labels (e.g. "1/22/20") into a real DateType column with
# Spark's own parser, then derive zero-padded string partition columns from it
# (year "2020", month "01", day "22"). This replaces manual split/pad/concat logic.
table_covid = (
    df_covid
    .withColumn("date", F.to_date(F.col("date"), "M/d/yy"))
    .withColumn("day", F.date_format("date", "dd"))
    .withColumn("month", F.date_format("date", "MM"))
    .withColumn("year", F.date_format("date", "yyyy"))
)

# to_date yields null for labels it cannot parse; fail loudly rather than
# writing null-dated rows (and a null partition) to disk.
assert table_covid.filter(F.col("date").isNull()).count() == 0, "unparseable date labels"

In [24]:
# Preview the final table (PySpark's default 20 rows, truncated cells).
table_covid.show(n=20, truncate=True)

+----------+---------+------+---------+---+-----+----+
|      date|confirmed|deaths|recovered|day|month|year|
+----------+---------+------+---------+---+-----+----+
|2020-01-22|        0|     0|        0| 22|   01|2020|
|2020-01-23|        0|     0|        0| 23|   01|2020|
|2020-01-24|        2|     0|        0| 24|   01|2020|
|2020-01-25|        3|     0|        0| 25|   01|2020|
|2020-01-26|        3|     0|        0| 26|   01|2020|
|2020-01-27|        3|     0|        0| 27|   01|2020|
|2020-01-28|        4|     0|        0| 28|   01|2020|
|2020-01-29|        5|     0|        0| 29|   01|2020|
|2020-01-30|        5|     0|        0| 30|   01|2020|
|2020-01-31|        5|     0|        0| 31|   01|2020|
|2020-02-01|        6|     0|        0| 01|   02|2020|
|2020-02-02|        6|     0|        0| 02|   02|2020|
|2020-02-03|        6|     0|        0| 03|   02|2020|
|2020-02-04|        6|     0|        0| 04|   02|2020|
|2020-02-05|        6|     0|        0| 05|   02|2020|
|2020-02-0

In [25]:
# Persist the table as parquet, partitioned into year=.../month=... directories,
# replacing any previous output at this path.
(
    table_covid.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet('../output_tables/covid_table')
)