diff --git a/sqlmesh/dbt/target.py b/sqlmesh/dbt/target.py index edc1dc1987..7a84ca2cd8 100644 --- a/sqlmesh/dbt/target.py +++ b/sqlmesh/dbt/target.py @@ -1,6 +1,7 @@ from __future__ import annotations import abc +import logging import sys import typing as t from pathlib import Path @@ -41,6 +42,8 @@ else: from typing_extensions import Literal +logger = logging.getLogger(__name__) + IncrementalKind = t.Union[ t.Type[IncrementalByUniqueKeyKind], t.Type[IncrementalByTimeRangeKind], @@ -171,6 +174,8 @@ def validate_authentication( if path is None or path == DUCKDB_IN_MEMORY else Path(t.cast(str, path)).stem ) + if "threads" in values and t.cast(int, values["threads"]) > 1: + logger.warning("DuckDB does not support concurrency - setting threads to 1.") return values def default_incremental_strategy(self, kind: IncrementalKind) -> str: @@ -190,7 +195,7 @@ def to_sqlmesh(self) -> ConnectionConfig: kwargs["connector_config"] = self.settings return DuckDBConnectionConfig( database=self.path, - concurrent_tasks=self.threads, + concurrent_tasks=1, **kwargs, ) diff --git a/tests/dbt/test_config.py b/tests/dbt/test_config.py index 0f4d4f1748..4e19d63f80 100644 --- a/tests/dbt/test_config.py +++ b/tests/dbt/test_config.py @@ -1,6 +1,7 @@ import base64 import typing as t from pathlib import Path +from shutil import copytree from unittest.mock import PropertyMock import pytest @@ -12,6 +13,7 @@ from sqlmesh.core.model import SqlModel from sqlmesh.dbt.common import Dependencies from sqlmesh.dbt.context import DbtContext +from sqlmesh.dbt.loader import sqlmesh_config from sqlmesh.dbt.model import IncrementalByUniqueKeyKind, Materialization, ModelConfig from sqlmesh.dbt.project import Project from sqlmesh.dbt.source import SourceConfig @@ -379,6 +381,29 @@ def _test_warehouse_config( return config +def test_duckdb_threads(tmp_path): + dbt_project_dir = "tests/fixtures/dbt/sushi_test" + temp_dir = tmp_path / "sushi_test" + + copytree(dbt_project_dir, temp_dir, symlinks=True) + + with open(temp_dir / "profiles.yml", "w") as f: + f.write( + """ + sushi: + outputs: + in_memory: + type: duckdb + schema: sushi + threads: 4 + target: in_memory + """ + ) + + config = sqlmesh_config(temp_dir) + assert config.gateways["in_memory"].connection.concurrent_tasks == 1 + + def test_snowflake_config(): _test_warehouse_config( """