In [None]:
# Import python packages
import streamlit as st
import json

# Import Snowpark packages
from snowflake.snowpark.context import get_active_session
# Handle to the Snowpark session this notebook is running in.
# NOTE(review): not referenced by any cell in this part of the notebook; presumably
# kept for other cells or interactive use — confirm before removing.
snowflake_session = get_active_session()

In [None]:
def display_schema_evolution_records(
    snowflake_query_result
  , source_type: str
):
  """Render the schema evolution record of every column loaded from ``source_type``.

  Each matching column gets its own Streamlit column, showing the column name
  followed by its schema evolution record pretty-printed as JSON. When nothing
  matches, an informational message is shown instead of an empty layout.

  Args:
    snowflake_query_result: Result of an earlier SQL cell (presumably a
      ``desc table`` cell). Its ``results`` attribute must provide
      ``to_dict("records")`` returning rows with the keys ``"name"`` and
      ``"schema evolution record"`` (a JSON string, or a missing value).
    source_type: Stage folder to keep, e.g. ``"csv"``. A record matches when
      the first ``/``-separated segment of its ``fileName`` equals this value.
  """
  results_as_list = snowflake_query_result.__getattribute__("results").to_dict("records")

  list_of_values = []
  for x in results_as_list:
    raw_record = x["schema evolution record"]
    # Columns that were not added by schema evolution carry no record. Depending on
    # how the result was materialised, the missing value can be None or NaN, so
    # only JSON text is parsed.
    if not isinstance(raw_record, (str, bytes, bytearray)):
      continue
    # Parse once and reuse the parsed object for both filtering and display.
    schema_evolution_record = json.loads(raw_record)
    file_name = schema_evolution_record.get("fileName") or ""
    if file_name.split("/")[0] != source_type:
      continue
    list_of_values.append({
        "name": x["name"]
      , "schema_evolution_record": schema_evolution_record
    })

  # st.columns() rejects a column count of zero, so report the empty case instead.
  if not list_of_values:
    st.info(f'No schema evolution records found for source type "{source_type}".')
    return

  cols = st.columns(len(list_of_values))

  for col_x, value in zip(cols, list_of_values):
    with col_x:
      st.write(value["name"])
      st.code(
        json.dumps(
            value["schema_evolution_record"]
          , indent = 2
        )
      )

In [None]:
-- Resolve unqualified object names in the DEMO schema for subsequent statements.
USE SCHEMA "DEMO";

# Schema Evolution

So far, we have established the following building blocks:

- Infer metadata from a file
- Leverage the metadata to create a new table
- Ingest data into the new table, matching by column name

Now for the main event in this session. Our next demonstration achieves the following:

1. Create a new table that only contains some generic metadata fields for data lineage and monitoring
2. Ingest our example CSV file into this new table, _automatically_ adding all the data fields
3. Ingest our example JSON file into this new table, _automatically_ adding the additional fields
4. Ingest our example Parquet file into this new table, _automatically_ adding the final additional field

First, we quickly review which fields are available in our data:

In [None]:
with "CTE__RAW" as (
  select
      "TABLE_NAME"
    , "COLUMN_NAME"
    -- Constant marker aggregated by the pivot: TRUE wherever a table has the column
    , True as "PIVOT_VALUE"
    -- 0 when the column appears in all three source tables, 1 otherwise,
    -- so that shared columns are listed first
    , ((count(distinct "TABLE_NAME") over (partition by "COLUMN_NAME")) != 3)::int as "CUSTOM_ORDER_1"
    -- Source precedence for ordering: CSV, then JSON, then Parquet
    , case "TABLE_NAME"
        when 'DATA_FROM_CSV' then 1
        when 'DATA_FROM_JSON' then 2
        when 'DATA_FROM_PARQUET' then 3
      end as "CUSTOM_ORDER_2"
    , "ORDINAL_POSITION" as "CUSTOM_ORDER_3"
    -- Sort key compared as text. The ordinal position is zero-padded to a fixed
    -- width so it compares correctly: unpadded, position 10 ('0110') would sort
    -- before position 2 ('012').
    , concat(
          "CUSTOM_ORDER_1"
        , "CUSTOM_ORDER_2"
        , lpad("CUSTOM_ORDER_3"::varchar, 6, '0')
      ) as "CUSTOM_ORDER"
  from "INFORMATION_SCHEMA"."COLUMNS"
  where "TABLE_SCHEMA" = 'DEMO'
    and "TABLE_NAME" in (
        'DATA_FROM_CSV'
      , 'DATA_FROM_JSON'
      , 'DATA_FROM_PARQUET'
    )
)
select
    "COLUMN_NAME"
  -- Pivoted columns are named after the quoted IN-list values, hence "'DATA_FROM_CSV'" etc.
  , max("'DATA_FROM_CSV'") as "DATA_FROM_CSV"
  , max("'DATA_FROM_JSON'") as "DATA_FROM_JSON"
  , max("'DATA_FROM_PARQUET'") as "DATA_FROM_PARQUET"
