In [53]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import os

# Build (or reuse) the Spark session with the Delta Lake 0.7.0 package plus its SQL
# extension and catalog, applied in the same order as before.
_delta_settings = [
    ("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0"),
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
]
_builder = pyspark.sql.SparkSession.builder.appName("DeltaTest")
for _key, _value in _delta_settings:
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

from delta.tables import *

In [54]:
# Configure S3 (s3n://) credentials for Spark from the environment; never hardcode them.
# The notebook's original variable names are tried first, then the standard AWS names.
aws_access_key = os.getenv("AWS_ACCESS_KEY") or os.getenv("AWS_ACCESS_KEY_ID")
aws_secret = os.getenv("AWS_SECRET_ACCESS_KEY_ID") or os.getenv("AWS_SECRET_ACCESS_KEY")
if not aws_access_key or not aws_secret:
    # Passing None into the Hadoop configuration would only fail later inside the JVM
    # with a much harder-to-read error, so stop here with a clear message.
    raise RuntimeError(
        "AWS credentials not found: set AWS_ACCESS_KEY/AWS_ACCESS_KEY_ID and "
        "AWS_SECRET_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY in the environment"
    )

# NOTE: `_jsc` is a private PySpark attribute; it is the usual way to reach the Hadoop conf.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_access_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret)
hadoop_conf.set("fs.s3n.endpoint", "s3.amazonaws.com")

In [55]:
# Load the raw county-level case counts (date, county, state, fips, cases, deaths).
df = spark.read.format("parquet").load("s3n://covid-delta-lake/test/cases/*.parquet")

In [56]:
# Preview the first 10 case rows.
df.show(n=10)

+----------+--------------+--------+-------+-----+------+
|      date|        county|   state|   fips|cases|deaths|
+----------+--------------+--------+-------+-----+------+
|2020-08-28| Isle of Wight|Virginia|51093.0|  519|    11|
|2020-08-28|    James City|Virginia|51095.0|  726|    18|
|2020-08-28|   King George|Virginia|51099.0|  202|     4|
|2020-08-28|  King William|Virginia|51101.0|  111|     2|
|2020-08-28|King and Queen|Virginia|51097.0|   49|     1|
|2020-08-28|     Lancaster|Virginia|51103.0|   66|     0|
|2020-08-28|           Lee|Virginia|51105.0|  183|     3|
|2020-08-28|Lexington city|Virginia|51678.0|   42|     0|
|2020-08-28|       Loudoun|Virginia|51107.0| 5878|   118|
|2020-08-28|        Louisa|Virginia|51109.0|  243|     2|
+----------+--------------+--------+-------+-----+------+
only showing top 10 rows



In [57]:
# Print the schema Spark inferred from the cases Parquet files.
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: double (nullable = true)
 |-- cases: long (nullable = true)
 |-- deaths: long (nullable = true)



In [58]:
# Load the per-state testing data.
df_testing = spark.read.format("parquet").load("s3n://covid-delta-lake/test/tests/*.parquet")

In [59]:
# Print the schema of the testing DataFrame (note: `date` is read as a long, e.g. 20200813).
df_testing.printSchema()

root
 |-- date: long (nullable = true)
 |-- state: string (nullable = true)
 |-- positive: double (nullable = true)
 |-- negative: double (nullable = true)
 |-- death: double (nullable = true)
 |-- total: double (nullable = true)
 |-- hash: string (nullable = true)
 |-- dateChecked: string (nullable = true)
 |-- totalTestResults: double (nullable = true)
 |-- fips: long (nullable = true)
 |-- deathIncrease: long (nullable = true)
 |-- hospitalizedIncrease: long (nullable = true)
 |-- negativeIncrease: long (nullable = true)
 |-- positiveIncrease: long (nullable = true)
 |-- totalTestResultsIncrease: long (nullable = true)
 |-- hospitalized: double (nullable = true)
 |-- pending: double (nullable = true)



In [60]:
# Show the key testing metrics per state and day (show() prints its default 20 rows).
_test_columns = ["date", "state", "positive", "negative", "death", "totalTestResults"]
df_testing.select(*_test_columns).show()

