In [63]:
from delta.tables import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, asc, desc, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType, DoubleType, ArrayType

In [86]:
# Maven coordinates pulled in at session start-up: Delta Lake, the Hadoop S3A
# connector and the AWS SDK bundle.
spark_packages = [
    "io.delta:delta-spark_2.12:3.1.0",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
]

# Build the session step by step: app name, extra jars, then the Delta SQL
# extension and catalog.
session_builder = SparkSession.builder.appName("spark-s3-delta")
session_builder = session_builder.config("spark.jars.packages", ",".join(spark_packages))
session_builder = session_builder.config(
    "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
)
session_builder = session_builder.config(
    "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"
)
spark = session_builder.getOrCreate()

# S3A settings are applied directly on the running context's Hadoop configuration,
# in this order: filesystem implementation, credentials provider, path-style access.
hadoop_conf = spark._jsc.hadoopConfiguration()

s3a_settings = (
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"),
    ("fs.s3a.path.style.access", "true"),
)
for conf_key, conf_value in s3a_settings:
    hadoop_conf.set(conf_key, conf_value)

In [87]:
# Schema definition


def _measure_fields():
    """Return fresh StructFields for the top-level measures shared by the raw and target schemas."""
    return [
        StructField('active', IntegerType(), nullable=True),
        StructField('active_diff', IntegerType(), nullable=True),
        StructField('confirmed', IntegerType(), nullable=True),
        StructField('confirmed_diff', IntegerType(), nullable=True),
        StructField('date', DateType(), nullable=True),
        StructField('deaths', IntegerType(), nullable=True),
        StructField('deaths_diff', IntegerType(), nullable=True),
        StructField('fatality_rate', DoubleType(), nullable=True),
        StructField('last_update', TimestampType(), nullable=True),
        StructField('recovered', IntegerType(), nullable=True),
        StructField('recovered_diff', IntegerType(), nullable=True),
    ]


def _lineage_fields():
    """Return fresh StructFields for the ingestion metadata columns present in both schemas."""
    return [
        StructField('ingested_ts', TimestampType(), nullable=False),
        StructField('source_file_index', StringType(), nullable=False),
    ]


# Nested `region` struct as it appears in the raw records; lat/long are strings here.
region_schema = StructType([
    StructField('cities', ArrayType(StringType()), nullable=True),
    StructField('iso', StringType(), nullable=True),
    StructField('lat', StringType(), nullable=True),
    StructField('long', StringType(), nullable=True),
    StructField('name', StringType(), nullable=True),
    StructField('province', StringType(), nullable=True),
])

# Raw layout: measures, the nested region struct, then the lineage columns.
raw_schema = StructType(
    _measure_fields()
    + [StructField('region', region_schema, nullable=True)]
    + _lineage_fields()
)

# Target layout: measures, the region attributes flattened to top level
# (lat/long as doubles), lineage columns, then the row audit timestamps.
target_schema = StructType(
    _measure_fields()
    + [
        StructField('cities', ArrayType(StringType()), nullable=True),
        StructField('iso', StringType(), nullable=True),
        StructField('lat', DoubleType(), nullable=True),
        StructField('long', DoubleType(), nullable=True),
        StructField('name', StringType(), nullable=True),
        StructField('province', StringType(), nullable=True),
    ]
    + _lineage_fields()
    + [
        StructField('created_ts', TimestampType(), nullable=False),
        StructField('updated_ts', TimestampType(), nullable=False),
    ]
)



In [88]:
# S3 (s3a://) location of the raw data; read as JSON further down in this notebook.
raw_aws_path = 's3a://vd-airflow-docker-bucket/covid/Canada/raw'

# S3 (s3a://) location of the curated Delta table; used both for reads and as the merge target.
curated_aws_path = 's3a://vd-airflow-docker-bucket/covid/Canada/curated'

In [89]:
# DataFrames for regular PySpark operations (select, filter, agg, ...):
# the raw JSON is read with the explicit raw_schema, the curated data as a Delta table.
raw_df = spark.read.json(raw_aws_path, schema=raw_schema)
curated_df = spark.read.load(curated_aws_path, format='delta')

# DeltaTable handle on the same curated path - the target of the merge/upsert below.
curated_target_table = DeltaTable.forPath(spark, curated_aws_path)

In [90]:
# Most recent `date` already present in the curated table.
# `max` is pyspark.sql.functions.max (imported at the top), not the Python builtin.
max_date_rows = curated_df.agg(max('date')).collect()
date = max_date_rows[0][0]

In [91]:
# Flattening raw data. Region --> cities, iso, lat, long, name, province
#
# Produces one row per raw record with the nested `region` struct expanded into
# top-level columns, plus created_ts/updated_ts audit timestamps. Only records
# dated on or after `date` (the latest date already in the curated table) are kept.

df_flattened = (
    raw_df
    .select(
        # Top-level measures are carried over unchanged.
        col('active'),
        col('active_diff'),
        col('confirmed'),
        col('confirmed_diff'),
        col('date'),
        col('deaths'),
        col('deaths_diff'),
        col('fatality_rate'),
        col('last_update'),
        col('recovered'),
        col('recovered_diff'),
        # Region attributes are lifted out of the nested struct.
        col('region.cities').alias('cities'),
        col('region.iso').alias('iso'),
        # lat/long are declared as strings in region_schema: cast to double first,
        # then name the result so the output column name is explicit.
        col('region.lat').cast('double').alias('lat'),
        col('region.long').cast('double').alias('long'),
        col('region.name').alias('name'),
        col('region.province').alias('province'),
        col('ingested_ts'),
        col('source_file_index'),
    )
    # Add metadata timestamps
    .withColumn('created_ts', current_timestamp())
    .withColumn('updated_ts', current_timestamp())
)

