Experimental Snowflake sink.
We use the Snowflake Kafka connector with Snowpipe Streaming to send
a stream of output changes from Feldera into Snowflake.  The main
challenge is that Snowpipe currently supports inserts but not updates
or deletions.  The workaround is to write all updates into a set of
landing tables that mirror the target database schema and use a
combination of Snowflake streams and tasks to incrementally apply these
updates to the target tables by converting them into insert/update/delete
commands.

So the end-to-end process is:

* Feldera outputs updates to Kafka.
* The Snowflake Kafka connector converts them into a stream of inserts
  into landing tables.
* We attach a Snowflake stream to each landing table to track changes
  to the table.  A periodic task reads updates from the stream,
  applies them to the target table, and removes them from the landing
  table.
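
The last step above can be sketched in plain Python as a rough model of
what the periodic task does.  This is only an illustration: the row
fields (`seq`, `id`, `action`, `payload`) and the function name are
hypothetical and are not the schema or code introduced by this commit;
the real logic would live in user-written Snowflake streams and tasks.

```python
from typing import Any

# Hypothetical landing-row shape: the field names below are assumptions made
# for illustration, not the schema used by this commit.
LandingRow = dict[str, Any]


def apply_landing_rows(target: dict[int, dict[str, Any]],
                       landing: list[LandingRow]) -> None:
    """Apply queued changes to `target` in sequence order, then empty `landing`."""
    for row in sorted(landing, key=lambda r: r["seq"]):
        key = row["id"]
        if row["action"] == "insert":
            # An insert for a key that already exists acts as an update.
            target[key] = row["payload"]
        elif row["action"] == "delete":
            target.pop(key, None)
        else:
            raise ValueError(f"unknown action: {row['action']!r}")
    # Consumed rows are removed from the landing table.
    landing.clear()
```

Ordering by a sequence number matters in this model because a delete and
an insert for the same key give different results depending on which is
applied first.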

At the moment the landing tables and the data ingestion logic (Snowflake
streams and tasks) must be written by the user, but they can in
principle be automatically generated.

TODO:
- Docs (#867)
- WebConsole support (#859)
- Support Snowflake's `TIMESTAMP` format (#862)
- Figure out how to apply multiple updates atomically
  (See: snowflakedb/snowflake-kafka-connector#717)
- Test under load.
- Automate the generation of landing tables and data ingestion tasks.
- Figure out downtime and schema evolution ops.

Addresses #774

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
Leonid Ryzhyk committed Oct 11, 2023
1 parent 82e4f61 commit 458d5d4
Showing 26 changed files with 1,209 additions and 208 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
@@ -75,6 +75,17 @@ jobs:
        env:
          itest_config: ${{ secrets.itest_config }}

      # Ship secrets for the Snowflake CI account to Earthly.
      - name: Snowflake .env
        run: |
          echo SNOWFLAKE_CI_USER_PRIVATE_KEY_PASSPHRASE="${snowflake_passphrase}" >> deploy/.env && \
          echo SNOWFLAKE_CI_USER_PRIVATE_KEY="${snowflake_key}" >> deploy/.env && \
          echo SNOWFLAKE_CI_USER_PASSWORD="${snowflake_password}" >> deploy/.env
        env:
          snowflake_key: ${{ secrets.snowflake_ci_user_private_key }}
          snowflake_passphrase: ${{ secrets.snowflake_ci_user_private_key_passphrase }}
          snowflake_password: ${{ secrets.snowflake_ci_user_password }}

      - name: Earthly version
        run: earthly --version

1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@ demo/project_demo00-SecOps/simulator/Cargo.lock
demo/pipeline_data

# Used to run earthly locally.
.env
deploy/.env

# These are backup files generated by rustfmt
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Experimental Snowflake sink
([#774](https://github.com/feldera/feldera/issues/774))

## [0.1.7] - 2023-10-10

### Added
