The dbt adapter for Confluent Cloud Flink SQL.
Build, test, and manage streaming data transformations on Confluent Cloud using dbt's familiar development workflow.
dbt-confluent lets you use dbt to define and run SQL transformations on Confluent Cloud's fully managed Apache Flink service. It supports both batch-style and streaming materializations, enabling continuous data pipelines defined as dbt models.
Features:
- Standard dbt materializations (table, view, ephemeral) adapted for Flink SQL
- Streaming-native materializations (`streaming_table`, `streaming_source`) for continuous data pipelines
- Materialized views powered by Flink's continuous query execution
- Integration with Confluent Cloud connectors (e.g., Datagen/Faker) via `streaming_source`
See Materializations for the full list and details.
```shell
pip install dbt-confluent
```

or with uv:

```shell
uv add dbt-confluent
```

Requires Python 3.10+.
After installing, scaffold a new project with:
```shell
dbt init my_project
```

Select `confluent` as the adapter and fill in the prompts for your Confluent Cloud credentials (API key, compute pool, environment, etc.).
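The init prompts populate your `profiles.yml`. Below is a hypothetical sketch of the resulting profile — the key names are assumptions inferred from the credentials this README lists elsewhere, so defer to whatever `dbt init` actually generates for this adapter:

```yaml
# profiles.yml -- hypothetical sketch; exact key names are assumptions,
# check what `dbt init` generates for the confluent adapter
my_project:
  target: dev
  outputs:
    dev:
      type: confluent
      environment: env-xxxxxx      # Confluent Cloud environment (Flink catalog)
      compute_pool: lfcp-xxxxx     # Flink compute pool ID
      dbname: my-kafka-cluster     # existing Flink database (Kafka cluster)
      cloud_provider: aws
      cloud_region: us-west-2
      flink_api_key: "{{ env_var('CONFLUENT_FLINK_API_KEY') }}"
      flink_api_secret: "{{ env_var('CONFLUENT_FLINK_API_SECRET') }}"
```

Using dbt's built-in `env_var()` Jinja function keeps secrets out of the file itself.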
Confluent Cloud Flink uses different terminology than traditional databases. Here's how dbt concepts map to Flink and Confluent Cloud:
| dbt concept | Flink concept | Confluent Cloud entity |
|---|---|---|
| `database` | Catalog | Environment |
| `schema` | Database | Kafka cluster |
Unlike most dbt adapters, dbt-confluent cannot create or drop schemas — a dbt schema maps to a Flink database (Kafka cluster) in Confluent Cloud, which is managed externally. Both the `dbname` in your `profiles.yml` and any model-level `schema` config must reference an existing Flink database by name:
```yaml
# dbt_project.yml
models:
  my_project:
    +schema: my-kafka-cluster
```

A streaming table creates a table and runs a continuous `INSERT` query against it:
```sql
-- models/pageviews_enriched.sql
{{
  config(
    materialized='streaming_table',
    with={'changelog.mode': 'append'}
  )
}}

SELECT
  p.user_id,
  p.page_url,
  u.username
FROM {{ ref('pageviews') }} p
JOIN {{ ref('users') }} u ON p.user_id = u.user_id
```

A streaming source creates a connector-backed source table. The model SQL defines the column definitions:
```sql
-- models/datagen_users.sql
{{
  config(
    materialized='streaming_source',
    connector='faker',
    with={'rows-per-second': '10'}
  )
}}

`user_id` INT,
`username` STRING,
`email` STRING
```

See Materializations for the full list and details.
- No schema management: Flink databases (Kafka clusters) cannot be created or dropped — they are managed in Confluent Cloud.
- No table renames: `ALTER TABLE RENAME` is not supported; to rename a model you must drop and recreate the underlying table, which for `table`, `streaming_table`, and `streaming_source` materializations requires running with `--full-refresh`.
- No transactions: Flink SQL is non-transactional.
- No snapshots: Flink SQL lacks the batch operations (`MERGE`, `UPDATE`) required by dbt snapshots.
- No incremental: dbt's batch-incremental semantics do not map to Flink's continuous processing model. Use `streaming_table` instead.
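The `--full-refresh` behavior mentioned above can be driven from dbt's standard CLI. For example, to drop and recreate the table behind a single model (the model name here is just the one from the example above):

```shell
# Drop and recreate the underlying Flink table for one model.
# --full-refresh and --select are standard dbt CLI flags.
dbt run --full-refresh --select pageviews_enriched
```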
```shell
git clone https://github.com/confluentinc/dbt-confluent
cd dbt-confluent
uv sync --extra dev --extra test
```

Lint and format checks:

```shell
uv run ruff check dbt/ tests/
uv run ruff format --check dbt/ tests/
```

Tests require a Confluent Cloud environment. Set the following environment variables (or add them to a `test.env` file):
```shell
export CONFLUENT_ENV_ID=env-xxxxxx
export CONFLUENT_ORG_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
export CONFLUENT_COMPUTE_POOL_ID=lfcp-xxxxx
export CONFLUENT_CLOUD_PROVIDER=aws
export CONFLUENT_CLOUD_REGION=us-west-6
export CONFLUENT_TEST_DBNAME=dbname
export CONFLUENT_FLINK_API_KEY=xxx
export CONFLUENT_FLINK_API_SECRET=xxx
```

Then run the test suite:

```shell
uv run pytest
```

This adapter follows semantic versioning and is versioned independently from dbt Core. Compatibility with dbt Core is declared via dependencies (currently requires `dbt-core~=1.11`).
Apache-2.0 — see LICENSE for details.