In [0]:
import platform
import sys

# Spark SQL schema types. The correct class names are StructType / StructField.
# DateType is the concrete date type; DataType (the abstract base of all Spark
# SQL types) is kept in case later cells reference it.
from pyspark.sql.types import (
    BooleanType,
    DataType,
    DateType,
    DecimalType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
# Column functions used when deriving the date dimension.
from pyspark.sql.functions import col, dayofweek, month, to_date, year

# Record the Python interpreter version for reproducibility.
print(sys.version)

3.11.10 (main, Sep  7 2024, 18:35:41) [GCC 11.4.0]


In [0]:
# The notebook environment already exposes a default `spark` session.
# getOrCreate() hands back the active session when one exists, so the
# "Covid19" app name may not take effect there. Otherwise it builds a new
# session with that name.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Covid19")
    .getOrCreate()
)


## READ FILES FROM S3 - EXTRACT

Files:
-   Enigma-JHU.csv
-   countrycode.csv
-   CountyPopulation.csv
-   usa-hospital-beds.jsonl
-   state-abv.csv
-   us_daily.csv
-   us_county.csv
-   states_daily.csv
-   us_states.csv



In [0]:

# Enigma/JHU case counts. The first line of the file supplies the column names.
# No schema is inferred, so every column is loaded as a string.
enigma_jhu_df = spark.read.csv(
    "s3://covid19-spark-databricks/Enigma-JHU.csv",
    header=True,
)

# Alternative source, if the data is registered as a catalog table:
# enigma_jhu_df = spark.table("workspace.covid19.enigma_jhu")



fips,admin2,province_state,country_region,last_update,latitude,longitude,confirmed,deaths,recovered,active,combined_key
,,Anhui,China,2020-01-22T17:00:00.000Z,31.826,117.226,1.0,,,,"Anhui, China"
,,Beijing,China,2020-01-22T17:00:00.000Z,40.182,116.414,14.0,,,,"Beijing, China"
,,Chongqing,China,2020-01-22T17:00:00.000Z,30.057,107.874,6.0,,,,"Chongqing, China"
,,Fujian,China,2020-01-22T17:00:00.000Z,26.079,117.987,1.0,,,,"Fujian, China"
,,Gansu,China,2020-01-22T17:00:00.000Z,36.061,103.834,,,,,"Gansu, China"


In [0]:
# Bucket that holds every raw input file listed in the section header.
RAW_DATA_BUCKET = "s3://covid19-spark-databricks"


def read_raw_csv(file_name):
    """Load a headered CSV file from RAW_DATA_BUCKET.

    No schema is inferred, so every column is returned as a string.
    """
    return spark.read.csv(f"{RAW_DATA_BUCKET}/{file_name}", header=True)


countrycode_df = read_raw_csv("countrycode.csv")

countypopulation_df = read_raw_csv("CountyPopulation.csv")

# JSON Lines input: Spark's JSON reader expects one JSON object per line by
# default. `header` is a CSV-only option, so none is passed here.
hospital_beds_df = spark.read.json(f"{RAW_DATA_BUCKET}/usa-hospital-beds.jsonl")

state_abv_df = read_raw_csv("state-abv.csv")

# Each DataFrame loads the file it is named after. The fact/date cells select
# per-state columns such as 'fips' from states_daily_df, so it must come from
# states_daily.csv.
states_daily_df = read_raw_csv("states_daily.csv")

us_county_df = read_raw_csv("us_county.csv")

us_daily_df = read_raw_csv("us_daily.csv")

us_states_df = read_raw_csv("us_states.csv")



## ETL job - Transform: creating fact and dimension tables

### factCovid

In [0]:
# Fact table: Enigma/JHU case counts joined with the testing and
# hospitalization columns from states_daily, matched on 'fips'.
factCovid_1 = enigma_jhu_df.select(
    'fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active'
)

factCovid_2 = states_daily_df.select(
    'fips', 'date', 'positive', 'negative',
    'hospitalizedcurrently', 'hospitalized', 'hospitalizeddischarged'
)

# NOTE(review): enigma_jhu_df carries a last_update column and states_daily_df
# a date column, which suggests each fips repeats over time on both sides.
# If so, joining on 'fips' alone multiplies rows (every date x every date).
# Consider also matching on the date - TODO confirm.
factCovid = factCovid_1.join(factCovid_2, on='fips', how='inner')


### dimRegion

In [0]:
# Region dimension: location attributes from Enigma/JHU joined with the
# county/state names from us_county, matched on 'fips'.
dimRegion_1 = enigma_jhu_df.select(
    'fips', 'province_state', 'country_region', 'latitude', 'longitude'
)

dimRegion_2 = us_county_df.select(
    'fips', 'county', 'state'
)

# NOTE(review): if either input has more than one row per fips (enigma_jhu_df
# carries a last_update column, suggesting repeats over time), this join yields
# duplicate regions. Consider .dropDuplicates(['fips']) on each side - TODO confirm.
dimRegion = dimRegion_1.join(dimRegion_2, on='fips', how='inner')

### dimHospital

In [0]:
# Hospital dimension: a column projection of the hospital-beds feed.
# The row count is unchanged.
# NOTE(review): 'longtitude' is kept as spelled. Presumably it matches the
# source file's column name; verify before renaming.
dimHospital = hospital_beds_df.select([
    'fips',
    'state_name',
    'latitude',
    'longtitude',
    'hq_address',
    'hospital_name',
    'hospital_type',
    'hq_city',
    'hq_state',
])

### dimDate

In [0]:
# Date dimension built from states_daily:
# - The first select parses the yyyyMMdd 'date' text into a DateType column.
#   Unparseable values become null.
# - The second select derives calendar parts from the parsed date.
# day_of_week follows Spark's dayofweek convention (1 = Sunday ... 7 = Saturday).
dimDate = (
    states_daily_df
    .select(
        'fips',
        to_date(col('date').cast('string'), 'yyyyMMdd').alias('date'),
    )
    .select(
        'fips',
        'date',
        year('date').alias('year'),
        month('date').alias('month'),
        dayofweek('date').alias('day_of_week'),
    )
)