Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ jobs:
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
version: "latest"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install uv
uv sync --group dev
- name: Test with pytest
run: |
uv run pytest
- name: Test build
uv sync --all-groups
- name: Run CI checks
run: |
uv build
make ci
22 changes: 15 additions & 7 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ Thank you for contributing to the DeltaStream Python client.
- NPM: `npx @openapitools/openapi-generator-cli`
- Docker image: `openapitools/openapi-generator-cli`

Sync dependencies:
Install dependencies:

```bash
uv sync
make install
```

#### Regenerating the controlplane OpenAPI client (apiv2)
Expand All @@ -36,10 +36,18 @@ Notes:
#### Tests / Lint / Types

```bash
uv run pytest
uv run ruff check --fix
uv run ruff format
uv run mypy
make test # Run all tests
make lint # Run linting checks
make format # Format code
make mypy # Run type checking
make ci # Run all CI checks (lint, format, mypy, unit-tests, build)
```


For additional options:
```bash
make unit-tests # Run unit tests only (exclude integration tests)
make check-format # Check if code formatting is correct
make build # Build the package
make clean # Clean build artifacts
make help # Show all available targets
```
63 changes: 63 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
.PHONY: help install lint format check-format mypy test unit-tests build ci clean jupyter

# Default target
help:
@echo "Available targets:"
@echo " install Install all dependencies with uv"
@echo " lint Run ruff linting checks"
@echo " format Format code with ruff"
@echo " check-format Check if code formatting is correct"
@echo " mypy Run mypy type checking"
@echo " test Run all tests with pytest"
@echo " unit-tests Run unit tests only (exclude integration tests)"
@echo " build Build the package"
@echo " ci Run all CI checks (lint, format, mypy, unit-tests, build)"
@echo " clean Clean build artifacts"
@echo " jupyter Run Jupyter Lab"

# Install dependencies
install:
uv sync --all-groups

# Linting
lint:
uv run ruff check

# Format code
format:
uv run ruff format

# Check formatting
check-format:
uv run ruff format --check

# Type checking
mypy:
uv run mypy

# Unit tests
test:
uv run pytest

# Unit tests only (exclude integration tests)
unit-tests:
uv run pytest -m "not integration"

# Build package
build:
uv build

# Run all CI checks
ci: lint check-format mypy unit-tests build
@echo "All CI checks passed!"

# Clean build artifacts
clean:
rm -rf dist/
rm -rf build/
rm -rf *.egg-info/
find . -type d -name __pycache__ -exec rm -rf {} +
find . -type f -name "*.pyc" -delete

jupyter:
uv run --with jupyter jupyter lab
8 changes: 4 additions & 4 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ no_implicit_reexport = true
warn_return_any = true

# Configuration for specific modules
[mypy-src.api.controlplane.openapi_client.*]
[mypy-src.deltastream.api.controlplane.openapi_client.*]
ignore_errors = true

[mypy-api.controlplane.openapi_client.*]
[mypy-deltastream.api.controlplane.openapi_client.*]
ignore_errors = true

[mypy-src.api.dataplane.openapi_client.*]
[mypy-src.deltastream.api.dataplane.openapi_client.*]
ignore_errors = true

[mypy-api.dataplane.openapi_client.*]
[mypy-deltastream.api.dataplane.openapi_client.*]
ignore_errors = true

[mypy-websockets.*]
Expand Down
97 changes: 97 additions & 0 deletions scripts/generate_apiv2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

import os
import shutil
import subprocess
import sys
from pathlib import Path
from typing import List


def _find_openapi_generator() -> List[str]:
"""Return the base command to run OpenAPI Generator.

Prefers the `openapi-generator` binary (Homebrew install),
falls back to `openapi-generator-cli` if available.
"""
candidates = ["openapi-generator", "openapi-generator-cli"]
for candidate in candidates:
if shutil.which(candidate):
return [candidate]

# Optional: support docker if available and the image exists locally
if shutil.which("docker"):
return [
"docker",
"run",
"--rm",
"-u",
f"{os.getuid()}:{os.getgid()}",
"-v",
f"{Path.cwd()}:{Path.cwd()}",
"-w",
f"{Path.cwd()}",
"openapitools/openapi-generator-cli",
]

raise FileNotFoundError(
"OpenAPI Generator CLI not found. Install with `brew install openapi-generator` "
"or use `npx @openapitools/openapi-generator-cli`, or ensure Docker image "
"openapitools/openapi-generator-cli is available."
)


def main() -> int:
"""Generate the controlplane OpenAPI Python client in-place.

This regenerates `deltastream.api.controlplane.openapi_client` from
`src/deltastream/api/controlplane/api-server-v2.yaml`.
"""
repo_root = Path(__file__).resolve().parent.parent
src_dir = repo_root / "src"
spec_path = src_dir / "deltastream/api/controlplane/api-server-v2.yaml"

if not spec_path.exists():
print(f"Spec file not found: {spec_path}", file=sys.stderr)
return 1

base_cmd = _find_openapi_generator()

# Use python generator with pydantic v2 and lazy imports
cmd = base_cmd + [
"generate",
"-i",
str(spec_path),
"-g",
"python",
"-o",
str(src_dir), # generate inside src so package path is respected
"--skip-validate-spec",
"--additional-properties",
(
"generateSourceCodeOnly=true,"
"lazyImport=true,"
"enumClassPrefix=true,"
"modelPropertyNaming=original,"
"packageName=deltastream.api.controlplane.openapi_client"
),
]

# Ensure output dir exists
(src_dir / "deltastream").mkdir(parents=True, exist_ok=True)