from "CTE__RAW"
  pivot(
    max("PIVOT_VALUE")
    for "TABLE_NAME" in (
        'DATA_FROM_CSV'
      , 'DATA_FROM_JSON'
      , 'DATA_FROM_PARQUET'
    )
  )
-- The pivot keeps one row per table because the ordering columns differ per table;
-- collapse them back to a single row per column name.
group by "COLUMN_NAME"
order by
    min("CUSTOM_ORDER")

## Create the destination table

We will be loading all data into a single table.

We _could_ create the destination table using a table template from one of the sources as seen earlier in the session.

Instead, we create a brand new table first so we can fully show the power of metadata-driven ingestion and schema evolution.

The most important thing to note here is the `enable_schema_evolution` option, which instructs Snowflake to add new fields if required when ingesting data.

In [None]:
-- Start from a table holding only lineage/monitoring metadata; schema evolution
-- lets later COPY INTO statements add the data columns.
CREATE OR REPLACE TABLE "DATA_FROM_ALL_SOURCES" (
  "METADATA_FILE_PATH" STRING COMMENT 'Full path for the file in the originating stage',
  "METADATA_FILE_ROW_NUMBER" INTEGER COMMENT 'Row number within the file in the originating stage',
  "METADATA_RECORD_INGESTION_TIMESTAMP" TIMESTAMP_LTZ COMMENT 'Timestamp of record ingestion in local timezone'
)
ENABLE_SCHEMA_EVOLUTION = TRUE
COMMENT = 'Table containing data from all sources, with schema evolution enabled to automatically add new columns as required';

DESCRIBE TABLE "DATA_FROM_ALL_SOURCES";

## Including metadata in ingestion

Before the `include_metadata` option was introduced to `COPY INTO`, the only way to include metadata fields when ingesting data was to use a subquery within the statement. Unfortunately, at the time of writing, a `COPY INTO` statement that uses such a subquery does not support the `match_by_column_name` option.

