A Delta Lake connector for GCP Datastream
Datastream is a serverless Change Data Capture (CDC) service on GCP. You set up and configure Datastream to monitor your databases, and it delivers all changes (inserts, updates, and deletes) as a near-real-time stream.
Delta Lake is an open source storage layer that provides ACID transactions, scalable metadata handling, and unified streaming and batch data processing on top of Spark.
Currently Datastream writes changes to files in Google Cloud Storage (GCS). The Delta Lake connector streams those files and writes the changes to Delta Lake in two forms (a toy illustration follows the list):
- Change log tables - contain every change made in the source database since replication started. They can be replayed to rebuild the state of the database at any point in the past.
- Merged tables - contain the most recent snapshot of the database.
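As a toy illustration (the id, name, change_type, is_deleted, and change_ts columns below are simplified stand-ins for Datastream's actual record layout), the change log table keeps one row per source change, while the merged table can be thought of as the latest surviving row per primary key. A minimal Spark sketch of deriving that merged view from a change log:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("log-vs-merged-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical change log: one row per change delivered by Datastream.
// change_type is shown for context only; deletes are detected via is_deleted (as in the connector).
val changeLog = Seq(
  (1, "alice",  "INSERT", false, 1L),
  (1, "alicia", "UPDATE", false, 2L),
  (2, "bob",    "INSERT", false, 3L),
  (2, "bob",    "DELETE", true,  4L)
).toDF("id", "name", "change_type", "is_deleted", "change_ts")

// Merged view: latest change per primary key, with deleted rows dropped.
val latest = Window.partitionBy($"id").orderBy($"change_ts".desc)
val merged = changeLog
  .withColumn("rn", row_number().over(latest))
  .filter($"rn" === 1 && !$"is_deleted")
  .drop("rn")

merged.show()
// Only id=1 with name "alicia" remains; id=2 was deleted by its last change.
```

In practice the connector produces the merged tables with Delta merge commands rather than a window function; the ingestion phases are described in more detail further below.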
The overall architecture is as follows:
- Supported Source Databases: Currently only MySQL and Oracle are supported; other sources are on the roadmap. Once available, they can be integrated into this connector with a few lines of code
- Supported Sinks: Currently Datastream supports streaming data only to GCS; support for publishing messages directly to Pub/Sub is under development
GCP provides two ways to run Spark: Dataproc and Databricks on GCP. In this guide we will be using Databricks.
sbt "project spark" clean assembly
- Create a CloudSQL instance: https://cloud.google.com/sql/docs/mysql/create-instance
- Configure the CloudSQL instance for Datastream: https://cloud.google.com/datastream/docs/configure-your-source-mysql-database
- Create a Datastream connection profile: https://cloud.google.com/datastream/docs/create-connection-profiles
- Create a Datastream stream: https://cloud.google.com/datastream/docs/create-a-stream
To generate mock source data, follow the instructions in app-engine-mock-data-producer/
After a couple of minutes you should see files being written to the GCS bucket by Datastream.
- Set up a Databricks workspace on GCP: https://docs.gcp.databricks.com/getting-started/try-databricks-gcp.html
- Configure a Databricks cluster with a GCP service account: https://docs.gcp.databricks.com/clusters/configure.html#google-service-account
- The service account should have read permission on the GCS bucket used by Datastream
- Deploy a new Databricks cluster with the connector's environment variables set (see the configuration table below)
- Follow the instructions for building a fat jar for the connector
- In Databricks, define a new job of type JAR
  - the main class is io.badal.databricks.jobs.DatastreamDeltaStreamingJob
  - add the fat jar as a dependent library
  - point the job at the Databricks cluster with your configuration set up
- Run the job
We have also provided notebooks which you can import, modify the configuration of, and run; they are located in the notebooks directory of this project.
For examples of how to configure the job programmatically, see spark/src/test/scala/io/badal/databricks/config/DatastreamDeltaConfigSpec.scala
You will need the gcloud command line tools: download and set up the gcloud SDK (https://cloud.google.com/sdk/docs/install) and run the following command:
gcloud auth application-default login
You may also need to set your project configuration to the relevant project:
gcloud config set project sandbox-databricks
Then run the tests:
sbt test
Configuration of the connector is achieved through a series of environment variables (a sketch of how they could be consumed follows the table):
Configuration | Description | Default | Required |
---|---|---|---|
GENERATE_LOG_TABLE | Flag to control whether or not an intermediate Delta table (change log table) will be generated. The table will have the suffix _log | true | no |
CHECKPOINT_DIR | Location of the structured streaming checkpoints (note: should include protocol) | dbfs:/datastream-deltalake-connector/_checkpoint | no |
NAME | The name of the Spark application | datastream-deltalake-connector | no |
READ_FORMAT | The file format used by Datastream. Currently supports only avro (parquet / json can be used but are not actually supported by Datastream) | avro | no |
FILE_READ_CONCURRENCY | The max number of files per table which can be read in a single batch | 2 | no |
DATASTREAM_TABLE_SOURCE_TYPE | A type flag used to distinguish between the different mechanisms for providing the Datastream tables to stream into Delta. Currently supports two types: discovery-bucket and local-directory | discovery-bucket | no |
BUCKET | The GCS bucket to be used if using the discovery-bucket type | None | yes (when type is discovery-bucket) |
SOURCE_PATH | When using discovery-bucket this optionally provides a path to a directory within the bucket to be used. For local-directory this is a file path to the local directory where tables are located | None | yes (when type is local-directory) |
DELTA_TABLE_NAME_PREFIX | An optional prefix applied to the names of the generated Delta tables | None | no |
DELTA_MERGE_FREQUENCY_MINUTES | The microbatch duration of the job (how often changes will be merged into Delta) | 1 | no |
DELTA_SCHEMA_EVOLUTION_STRATEGY | The strategy for dealing with schema changes. There are currently three types: mergeSchema - attempts to use Delta's pre-defined merge strategy; overwriteSchema - uses Delta's overwrite schema option (will break backwards compatibility); none - no strategy will be applied and schema changes will cause failures. See https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html | mergeSchema | no |
DELTA_TABLE_PATH | The location of the Delta table data / log | dbfs:/datastream-deltalake-connector/_delta | no |
DELTA_DATABASE | An override for the database in which tables are registered in the catalog. When not provided, the database is inferred from the record metadata | None | no |
DELTA_MICROBATCH_PARTITIONS | The number of partitions in the streaming DataFrame for each microbatch being processed. This applies to both the Delta log table and the merged table, and will impact the number of parquet files which make up the Delta table. When not provided, the data will not be repartitioned | None | no |
DATABRICKS_DELTA_AUTO_COMPACTION_ENABLED | If set to true, compaction will be run on a table after every merge. This setting is only available on the Databricks runtime | false | no |
DATABRICKS_DELTA_MIN_NUMBER_OF_FILES | If auto compaction is enabled, the minimum number of files required before compaction will run | None (will use Databricks default) | no |
DATABRICKS_DELTA_MAX_FILE_SIZE_BYTES | If auto compaction is enabled, this will be an upper bound on the compacted file size | None (will use Databricks default) | no |
DATABRICKS_DELTA_TARGET_FILE_SIZE_BYTES | If auto compaction is enabled, files will be compacted to roughly this size in bytes. This setting is only available on the Databricks runtime | None (will use Databricks default) | no |
DATABRICKS_DELTA_AUTO_OPTIMIZE_ENABLED | If set to true, optimize will be run on a table after every merge. This setting is only available on the Databricks runtime | false | no |
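As a minimal sketch (the ConnectorConf case class and fromEnv helper below are hypothetical, not the connector's actual config classes), this is how a subset of the variables above could be read, using the same defaults:

```scala
// Hypothetical config holder; the real connector's config lives under io.badal.databricks.config.
final case class ConnectorConf(
    name: String,
    checkpointDir: String,
    readFormat: String,
    fileReadConcurrency: Int,
    tableSourceType: String,
    bucket: Option[String],
    deltaTablePath: String,
    mergeFrequencyMinutes: Int,
    schemaEvolutionStrategy: String
)

object ConnectorConf {
  // Read an environment variable, falling back to the documented default.
  private def env(key: String, default: String): String =
    sys.env.getOrElse(key, default)

  def fromEnv(): ConnectorConf = ConnectorConf(
    name = env("NAME", "datastream-deltalake-connector"),
    checkpointDir = env("CHECKPOINT_DIR", "dbfs:/datastream-deltalake-connector/_checkpoint"),
    readFormat = env("READ_FORMAT", "avro"),
    fileReadConcurrency = env("FILE_READ_CONCURRENCY", "2").toInt,
    tableSourceType = env("DATASTREAM_TABLE_SOURCE_TYPE", "discovery-bucket"),
    bucket = sys.env.get("BUCKET"), // required when type is discovery-bucket
    deltaTablePath = env("DELTA_TABLE_PATH", "dbfs:/datastream-deltalake-connector/_delta"),
    mergeFrequencyMinutes = env("DELTA_MERGE_FREQUENCY_MINUTES", "1").toInt,
    schemaEvolutionStrategy = env("DELTA_SCHEMA_EVOLUTION_STRATEGY", "mergeSchema")
  )
}
```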
The connector is modeled after the official Datastream Dataflow connector
The ingestion can be divided into several phases:
- Table discovery
- The Datastream root directory is scanned to discover all sub-folders. Each sub-folder contains a separate table.
- Currently, this stage runs only once when the job starts - it will not pick up any new tables that are added while the job runs
- Stream creation per table
- For each table/sub-folder a new stream is created
- The TableMetadata is extracted by reading the first record from the stream
- A new database and tables are created if they don't already exist
- Ingestion into log tables
- The stream is written (in append mode) as is into an intermediate log table
- The log table has a row for each record in the source GCS files
- Merge from logs tables into final tables
- The log table is used as a Delta streaming source.
- For each micro-batch we:
- Migrate the schema of the target table. This is done by appending an empty DataFrame with the new schema.
- Execute a Delta merge command into the target table (a sketch of these two steps follows this list).
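The following is a minimal sketch of those two steps, assuming a single primary key column id and a source_metadata.is_deleted flag; the actual connector derives key columns and ordering from Datastream's record metadata, so treat this as an illustration rather than the real implementation:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

def mergeMicroBatch(spark: SparkSession, batch: DataFrame, targetTable: String): Unit = {
  // 1. Schema migration: append an empty DataFrame carrying the batch schema so the
  //    target table picks up any newly added columns (relies on mergeSchema).
  batch.limit(0)
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(targetTable)

  // 2. Merge the batch into the target table: delete when the source row is flagged
  //    as deleted, otherwise upsert the latest values.
  DeltaTable.forName(spark, targetTable)
    .as("t")
    .merge(batch.as("s"), "t.id = s.id")
    .whenMatched("s.source_metadata.is_deleted = true").delete()
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
```

In the streaming job this kind of logic would run inside foreachBatch on the log-table stream.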
Recovery from failure is supported using Structured Streaming checkpointing (see CHECKPOINT_DIR above).
The biggest issues we have run into when running the connector are related to Delta tables accumulating many small files.
We have provided the DELTA_MICROBATCH_PARTITIONS configuration, which can be used to limit the number of files generated per microbatch.
Over time, the number of files will still grow, so it is recommended to periodically perform compaction.
See - Delta Lake Best Practices
If running the connector on a Databricks runtime, we have had success enabling auto compaction in conjunction with setting the target file size in bytes when the default settings are not producing a good file layout for the Delta tables (see the sketch below).
See - Databricks Optimization Docs
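For example, a sketch of what this can look like on a Databricks runtime (my_db.my_merged_table is a placeholder; the table properties and OPTIMIZE command below exist on recent Databricks runtimes, but check the optimization docs above for your runtime version):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Enable auto compaction / optimized writes for a specific table
// (Databricks runtimes only; property names may differ across versions).
spark.sql("""
  ALTER TABLE my_db.my_merged_table SET TBLPROPERTIES (
    'delta.autoOptimize.autoCompact'   = 'true',
    'delta.autoOptimize.optimizeWrite' = 'true'
  )
""")

// Alternatively, run compaction periodically from a scheduled job.
spark.sql("OPTIMIZE my_db.my_merged_table")
```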
- Updating primary key columns has not been tested
- source_metadata.is_deleted column is used to detect deletes, while the change_type column is ignored (similar to the Dataflow implementation)
- New Datastream tables are not auto-discovered. Ingesting newly added tables requires restarting the connector
- Add custom metrics for better monitoring
- Discover new tables that were added while the job is running
- Provide instructions on how to set up the jobs to restart on failure
- Better error recovery in case of a single stream failing (i.e. it should not affect other streams)