+--------+-----+--------+---------+-------+----------------+
|    date|state|positive| negative|  death|totalTestResults|
+--------+-----+--------+---------+-------+----------------+
|20200813|   NE| 29244.0| 280400.0|  356.0|        309644.0|
|20200813|   NH|  6921.0| 174695.0|  422.0|        272051.0|
|20200813|   NJ|186594.0|2217096.0|15841.0|       2403690.0|
|20200813|   NM| 22816.0| 622007.0|  695.0|        644823.0|
|20200813|   NV| 58650.0| 474494.0| 1030.0|        533144.0|
|20200813|   NY|423440.0|6392941.0|25228.0|       6816381.0|
|20200813|   OH|105426.0|1647881.0| 3755.0|       1753307.0|
|20200813|   OK| 46103.0| 685169.0|  638.0|        731272.0|
|20200813|   OR| 22300.0| 444963.0|  383.0|        467263.0|
|20200813|   PA|122121.0|1288873.0| 7409.0|       1410994.0|
|20200813|   PR| 24446.0| 305972.0|  306.0|        330418.0|
|20200813|   RI| 20240.0| 208234.0| 1019.0|        416203.0|
|20200813|   SC|103909.0| 702119.0| 2186.0|        806028.0|
|20200813|   SD|  9897.0

In [64]:
# Load county population estimates.
df_populations = spark.read.format("parquet").load("s3n://covid-delta-lake/delta/populations/parquets/*.parquet")

In [65]:
# Preview the first 10 population rows.
df_populations.show(n=10)

+--------------+----+--------+-------+-----------------+
|            Id| Id2|  County|  State|pop_estimate_2018|
+--------------+----+--------+-------+-----------------+
|0500000US01001|1001| Autauga|Alabama|            55601|
|0500000US01003|1003| Baldwin|Alabama|           218022|
|0500000US01005|1005| Barbour|Alabama|            24881|
|0500000US01007|1007|    Bibb|Alabama|            22400|
|0500000US01009|1009|  Blount|Alabama|            57840|
|0500000US01011|1011| Bullock|Alabama|            10138|
|0500000US01013|1013|  Butler|Alabama|            19680|
|0500000US01015|1015| Calhoun|Alabama|           114277|
|0500000US01017|1017|Chambers|Alabama|            33615|
|0500000US01019|1019|Cherokee|Alabama|            26032|
+--------------+----+--------+-------+-----------------+
only showing top 10 rows



