Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
58d77e0
Add BigQuery query client and dialect support
colinmf Mar 7, 2026
0b27190
Fix pre-commit: ruff format, trailing commas, GraphQL schema
colinmf Mar 7, 2026
60377d3
Fix BigQuery test failures: QueryJobConfig mock and BIGNUMERIC precision
colinmf Mar 7, 2026
0b943cd
Add tests for _get_client and test_connection to reach 100% coverage
colinmf Mar 7, 2026
b04993a
Add test for BigQuery ImportError path in utils to reach 100% coverage
colinmf Mar 7, 2026
01a83a1
Add engine URI project resolution to BigQueryClient (#1)
colinmf Mar 7, 2026
653611c
Address review comments on BigQueryClient (#2)
colinmf Mar 7, 2026
4bb8636
Merge branch 'main' into colinmf/bigquery-integration
colinmf Mar 7, 2026
48ae7b4
Add BigQuery project resolution order from engine URI
colinmf Mar 7, 2026
dead3a2
Merge fork/colinmf/bigquery-integration: combine URI parsing and _get…
colinmf Mar 7, 2026
64208ae
Add BigQuery query execution support to datajunction-query
colinmf Mar 7, 2026
6022f1f
Add comprehensive BigQuery integration tests
colinmf Mar 7, 2026
bfd3a0f
Fix mypy type error and ruff formatting in BigQuery query execution
colinmf Mar 7, 2026
6b974a4
Fix test_get_columns_for_table_with_engine_project_override mock setup
colinmf Mar 7, 2026
5f26afc
Add tests for uncovered branches in _get_project_from_engine
colinmf Mar 7, 2026
031ed72
Strip catalog prefix from BigQuery SQL and fix ADC credential handling
colinmf Mar 7, 2026
432927c
Remove unused variable flagged by ruff (F841)
colinmf Mar 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions datajunction-query/config.djqs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ engines:
version: ''
type: sqlalchemy
uri: postgresql+psycopg://readonly_user:readonly_pass@postgres_metadata:5432/dj
# - name: bigquery
# version: ''
# type: bigquery
# uri: bigquery:///
# extra_params:
# project: <your-gcp-project>
catalogs:
- name: warehouse
engines:
Expand All @@ -26,3 +32,6 @@ catalogs:
- name: dj
engines:
- dj_system
# - name: <your-gcp-project>
# engines:
# - bigquery
1 change: 1 addition & 0 deletions datajunction-query/djqs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class EngineType(Enum):
SQLALCHEMY = "sqlalchemy"
SNOWFLAKE = "snowflake"
TRINO = "trino"
BIGQUERY = "bigquery"


class EngineInfo: # pylint: disable=too-few-public-methods
Expand Down
43 changes: 43 additions & 0 deletions datajunction-query/djqs/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import duckdb
import snowflake.connector
from google.cloud import bigquery
from psycopg_pool import AsyncConnectionPool
from sqlalchemy import create_engine, text

Expand Down Expand Up @@ -117,6 +118,25 @@ def run_query( # pylint: disable=R0914
cur = conn.cursor()

return run_snowflake_query(query, cur)
elif engine.type == EngineType.BIGQUERY:
_logger.info("Creating BigQuery client")
project = engine.extra_params.get("project")
credentials_path = engine.extra_params.get("credentials_path")
location = engine.extra_params.get("location")

client_kwargs = {}
if credentials_path:
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
credentials_path,
)
client_kwargs["credentials"] = credentials
if location:
client_kwargs["location"] = location

bq_client = bigquery.Client(project=project, **client_kwargs)
return run_bigquery_query(query, bq_client)

_logger.info(
"Creating sqlalchemy engine using engine name and version defined on query",
Expand Down Expand Up @@ -164,6 +184,29 @@ def run_snowflake_query(
return output


def run_bigquery_query(
    query: Query,
    client: bigquery.Client,
) -> List[Tuple[str, List[ColumnMetadata], Stream]]:
    """
    Execute a submitted query against BigQuery and return its result rows.

    DJ qualifies table references with the catalog name
    (e.g. your-gcp-project.dataset.table), while BigQuery reads a three-part
    name as project.dataset.table. Because the client is already bound to the
    correct project, the catalog prefix is stripped so BigQuery receives plain
    dataset.table references.
    """
    sql = query.submitted_query
    if query.catalog_name:
        # Drop every "<catalog>." occurrence so identifiers collapse to
        # dataset.table form before submission.
        catalog_prefix = f"{query.catalog_name}."
        sql = sql.replace(catalog_prefix, "")

    # Run the job and wait for completion; materialize all rows up front so
    # any execution error surfaces here rather than during later iteration.
    result = client.query(sql).result()
    materialized = [tuple(row.values()) for row in result]
    stream: Stream = iter(materialized)

    # Column metadata is not populated for BigQuery results.
    columns: List[ColumnMetadata] = []
    return [(sql, columns, stream)]


def serialize_for_json(obj):
"""
Handle serialization of date/datetimes for JSON output.
Expand Down
1 change: 1 addition & 0 deletions datajunction-query/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies = [
"rich>=10.16.2",
"toml>=0.10.2",
"snowflake-connector-python>=3.3.1",
"google-cloud-bigquery>=3.11.0",
"pyyaml>=6.0.1",
"trino>=0.324.0",
"psycopg[pool]>=3.2.1",
Expand Down
Loading
Loading