In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import types as T
import os
import sys
import traceback
import datetime as dt

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("airflow_app") \
    .config('spark.executor.memory', '6g') \
    .config('spark.driver.memory', '6g') \
    .config("spark.driver.maxResultSize", "1048MB") \
    .config("spark.port.maxRetries", "100") \
    .getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

your 131072x1 screen size is bogus. expect trouble
23/09/14 18:06:14 WARN Utils: Your hostname, DESKTOP-TME356J resolves to a loopback address: 127.0.1.1; using 172.29.31.176 instead (on interface eth0)
23/09/14 18:06:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/14 18:06:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
read_path = 'datalake/raw/covid19'
write_path = 'datalake/trusted'
partition_fields = ['ano', 'mes']
partition = 1

In [4]:
try:
    df = spark.read.format("csv") \
        .options(header='true', delimiter=',') \
        .load(read_path)
except Exception:
    print(traceback.format_exc())

                                                                                

In [5]:
cols_to_unpivot = [f"`{c}`, \'{c}\'" for c in df.columns if c not in ['Province/State','Country/Region','Lat','Long']]
stack_string = ", ".join(cols_to_unpivot)

In [6]:
df_unpivot = df.select(
    'Province/State','Country/Region','Lat','Long',
    F.expr(f"stack({len(cols_to_unpivot)}, {stack_string}) as (Cases, Date)")
)

In [7]:
df_unpivot = df_unpivot.withColumn('Date', F.to_timestamp('Date','M/d/yy'))

In [8]:
df_unpivot.show(10)

23/09/14 18:07:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+--------------+--------------+--------+---------+-----+-------------------+
|Province/State|Country/Region|     Lat|     Long|Cases|               Date|
+--------------+--------------+--------+---------+-----+-------------------+
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-22 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-23 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-24 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-25 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-26 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-27 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-28 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-29 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-30 00:00:00|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-31 00:00:00|

                                                                                

In [None]:
df_unpivot.filter("Date is null").show(5)

In [None]:
df_unpivot.show(5)

In [9]:
df_unpivot = df_unpivot.withColumn('Source_file', F.input_file_name())
df_unpivot.printSchema()



root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Cases: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Source_file: string (nullable = false)



In [10]:
df_unpivot = df_unpivot.withColumn('Source', F.when(F.col('Source_file').contains('confirmed') , 'confirmed')
                                .when(F.col('Source_file').contains('deaths') , 'deaths')
                                .when(F.col('Source_file').contains('recovered') , 'recovered')
                                ).drop('Source_file')
df_unpivot.printSchema()

root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Cases: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Source: string (nullable = true)



In [11]:
df_unpivot.show(5)

+--------------+--------------+--------+---------+-----+-------------------+---------+
|Province/State|Country/Region|     Lat|     Long|Cases|               Date|   Source|
+--------------+--------------+--------+---------+-----+-------------------+---------+
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-22 00:00:00|confirmed|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-23 00:00:00|confirmed|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-24 00:00:00|confirmed|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-25 00:00:00|confirmed|
|          null|   Afghanistan|33.93911|67.709953|    0|2020-01-26 00:00:00|confirmed|
+--------------+--------------+--------+---------+-----+-------------------+---------+
only showing top 5 rows



In [12]:
df_unpivot.filter(F.col("Cases").cast("int").isNull()).show()

                                                                                

+--------------+--------------+---+----+-----+----+------+
|Province/State|Country/Region|Lat|Long|Cases|Date|Source|
+--------------+--------------+---+----+-----+----+------+
+--------------+--------------+---+----+-----+----+------+



                                                                                

In [13]:
window_spec = Window.partitionBy('Country/Region','Source').orderBy('Date')

In [14]:
df_unpivot = df_unpivot.withColumn('Cases_increase', F.col("Cases") - F.lag("Cases", 1, 0).over(window_spec))

In [15]:
df_unpivot = df_unpivot.withColumnRenamed('Province/State','Province_State').withColumnRenamed('Country/Region', 'Country_Region')

In [16]:
df_unpivot.printSchema()

root
 |-- Province_State: string (nullable = true)
 |-- Country_Region: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Cases: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Source: string (nullable = true)
 |-- Cases_increase: double (nullable = true)



In [17]:
df_unpivot.createOrReplaceTempView('raw_table')

In [None]:
spark.sql("select * from raw_table limit 100").show()

In [18]:
trusted_sql = ("select Country_Region as pais, \
               Province_State as estado, \
               Lat as latitude, \
               Long as longitude, \
               Date as data, \
               case when Source == 'confirmed' then Cases_increase end as quantidade_confirmados, \
               case when Source == 'deaths' then Cases_increase end as quantidade_mortes, \
               case when Source == 'recovered' then Cases end as quantidade_recuperados \
               from raw_table \
               ")

In [19]:
try:
    df_trusted = spark.sql(trusted_sql)
except Exception:
    print(traceback.format_exc())

In [20]:
df_trusted = df_trusted.withColumn("ano", F.year("data")).withColumn("mes", F.month("data"))

In [23]:
df_trusted = df_trusted.withColumn('latitude', F.col('latitude').cast("double"))
df_trusted = df_trusted.withColumn('longitude', F.col('longitude').cast("double"))
df_trusted = df_trusted.withColumn('quantidade_confirmados', F.col('quantidade_confirmados').cast("long"))
df_trusted = df_trusted.withColumn('quantidade_mortes', F.col('quantidade_mortes').cast("long"))
df_trusted = df_trusted.withColumn('quantidade_recuperados', F.col('quantidade_recuperados').cast("long"))

In [24]:
df_trusted.printSchema()

root
 |-- pais: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- quantidade_confirmados: long (nullable = true)
 |-- quantidade_mortes: long (nullable = true)
 |-- quantidade_recuperados: long (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)



In [None]:
df_trusted.show(100)

In [None]:
df_trusted = df_trusted.repartition(partition)

In [None]:
df_trusted.write.mode('overwrite').format('parquet').partitionBy(partition_fields).option("parquet.compress", "snappy").save(write_path)