In [66]:
# Print the schema of the population DataFrame.
df_populations.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Id2: long (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- pop_estimate_2018: long (nullable = true)



In [23]:
# Load the static state lookup CSV (full name, abbreviation, two-letter code); first row is the header.
df_states = spark.read.csv("s3n://covid-delta-lake/static/*.csv", header=True)

In [24]:
# Print the schema of the state lookup table (all columns read as strings).
df_states.printSchema()

root
 |-- State: string (nullable = true)
 |-- Abbrev: string (nullable = true)
 |-- Code: string (nullable = true)



In [45]:
# Rename State -> state_full; the join with the cases DataFrame matches on this column.
df_states = df_states.withColumnRenamed(existing="State", new="state_full")
df_states.show(n=10)

+--------------------+------+----+
|          state_full|Abbrev|Code|
+--------------------+------+----+
|             Alabama|  Ala.|  AL|
|              Alaska|Alaska|  AK|
|             Arizona| Ariz.|  AZ|
|            Arkansas|  Ark.|  AR|
|          California|Calif.|  CA|
|            Colorado| Colo.|  CO|
|         Connecticut| Conn.|  CT|
|            Delaware|  Del.|  DE|
|District of Columbia|  D.C.|  DC|
|             Florida|  Fla.|  FL|
+--------------------+------+----+
only showing top 10 rows



In [51]:
# Attach the two-letter state code by joining on the full state name. The inner join keeps
# only case rows whose `state` matches a `state_full` in the lookup CSV.
# NOTE(review): this rebinds `df`, so the cell is not re-runnable: after one run `df` no longer
# has a `state` column. Consider assigning the result to a new name (e.g. df_cases).
_state_codes = df_states.select("state_full", "Code")
df = (
    df.join(_state_codes, _state_codes["state_full"] == df["state"], "inner")
      .select("date", "county", "state_full", "fips", "cases", "deaths", "code")
)

In [52]:
# Preview the joined cases (show()'s default is 20 rows).
df.show(n=20)

+----------+------------------+----------+-------+-----+------+----+
|      date|            county|state_full|   fips|cases|deaths|code|
+----------+------------------+----------+-------+-----+------+----+
|2020-08-28|     Isle of Wight|  Virginia|51093.0|  519|    11|  VA|
|2020-08-28|        James City|  Virginia|51095.0|  726|    18|  VA|
|2020-08-28|       King George|  Virginia|51099.0|  202|     4|  VA|
|2020-08-28|      King William|  Virginia|51101.0|  111|     2|  VA|
|2020-08-28|    King and Queen|  Virginia|51097.0|   49|     1|  VA|
|2020-08-28|         Lancaster|  Virginia|51103.0|   66|     0|  VA|
|2020-08-28|               Lee|  Virginia|51105.0|  183|     3|  VA|
|2020-08-28|    Lexington city|  Virginia|51678.0|   42|     0|  VA|
|2020-08-28|           Loudoun|  Virginia|51107.0| 5878|   118|  VA|
|2020-08-28|            Louisa|  Virginia|51109.0|  243|     2|  VA|
|2020-08-28|         Lunenburg|  Virginia|51111.0|   93|     2|  VA|
|2020-08-28|    Lynchburg city|  V

In [53]:
# Print the schema after the join: the original columns plus `state_full` and `code`.
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state_full: string (nullable = true)
 |-- fips: double (nullable = true)
 |-- cases: long (nullable = true)
 |-- deaths: long (nullable = true)
 |-- code: string (nullable = true)



In [29]:
import configparser

In [33]:
config = configparser.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list it did parse;
# fail fast here instead of hitting a confusing KeyError on config['CLUSTER'] later.
_config_files = config.read('redshift.cfg')
if not _config_files:
    raise FileNotFoundError("redshift.cfg could not be read from the current working directory")
_config_files

['redshift.cfg']

In [35]:
from sql_queries import population_table_create, copy_table_population
import psycopg2

In [67]:
# Open the Redshift connection described by the [CLUSTER] section of redshift.cfg.
# Close a connection left over from an earlier run first; rebinding `conn` alone leaks it.
_stale_conn = globals().get("conn")
if _stale_conn is not None and not _stale_conn.closed:
    _stale_conn.close()

# The first five [CLUSTER] values are used positionally, in the order
# host, dbname, user, password, port (the same order the old DSN template assumed).
_host, _dbname, _user, _password, _port = list(config['CLUSTER'].values())[:5]
# Keyword arguments let psycopg2 quote each value, so a password containing
# spaces or quotes can no longer corrupt the connection string.
conn = psycopg2.connect(host=_host, dbname=_dbname, user=_user, password=_password, port=_port)
cur = conn.cursor()

In [68]:
# Statement that removes the populations table so the load below starts from scratch.
drop = "DROP TABLE IF EXISTS populations"

In [69]:
# Drop the populations table. psycopg2's `with conn:` commits on success and rolls back on
# error, so a failed statement doesn't leave the session in an aborted transaction.
with conn:
    cur.execute(drop)

In [70]:
# DDL for the populations table; columns mirror df_populations' schema
# (Id, Id2, County, State, pop_estimate_2018), with State stored as `state`.
pop_create_query= ("""CREATE TABLE IF NOT EXISTS populations( 
                                    Id varchar, 
                                    Id2 bigint, 
                                    County varchar, 
                                    state varchar, 
                                    pop_estimate_2018 bigint);""")

In [71]:
# Create the populations table; `with conn:` commits on success and rolls back on error.
with conn:
    cur.execute(pop_create_query)

In [77]:
# COPY the population Parquet files from S3 into `populations`, authenticating with the
# IAM role ARN read from the [IAM_ROLE] section of redshift.cfg.
pop_copy_query = ("""
    COPY populations 
    FROM 's3://covid-delta-lake/delta/populations/parquets'
    IAM_ROLE '{}' 
    FORMAT AS PARQUET;
    """).format(config.get("IAM_ROLE", "ARN"))

In [73]:
# Load populations from S3. `with conn:` commits on success and rolls back on error, so a
# failed COPY doesn't leave the session stuck in an aborted transaction.
with conn:
    cur.execute(pop_copy_query)

In [151]:
# Reconnect to Redshift using the [CLUSTER] section of redshift.cfg.
# Close the previous connection first; rebinding `conn` alone leaks it.
_stale_conn = globals().get("conn")
if _stale_conn is not None and not _stale_conn.closed:
    _stale_conn.close()

# The first five [CLUSTER] values are used positionally, in the order
# host, dbname, user, password, port (the same order the old DSN template assumed).
_host, _dbname, _user, _password, _port = list(config['CLUSTER'].values())[:5]
# Keyword arguments let psycopg2 quote each value, so a password containing
# spaces or quotes can no longer corrupt the connection string.
conn = psycopg2.connect(host=_host, dbname=_dbname, user=_user, password=_password, port=_port)
cur = conn.cursor()

In [141]:
from sql_queries import covid_table_create, copy_table_cases

In [142]:
# Statement that removes the covid_cases table before it is recreated and reloaded.
drop_cases = "DROP TABLE IF EXISTS covid_cases"

In [143]:
# Drop covid_cases; `with conn:` commits on success and rolls back on error.
with conn:
    cur.execute(drop_cases)

In [144]:
# Create covid_cases with the same column order and types as the joined cases DataFrame
# (date, county, state_full, fips, cases, deaths, code).
# `with conn:` commits on success and rolls back on error.
with conn:
    cur.execute("""CREATE TABLE IF NOT EXISTS covid_cases(
                            date VARCHAR,
                            county VARCHAR,
                            state_full VARCHAR,
                            fips DOUBLE PRECISION,
                            cases BIGINT,
                            deaths BIGINT,
                            code VARCHAR                           
                                );""")

In [145]:
# COPY the cases Parquet files from S3 into `covid_cases`, authenticating with the
# IAM role ARN read from the [IAM_ROLE] section of redshift.cfg.
copy_table_cases2 = ("""
    COPY covid_cases
    FROM 's3://covid-delta-lake/delta/cases/parquets'
    IAM_ROLE '{}'
    FORMAT AS PARQUET;
    """).format(config.get("IAM_ROLE", "ARN"))

In [146]:
# Load covid_cases from S3. `with conn:` commits on success and rolls back on error, so a
# failed COPY doesn't leave the session stuck in an aborted transaction.
with conn:
    cur.execute(copy_table_cases2)

In [147]:
# DDL for covid_tests; the columns follow the order and types of df_testing's schema.
# NOTE(review): the second column is named `state_full`, but the source `state` column holds
# two-letter codes (e.g. NE, NH), not full names like covid_cases.state_full. Consider naming
# it `state` (or `code`) so a join on it isn't silently wrong.
tests_table_create= ("""CREATE TABLE IF NOT EXISTS covid_tests(
                            date BIGINT,
                            state_full VARCHAR,
                            positive DOUBLE PRECISION,
                            negative DOUBLE PRECISION,
                            death DOUBLE PRECISION,
                            total DOUBLE PRECISION,
                            hash VARCHAR,
                            dateChecked VARCHAR,
                            totalTestResults DOUBLE PRECISION,
                            fips BIGINT,
                            deathIncrease BIGINT,
                            hospitalizedIncrease BIGINT,
                            negativeIncrease BIGINT,
                            positiveIncrease BIGINT,
                            totalTestResultsIncrease BIGINT,
                            hospitalized DOUBLE PRECISION,
                            pending DOUBLE PRECISION                           
                                );""")

In [148]:
# (Re)create covid_tests. Drop it first, as is done for populations, covid_cases and time:
# with only CREATE TABLE IF NOT EXISTS, re-running the COPY below would append a second copy
# of the data. `with conn:` commits on success and rolls back on error.
with conn:
    cur.execute("DROP TABLE IF EXISTS covid_tests")
    cur.execute(tests_table_create)

In [152]:
# COPY the testing Parquet files from S3 into `covid_tests`, authenticating with the
# IAM role ARN read from the [IAM_ROLE] section of redshift.cfg.
copy_table_tests = ("""
    COPY covid_tests 
    FROM 's3://covid-delta-lake/delta/tests/parquets'
    IAM_ROLE '{}'
    FORMAT AS PARQUET;
    """).format(config.get("IAM_ROLE", "ARN"))

In [153]:
# Load covid_tests from S3. `with conn:` commits on success and rolls back on error, so a
# failed COPY doesn't leave the session stuck in an aborted transaction.
with conn:
    cur.execute(copy_table_tests)

In [None]:
# (Intentionally empty.) This cell previously held only two emoji characters, which is a
# SyntaxError and made Restart & Run All fail.

In [154]:
# DDL for the `time` dimension: the raw date string, its parsed DATE, and integer date parts
# (filled by time_table_insert from the distinct dates in covid_cases).
time_table_create = ("""CREATE TABLE IF NOT EXISTS time(
                        date VARCHAR,
                        date_ts DATE,
                        day INT, 
                        week INT, 
                        month INT,
                        year INT,
                        weekday INT);
                    """)

In [168]:
# Populate the `time` dimension from the distinct, non-null date strings in covid_cases.
# TO_DATE parses the VARCHAR dates (e.g. '2020-08-28'); the EXTRACT calls reuse the `date_ts`
# alias from the same SELECT list (a lateral column alias, which Redshift supports).
# The derived table now has an alias: standard SQL (and PostgreSQL before 16) require one for
# a subquery in FROM, so the statement is portable.
time_table_insert = ("""
INSERT INTO time (
                date,
                date_ts, 
                day, 
                week, 
                month,
                year,
                weekday)
SELECT  date as date,
        TO_DATE(date, 'YYYY-MM-DD') as date_ts,
        EXTRACT(day FROM date_ts) as day,
        EXTRACT(week FROM date_ts) as week,
        EXTRACT(month FROM date_ts) as month,
        EXTRACT(year FROM date_ts) as year,
        EXTRACT(weekday FROM date_ts) as weekday
FROM(
  SELECT DISTINCT date
  FROM covid_cases
  WHERE date IS NOT null) AS distinct_dates
""")

In [169]:
# Reconnect to Redshift using the [CLUSTER] section of redshift.cfg.
# Close the previous connection first; rebinding `conn` alone leaks it.
_stale_conn = globals().get("conn")
if _stale_conn is not None and not _stale_conn.closed:
    _stale_conn.close()

# The first five [CLUSTER] values are used positionally, in the order
# host, dbname, user, password, port (the same order the old DSN template assumed).
_host, _dbname, _user, _password, _port = list(config['CLUSTER'].values())[:5]
# Keyword arguments let psycopg2 quote each value, so a password containing
# spaces or quotes can no longer corrupt the connection string.
conn = psycopg2.connect(host=_host, dbname=_dbname, user=_user, password=_password, port=_port)
cur = conn.cursor()

In [165]:
# Drop the time dimension; `with conn:` commits on success and rolls back on error.
with conn:
    cur.execute("DROP TABLE IF EXISTS time")

In [166]:
# Create the time dimension; `with conn:` commits on success and rolls back on error.
with conn:
    cur.execute(time_table_create)

In [170]:
# Fill the time dimension from covid_cases; `with conn:` commits on success and rolls back
# on error, so a failed INSERT doesn't leave the session in an aborted transaction.
with conn:
    cur.execute(time_table_insert)

In [172]:
config = configparser.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list it did parse;
# fail fast here instead of hitting a confusing KeyError on a missing section later.
_config_files = config.read('redshift.cfg')
if not _config_files:
    raise FileNotFoundError("redshift.cfg could not be read from the current working directory")
_config_files

['redshift.cfg']

In [176]:
# Sanity check: count the rows loaded into covid_tests.
_count_sql = "select count(*)from covid_tests"
cur.execute(_count_sql)
result = cur.fetchall()

In [180]:
# Display the single value returned by the previous `select count(*)` query.
result[0][0]

33614

In [181]:
# Data-quality gate: every loaded table must contain at least one row.
for table in ["covid_tests", "covid_cases", "populations", "time"]:
    # Table names come from the fixed list above (not user input), so f-string SQL is safe here.
    cur.execute(f"select count(*) from {table}")
    result = cur.fetchall()
    # Bug fix: the check was `> 1`, which failed a table holding exactly one row while
    # reporting that it "contained 0 rows". Any non-empty table now passes.
    if result[0][0] > 0:
        print("Data quality inspection passed")
    else:
        raise ValueError(f"QA failed for {table}: Table contained 0 rows")

Data quality inspection passed


ValueError: QA failed for covid_cases: Table contained 0 rows