Skip to content

Commit

Permalink
πŸ› Update Convex destination connector to fix overwrite sync mode (#26103
Browse files Browse the repository at this point in the history
)

* auto-generated destination connector template

* add config and health check

* support full_refresh overwrite

* support full refresh overwrite, full refresh append, and incremental append

* write works!

* lint

* update readme

* add test messages

* list destination??

* surface error messages

* add integration test

* add destination docs

* update source docs

* change integration test to unit test

* add check unit test

* use a map for streams

* make compatible with API changes

* add indexes for primary keys

* poll for indexes to be backfilled

* simplified stream API

* support append_dedup sync mode

* cleanup

* add changelog

* fix changelog

* use indexes ready endpoint

* update types

* rename to include primary key in API, handle namespaces

* fix types

* update docs

* add better test cases

* fix docs link

* update API and headers

* fix formatting

* update API name to streaming_import

* improve request exception and add a test for error formatting

* remove secret config

* use /replace_tables endpoint instead of /clear_tables

* partial revert of replace_tables

* fix

* formatting and fix table names

* update docs

* update metadata and dockerfile

---------

Co-authored-by: Lee Danilek <lee@convex.dev>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
  • Loading branch information
3 people committed May 24, 2023
1 parent 0032a77 commit 360e2f9
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY destination_convex ./destination_convex
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/destination-convex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping
Expand All @@ -16,31 +16,27 @@ def __init__(self, config: ConvexConfig, table_metadata: Mapping[str, Any]):

def batch_write(self, records: List[Mapping[str, Any]]) -> requests.Response:
"""
See Convex docs: https://docs.convex.dev/http-api/#post-apiairbyte_ingress
See Convex docs: https://docs.convex.dev/http-api/#post-apistreaming_importimport_airbyte_records
"""
request_body = {"tables": self.table_metadata, "messages": records}
return self._request("POST", endpoint="import_airbyte_records", json=request_body)

@staticmethod
def temp_table_name(table_name: str, timestamp: str) -> str:
return f"temp_{timestamp}_{table_name}"

def replace_tables(self, table_names: Mapping[str, str]) -> requests.Response:
def delete(self, keys: List[str]) -> requests.Response:
"""
See Convex docs: https://docs.convex.dev/http-api/#post-apireplace_tables
See Convex docs: https://docs.convex.dev/http-api/#put-apistreaming_importclear_tables
"""
request_body = {"tableNames": table_names}
return self._request("POST", endpoint="replace_tables", json=request_body)
request_body = {"tableNames": keys}
return self._request("PUT", endpoint="clear_tables", json=request_body)

def add_primary_key_indexes(self, indexes: Mapping[str, List[List[str]]]) -> requests.Response:
"""
See Convex docs: https://docs.convex.dev/http-api/#put-apiadd_primary_key_indexes
See Convex docs: https://docs.convex.dev/http-api/#put-apistreaming_importadd_primary_key_indexes
"""
return self._request("PUT", "add_primary_key_indexes", json={"indexes": indexes})

def primary_key_indexes_ready(self, tables: List[str]) -> requests.Response:
"""
See Convex docs: https://docs.convex.dev/http-api/#get-apiprimary_key_indexes_ready
See Convex docs: https://docs.convex.dev/http-api/#get-apistreaming_importprimary_key_indexes_ready
"""
return self._request("GET", "primary_key_indexes_ready", json={"tables": tables})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import TypedDict
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import time
from logging import Logger
from typing import Any, Iterable, List, Mapping, Optional, cast

