# Create Schemas and Tables

In [None]:
%run "./initialize"

In [None]:
# DDL for every schema plus the staging volume, issued in the listed order.
# The volume statement stays after the staging schema statement because the
# volume is created inside that schema.
# NOTE(review): the *_schema / staging_volume names are presumably defined by
# the "./initialize" notebook run above — confirm.
bootstrap_statements = [
    f"CREATE SCHEMA IF NOT EXISTS {staging_schema}",
    f"CREATE SCHEMA IF NOT EXISTS {bronze_schema}",
    f"CREATE SCHEMA IF NOT EXISTS {bronze_schema}_yaml",
    f"CREATE SCHEMA IF NOT EXISTS {silver_schema}",
    f"CREATE SCHEMA IF NOT EXISTS {gold_schema}",
    f"CREATE VOLUME IF NOT EXISTS {staging_schema}.{staging_volume}",
    f"CREATE SCHEMA IF NOT EXISTS {dpm_schema}",
]

for statement in bootstrap_statements:
    spark.sql(statement)

## Base Tables

In [None]:
# Base staging tables keyed by fully qualified name. Each value is the text
# that follows "CREATE TABLE <name> " in the generated DDL (column list plus
# table properties); every table enables the Delta change data feed.
base_table_definitions = {
    f"{staging_schema}.customer": """(
  CUSTOMER_ID integer,
  FIRST_NAME string,
  LAST_NAME string,
  EMAIL string,
  DELETE_FLAG boolean,
  LOAD_TIMESTAMP timestamp)
TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
    f"{staging_schema}.customer_address": """(
  CUSTOMER_ID integer,
  CITY string,
  STATE string,
  LOAD_TIMESTAMP timestamp)
TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
}

# Drop first, then create, so re-running the cell yields fresh empty tables.
for qualified_name, definition in base_table_definitions.items():
    spark.sql(f"DROP TABLE IF EXISTS {qualified_name}")
    spark.sql(f"CREATE TABLE {qualified_name} {definition}")

## Feature Tables

In [None]:
# Feature-sample tables keyed by fully qualified name, in creation order.
# Each value is the text that follows "CREATE TABLE <name> " in the generated
# DDL; every table enables the Delta change data feed. The first four live in
# the staging schema, the two table_to_migrate_* tables in the bronze schema.
feature_table_definitions = {
    f"{staging_schema}.customer_snapshot_source": """(
  CUSTOMER_ID integer,
  FIRST_NAME string,
  LAST_NAME string,
  EMAIL string,
  DELETE_FLAG boolean,
  LOAD_TIMESTAMP timestamp)
TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
    f"{staging_schema}.customer_historical_snapshot_source": """(
  CUSTOMER_ID integer,
  FIRST_NAME string,
  LAST_NAME string,
  EMAIL string,
  LOAD_TIMESTAMP timestamp)
TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
    f"{staging_schema}.customer_snapshots": """(
  CUSTOMER_ID integer,
  FIRST_NAME string,
  LAST_NAME string,
  EMAIL string,
  UPDATE_TIMESTAMP timestamp,
  SNAPSHOT_TIMESTAMP timestamp,
  SNAPSHOT_VERSION integer)
TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
    f"{staging_schema}.customer_purchase": """(
  CUSTOMER_ID integer,
  PRODUCT string,
  QUANTITY integer,
  PRICE decimal(10, 2),
  PURCHASE_TIMESTAMP timestamp)
TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
    f"{bronze_schema}.table_to_migrate_scd0": """(  
  CUSTOMER_ID integer,
  FIRST_NAME string,
  LAST_NAME string,
  EMAIL string
) TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
    f"{bronze_schema}.table_to_migrate_scd2": """(
  CUSTOMER_ID integer,
  FIRST_NAME string,
  LAST_NAME string,
  EMAIL string,
  EFFECTIVE_FROM timestamp,
  EFFECTIVE_TO timestamp)
TBLPROPERTIES (delta.enableChangeDataFeed = true);""",
}

# Drop first, then create, so re-running the cell yields fresh empty tables.
for qualified_name, definition in feature_table_definitions.items():
    spark.sql(f"DROP TABLE IF EXISTS {qualified_name}")
    spark.sql(f"CREATE TABLE {qualified_name} {definition}")

## Kafka Tables

In [None]:
# Source table for the Kafka sink sample: an identity-generated message id,
# a timestamp column defaulting to CURRENT_TIMESTAMP, and a string payload.
kafka_source_table = f"{staging_schema}.kafka_sink_sample_source"

# Delta table properties, rendered below as one "'key' = 'value'" line each.
# The delta.feature.* entries are declared together with reader version 3 /
# writer version 7.
kafka_table_properties = {
    "delta.enableChangeDataFeed": "true",
    "delta.feature.allowColumnDefaults": "supported",
    "delta.feature.changeDataFeed": "supported",
    "delta.feature.columnMapping": "supported",
    "delta.feature.generatedColumns": "supported",
    "delta.feature.invariants": "supported",
    "delta.minReaderVersion": "3",
    "delta.minWriterVersion": "7",
}
properties_clause = ",\n".join(
    f"    '{key}' = '{value}'" for key, value in kafka_table_properties.items()
)

# Drop first, then create, so re-running the cell yields a fresh empty table.
spark.sql(f"DROP TABLE IF EXISTS {kafka_source_table}")
spark.sql(f"""CREATE TABLE {kafka_source_table} (
    Message_Id BIGINT GENERATED BY DEFAULT AS IDENTITY (START WITH 1 INCREMENT BY 1),
    Message_Ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    Message_payload STRING
)
USING delta
TBLPROPERTIES (
{properties_clause}
)""")