Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sqlmesh/dbt/target.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import abc
import logging
import sys
import typing as t
from pathlib import Path
Expand Down Expand Up @@ -41,6 +42,8 @@
else:
from typing_extensions import Literal

logger = logging.getLogger(__name__)

IncrementalKind = t.Union[
t.Type[IncrementalByUniqueKeyKind],
t.Type[IncrementalByTimeRangeKind],
Expand Down Expand Up @@ -171,6 +174,8 @@ def validate_authentication(
if path is None or path == DUCKDB_IN_MEMORY
else Path(t.cast(str, path)).stem
)
if "threads" in values and t.cast(int, values["threads"]) > 1:
logger.warning("DuckDB does not support concurrency - setting threads to 1.")
return values

def default_incremental_strategy(self, kind: IncrementalKind) -> str:
Expand All @@ -190,7 +195,7 @@ def to_sqlmesh(self) -> ConnectionConfig:
kwargs["connector_config"] = self.settings
return DuckDBConnectionConfig(
database=self.path,
concurrent_tasks=self.threads,
concurrent_tasks=1,
**kwargs,
)

Expand Down
25 changes: 25 additions & 0 deletions tests/dbt/test_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import typing as t
from pathlib import Path
from shutil import copytree
from unittest.mock import PropertyMock

import pytest
Expand All @@ -12,6 +13,7 @@
from sqlmesh.core.model import SqlModel
from sqlmesh.dbt.common import Dependencies
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.loader import sqlmesh_config
from sqlmesh.dbt.model import IncrementalByUniqueKeyKind, Materialization, ModelConfig
from sqlmesh.dbt.project import Project
from sqlmesh.dbt.source import SourceConfig
Expand Down Expand Up @@ -379,6 +381,29 @@ def _test_warehouse_config(
return config


def test_duckdb_threads(tmp_path):
dbt_project_dir = "tests/fixtures/dbt/sushi_test"
temp_dir = tmp_path / "sushi_test"

copytree(dbt_project_dir, temp_dir, symlinks=True)

with open(temp_dir / "profiles.yml", "w") as f:
f.write(
"""
sushi:
outputs:
in_memory:
type: duckdb
schema: sushi
threads: 4
target: in_memory
"""
)

config = sqlmesh_config(temp_dir)
assert config.gateways["in_memory"].connection.concurrent_tasks == 1


def test_snowflake_config():
_test_warehouse_config(
"""
Expand Down