Expand Down Expand Up @@ -45,20 +44,18 @@ def write(
:return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
"""
config = cast(ConvexConfig, config)
timestamp = str(int(time.time()))
writer = ConvexWriter(ConvexClient(config, self.table_metadata(configured_catalog.streams, timestamp)))
writer = ConvexWriter(ConvexClient(config, self.table_metadata(configured_catalog.streams)))

# Setup: Clear tables if in overwrite mode; add indexes if in append_dedup mode.
streams_to_replace = {}
streams_to_delete = []
indexes_to_add = {}
sync_mode_for_table = {}
for configured_stream in configured_catalog.streams:
table_name = self.table_name_for_stream(configured_stream.stream.namespace, configured_stream.stream.name)
sync_mode_for_table[table_name] = configured_stream.destination_sync_mode
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
streams_to_replace[self.temp_table_name(table_name, timestamp)] = table_name
streams_to_delete.append(configured_stream.stream.name)
elif configured_stream.destination_sync_mode == DestinationSyncMode.append_dedup and configured_stream.primary_key:
indexes_to_add[configured_stream.stream.name] = configured_stream.primary_key
if len(streams_to_delete) != 0:
writer.delete_tables(streams_to_delete)
if len(indexes_to_add) != 0:
writer.add_indexes(indexes_to_add)

Expand All @@ -74,8 +71,6 @@ def write(
message.record.namespace,
message.record.stream,
)
if sync_mode_for_table[table_name] == DestinationSyncMode.overwrite:
table_name = self.temp_table_name(table_name, timestamp)
msg = {
"tableName": table_name,
"data": message.record.data,
Expand All @@ -88,21 +83,14 @@ def write(
# Make sure to flush any records still in the queue
writer.flush()

if len(streams_to_replace) != 0:
writer.replace_streams(streams_to_replace)

def table_name_for_stream(self, namespace: Optional[str], stream_name: str) -> str:
if namespace is not None:
return f"{namespace}_{stream_name}"
return stream_name

def temp_table_name(self, table_name: str, timestamp: str) -> str:
return f"temp_{timestamp}_{table_name}"

def table_metadata(
self,
streams: List[ConfiguredAirbyteStream],
timestamp: str,
) -> Mapping[str, Any]:
table_metadata = {}
for s in streams:
Expand All @@ -117,8 +105,6 @@ def table_metadata(
s.stream.namespace,
s.stream.name,
)
if s.destination_sync_mode == DestinationSyncMode.overwrite:
name = self.temp_table_name(name, timestamp)
table_metadata[name] = stream
return table_metadata

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import time
Expand All @@ -20,10 +20,10 @@ class ConvexWriter:
def __init__(self, client: ConvexClient):
self.client = client

def replace_streams(self, table_names: Mapping[str, str]) -> None:
def delete_tables(self, table_names: List[str]) -> None:
"""Deletes all the records belonging to the input stream"""
if len(table_names) > 0:
self.client.replace_tables(table_names)
self.client.delete(table_names)

def add_indexes(self, indexes: Mapping[str, List[List[str]]]) -> None:
self.client.add_primary_key_indexes(indexes)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-convex/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorType: destination
connectorSubtype: api
definitionId: 3eb4d99c-11fa-4561-a259-fc88e0c2f8f4
dockerImageTag: 0.1.0
dockerImageTag: 0.2.0
dockerRepository: airbyte/destination-convex
githubIssueLabel: destination-convex
icon: convex.svg
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
Expand Down Expand Up @@ -81,7 +81,7 @@ def record(stream: str, str_value: str, int_value: int) -> AirbyteMessage:


def setup_good_responses(config):
responses.add(responses.POST, f"{config['deployment_url']}/api/streaming_import/replace_tables", status=200)
responses.add(responses.PUT, f"{config['deployment_url']}/api/streaming_import/clear_tables", status=200)
responses.add(responses.POST, f"{config['deployment_url']}/api/streaming_import/import_airbyte_records", status=200)
responses.add(responses.GET, f"{config['deployment_url']}/version", status=200)
responses.add(responses.PUT, f"{config['deployment_url']}/api/streaming_import/add_primary_key_indexes", status=200)
Expand All @@ -94,16 +94,16 @@ def setup_good_responses(config):


def setup_bad_response(config):
responses.add(responses.POST, f"{config['deployment_url']}/api/streaming_import/replace_tables", status=400, json={"code": "ErrorCode", "message": "error message"})
responses.add(responses.PUT, f"{config['deployment_url']}/api/streaming_import/clear_tables", status=400, json={"code": "ErrorCode", "message": "error message"})


@responses.activate
def test_bad_write(config: ConvexConfig, configured_catalog: ConfiguredAirbyteCatalog):
setup_bad_response(config)
client = ConvexClient(config, {})
with pytest.raises(Exception) as e:
client.replace_tables({})
assert "/api/streaming_import/replace_tables failed with: 400: {'code': 'ErrorCode', 'message': 'error message'}" in str(e.value)
client.delete([])
assert "/api/streaming_import/clear_tables failed with: 400: {'code': 'ErrorCode', 'message': 'error message'}" in str(e.value)


@responses.activate
Expand Down
7 changes: 4 additions & 3 deletions docs/integrations/destinations/convex.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ On the [Convex dashboard](https://dashboard.convex.dev/), navigate to the projec

## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------- |
| 0.1.0 | 2023-01-05 | [21287](https://github.com/airbytehq/airbyte/pull/21287) | πŸŽ‰ New Destination: Convex |
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------------- |
| 0.2.0 | 2023-05-15 | [26103](https://github.com/airbytehq/airbyte/pull/26103) | πŸ› Update Convex destination connector to fix overwrite sync mode |
| 0.1.0 | 2023-01-05 | [21287](https://github.com/airbytehq/airbyte/pull/21287) | πŸŽ‰ New Destination: Convex |

0 comments on commit 360e2f9

Please sign in to comment.