# Restrict the load to records dated on or after the latest curated date.
# When the curated table has no rows, max(date) is NULL and `date` is None; comparing
# against NULL evaluates to NULL for every row, which would silently filter out the
# whole load. In that case skip the filter so the first run performs a full load.
if date is not None:
    df_flattened = df_flattened.filter(col('date') >= date)

In [92]:
from pyspark.sql.functions import col, max, min, asc, desc

# Column mappings for the merge, keyed by target column name and pointing at the
# same-named column of the 'source' alias.
target_columns = target_schema.fieldNames()

# Every target column - used when inserting new rows (whenNotMatchedInsert).
schema_dict = {name: col(f'source.{name}') for name in target_columns}

# Same mapping minus created_ts - used when updating existing rows (whenMatchedUpdate),
# so the original creation timestamp is left as it is.
schema_dict_update = {
    name: col(f'source.{name}')
    for name in target_columns
    if name != 'created_ts'
}

print(f'Columns to Insert: {schema_dict}')

print(f'Columns to Update: {schema_dict_update}')

Columns to Insert: {'active': Column<'source.active'>, 'active_diff': Column<'source.active_diff'>, 'confirmed': Column<'source.confirmed'>, 'confirmed_diff': Column<'source.confirmed_diff'>, 'date': Column<'source.date'>, 'deaths': Column<'source.deaths'>, 'deaths_diff': Column<'source.deaths_diff'>, 'fatality_rate': Column<'source.fatality_rate'>, 'last_update': Column<'source.last_update'>, 'recovered': Column<'source.recovered'>, 'recovered_diff': Column<'source.recovered_diff'>, 'cities': Column<'source.cities'>, 'iso': Column<'source.iso'>, 'lat': Column<'source.lat'>, 'long': Column<'source.long'>, 'name': Column<'source.name'>, 'province': Column<'source.province'>, 'ingested_ts': Column<'source.ingested_ts'>, 'source_file_index': Column<'source.source_file_index'>, 'created_ts': Column<'source.created_ts'>, 'updated_ts': Column<'source.updated_ts'>}
Columns to Update: {'active': Column<'source.active'>, 'active_diff': Column<'source.active_diff'>, 'confirmed': Column<'source.c

In [93]:
# Print the flattened source schema followed by the curated table schema for comparison.
for frame in (df_flattened, curated_df):
    frame.printSchema()

root
 |-- active: integer (nullable = true)
 |-- active_diff: integer (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- confirmed_diff: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- deaths_diff: integer (nullable = true)
 |-- fatality_rate: double (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- recovered: integer (nullable = true)
 |-- recovered_diff: integer (nullable = true)
 |-- cities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- iso: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- name: string (nullable = true)
 |-- province: string (nullable = true)
 |-- ingested_ts: timestamp (nullable = true)
 |-- source_file_index: string (nullable = true)
 |-- created_ts: timestamp (nullable = false)
 |-- updated_ts: timestamp (nullable = false)

root
 |-- active: integer (nullable = true)
 |-- active_diff: integer (

In [None]:
# Alternative (disabled): plain append to the existing table
# df_flattened.write.format('delta').mode('append').save(curated_aws_path)

# Merge/upsert into the existing table: rows matching on source file, date and
# province are updated (created_ts excluded via schema_dict_update); all other
# source rows are inserted with the full column set.
# NOTE(review): the keys are compared with `=`, so a source row with a NULL key
# (e.g. province) presumably never matches and is inserted again on each run -
# confirm the data has no NULL keys.
merge_condition = 'target.source_file_index = source.source_file_index AND target.date = source.date AND target.province = source.province'

(
    curated_target_table.alias('target')
    .merge(df_flattened.alias('source'), merge_condition)
    .whenMatchedUpdate(set=schema_dict_update)
    .whenNotMatchedInsert(values=schema_dict)
    .execute()
)

In [96]:
# Re-read the curated Delta table and show the rows whose updated_ts differs
# from created_ts.
df = spark.read.load(curated_aws_path, format='delta')

timestamps_differ = col('created_ts') != col('updated_ts')
df.where(timestamps_differ).show()

+------+-----------+---------+--------------+----------+------+-----------+-------------+-------------------+---------+--------------+------+---+-------+---------+------+----------------+-------------------+--------------------+--------------------+--------------------+
|active|active_diff|confirmed|confirmed_diff|      date|deaths|deaths_diff|fatality_rate|        last_update|recovered|recovered_diff|cities|iso|    lat|     long|  name|        province|        ingested_ts|   source_file_index|          created_ts|          updated_ts|
+------+-----------+---------+--------------+----------+------+-----------+-------------+-------------------+---------+--------------+------+---+-------+---------+------+----------------+-------------------+--------------------+--------------------+--------------------+
|     2|          0|        2|             0|2020-01-31|     0|          0|          0.0|2020-01-31 23:59:00|        0|             0|    []|CAN|51.2538| -85.3232|Canada|         Ontario|