Fortunately, the `include_metadata` option allows us to directly map any [Metadata Columns](https://docs.snowflake.com/en/user-guide/querying-metadata#metadata-columns) to fields in the target table:

```sql
copy into "MY_TABLE"
from '<location of file(s), including stage>'
  file_format = "<Snowflake File Format object to use when reading the file>"
  match_by_column_name = CASE_INSENSITIVE
  include_metadata = (
      "METADATA_FILE_PATH" = METADATA$FILENAME
    , "METADATA_FILE_ROW_NUMBER" = METADATA$FILE_ROW_NUMBER
    , "METADATA_RECORD_INGESTION_TIMESTAMP" = METADATA$START_SCAN_TIME
  )
;
```

## Ingestion with evolution - CSV

To start, we ingest our example CSV file.

In [None]:
-- Load the files under the csv path of the STG__DATA stage, matching columns by
-- name and filling the metadata columns from the file metadata.
COPY INTO "DATA_FROM_ALL_SOURCES"
  FROM '@"STG__DATA"/csv'
  FILE_FORMAT = "FF_CSV"
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  INCLUDE_METADATA = (
    "METADATA_FILE_PATH" = METADATA$FILENAME,
    "METADATA_FILE_ROW_NUMBER" = METADATA$FILE_ROW_NUMBER,
    "METADATA_RECORD_INGESTION_TIMESTAMP" = METADATA$START_SCAN_TIME
  );

We can now see that the additional fields have been created in the target table:

In [None]:
DESCRIBE TABLE "DATA_FROM_ALL_SOURCES";

In [None]:
# Introduce the output, then render the schema evolution record of each field
# whose record points at the csv stage folder.
st.markdown(
  """We can expand the "schema evolution record" in this output to see exactly where each field came from:"""
)
display_schema_evolution_records(
  sql__ingestion_with_evolution__csv__metadata,
  source_type = "csv",
)

We can query the table to see how these new fields have been populated:

In [None]:
-- Order by source file and row so the displayed output is reproducible between runs.
select *
from "DATA_FROM_ALL_SOURCES"
order by
    "METADATA_FILE_PATH"
  , "METADATA_FILE_ROW_NUMBER"
;

## Ingestion with evolution - JSON

Next, we ingest our example JSON file.

In [None]:
-- Load the files under the json path of the STG__DATA stage, matching columns by
-- name and filling the metadata columns from the file metadata.
COPY INTO "DATA_FROM_ALL_SOURCES"
  FROM '@"STG__DATA"/json'
  FILE_FORMAT = "FF_JSON"
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  INCLUDE_METADATA = (
    "METADATA_FILE_PATH" = METADATA$FILENAME,
    "METADATA_FILE_ROW_NUMBER" = METADATA$FILE_ROW_NUMBER,
    "METADATA_RECORD_INGESTION_TIMESTAMP" = METADATA$START_SCAN_TIME
  );

Again, we can now see that the additional fields have been created in the target table:

In [None]:
DESCRIBE TABLE "DATA_FROM_ALL_SOURCES";

In [None]:
# Introduce the output, then render the schema evolution record of each field
# whose record points at the json stage folder.
st.markdown(
  """Again, we can expand the "schema evolution record" in this output to see exactly where each field came from:"""
)
display_schema_evolution_records(
  sql__ingestion_with_evolution__json__metadata,
  source_type = "json",
)

Again, we can query the table to see how these new fields have been populated:

In [None]:
-- Order by source file and row so the displayed output is reproducible between runs.
select *
from "DATA_FROM_ALL_SOURCES"
order by
    "METADATA_FILE_PATH"
  , "METADATA_FILE_ROW_NUMBER"
;

## Ingestion with evolution - Parquet

Finally, we ingest our example Parquet file.

In [None]:
-- Load the files under the parquet path of the STG__DATA stage, matching columns by
-- name and filling the metadata columns from the file metadata.
COPY INTO "DATA_FROM_ALL_SOURCES"
  FROM '@"STG__DATA"/parquet'
  FILE_FORMAT = "FF_PARQUET"
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  INCLUDE_METADATA = (
    "METADATA_FILE_PATH" = METADATA$FILENAME,
    "METADATA_FILE_ROW_NUMBER" = METADATA$FILE_ROW_NUMBER,
    "METADATA_RECORD_INGESTION_TIMESTAMP" = METADATA$START_SCAN_TIME
  );

Again, we can now see that the additional fields have been created in the target table:

In [None]:
DESCRIBE TABLE "DATA_FROM_ALL_SOURCES";

In [None]:
# Introduce the output, then render the schema evolution record of each field
# whose record points at the parquet stage folder.
st.markdown(
  """Again, we can expand the "schema evolution record" in this output to see exactly where each field came from:"""
)
display_schema_evolution_records(
  sql__ingestion_with_evolution__parquet__metadata,
  source_type = "parquet",
)

Again, we can query the table to see how these new fields have been populated:

In [None]:
-- Order by source file and row so the displayed output is reproducible between runs.
select *
from "DATA_FROM_ALL_SOURCES"
order by
    "METADATA_FILE_PATH"
  , "METADATA_FILE_ROW_NUMBER"
;