print("Running:", " ".join(cmd))
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as exc:
print(
f"OpenAPI Generator failed with exit code {exc.returncode}", file=sys.stderr
)
return exc.returncode

print("OpenAPI client regenerated at: deltastream/api/controlplane/openapi_client")
return 0


if __name__ == "__main__":
raise SystemExit(main())
33 changes: 26 additions & 7 deletions src/deltastream/api/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from .handlers import StatementHandler, map_error_response
from deltastream.api.controlplane.openapi_client.configuration import Configuration
from uuid import UUID


class APIConnection:
Expand All @@ -31,7 +32,7 @@ def __init__(
token_provider: Callable[[], Awaitable[str]],
session_id: Optional[str],
timezone: str,
organization_id: Optional[str],
organization_id: Optional[Union[str, UUID]],
role_name: Optional[str],
database_name: Optional[str],
schema_name: Optional[str],
Expand All @@ -42,8 +43,24 @@ def __init__(
self.server_url = server_url
self.session_id = session_id
self.timezone = timezone
# Convert to UUID if provided and valid
org_uuid = None
if organization_id is not None:
if isinstance(organization_id, UUID):
org_uuid = organization_id
elif isinstance(organization_id, str):
try:
org_uuid = UUID(organization_id)
except ValueError as e:
raise ValueError(
f"Invalid organization_id: '{organization_id}' is not a valid UUID string"
) from e
else:
raise TypeError(
f"organization_id must be a string or UUID, got {type(organization_id)}"
)
self.rsctx = ResultSetContext(
organization_id=organization_id,
organization_id=org_uuid,
role_name=role_name,
database_name=database_name,
schema_name=schema_name,
Expand Down Expand Up @@ -145,11 +162,13 @@ async def query(self, query: str, attachments: Optional[List[Blob]] = None) -> R
)

if dp_req.request_type == "result-set":
dp_rs = await dpconn.get_statement_status(dp_req.statement_id, 0)
dp_rs = await dpconn.get_statement_status(
UUID(dp_req.statement_id), 0
)
cp_rs = self._dataplane_to_controlplane_resultset(dp_rs)

async def cp_get_statement_status(
statement_id: str, partition_id: int
statement_id: UUID, partition_id: int
) -> CPResultSet:
dp_result = await dpconn.get_statement_status(
statement_id, partition_id
Expand All @@ -169,7 +188,7 @@ async def cp_get_statement_status(
cp_rs = self._dataplane_to_controlplane_resultset(rs)

async def cp_get_statement_status(
statement_id: str, partition_id: int
statement_id: UUID, partition_id: int
) -> CPResultSet:
result = await self.statement_handler.get_statement_status(
statement_id, partition_id
Expand Down Expand Up @@ -306,7 +325,7 @@ def _dataplane_to_controlplane_resultset(dp_rs) -> CPResultSet:
cp_meta = CPResultSetMetadata(
encoding=getattr(meta, "encoding", ""),
partitionInfo=getattr(meta, "partition_info", None),
columns=getattr(meta, "columns", None),
columns=getattr(meta, "columns", []),
dataplaneRequest=getattr(meta, "dataplane_request", None),
context=getattr(meta, "context", None),
)
Expand All @@ -330,7 +349,7 @@ async def submit_statement(
raise

async def get_statement_status(
self, statement_id: str, partition_id: int
self, statement_id: UUID, partition_id: int
) -> CPResultSet:
try:
await self._set_auth_header()
Expand Down
13 changes: 9 additions & 4 deletions src/deltastream/api/dpconn.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional
from urllib.parse import urlparse, parse_qs
import asyncio
from uuid import UUID

from deltastream.api.dataplane.openapi_client import (
Configuration,
Expand Down Expand Up @@ -34,14 +35,14 @@ def __init__(

self.api = DataplaneApi(config)

def _get_statement_status_api(self, statement_id: str, partition_id: int):
def _get_statement_status_api(self, statement_id: UUID, partition_id: int):
resp = self.api.get_statement_status(
statement_id=statement_id, partition_id=partition_id
)
return resp

async def get_statement_status(
self, statement_id: str, partition_id: int
self, statement_id: UUID, partition_id: int
) -> ResultSet:
try:
resp = self._get_statement_status_api(statement_id, partition_id)
Expand All @@ -65,7 +66,11 @@ async def get_statement_status(
statement_status.statement_id, partition_id
)
else:
raise SQLError("Invalid statement status", "", "")
raise SQLError(
"Invalid statement status",
"",
UUID("00000000-0000-0000-0000-000000000000"),
)
else:
result_set = resp.body
if result_set.sql_state == SqlState.SQL_STATE_SUCCESSFUL_COMPLETION:
Expand All @@ -79,7 +84,7 @@ async def get_statement_status(
except Exception as exc:
raise RuntimeError(str(exc))

async def wait_for_completion(self, statement_id: str) -> ResultSet:
async def wait_for_completion(self, statement_id: UUID) -> ResultSet:
result_set = await self.get_statement_status(statement_id, 0)
if result_set.sql_state == SqlState.SQL_STATE_SUCCESSFUL_COMPLETION:
return result_set
Expand Down
3 changes: 2 additions & 1 deletion src/deltastream/api/error.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from uuid import UUID


class InterfaceError(Exception):
Expand Down Expand Up @@ -32,7 +33,7 @@ def __init__(self, message: str):


class SQLError(Exception):
def __init__(self, message: str, code: str, statement_id: str):
def __init__(self, message: str, code: str, statement_id: UUID):
super().__init__(message)
self.name = "SQLError"
try:
Expand Down
Loading