Table sink doesn't support consuming update and delete changes #6

Closed
gunnarmorling opened this issue Mar 24, 2023 · 3 comments · Fixed by #10


@gunnarmorling

Query in dbt:

select o.order_id, o.price, p.name, p.category, s.*
from gunnar_orders o
  inner join gunnar_products p on o.product_id = p.product_id
  left join gunnar_shipments s on s.order_id = o.order_id

Failure:

15:58:29  Decodable: InvalidRequest: {'timestamp': '2023-03-24T15:58:29.641+00:00', 'status': 400, 'error': 'Bad Request', 'message': "Invalid pipeline. Reason: Table sink 'default_catalog.default_database.gunnar_orders_joined' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(order_id0 = order_id)], select=[order_id, price, name, category, shipment_id, order_id0, origin, destination, is_arrived], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])", 'path': '/v1alpha2/pipelines'}
@gunnarmorling
Author

@rmetzger Another pipeline which couldn't be created via dbt.

@nicoweidner
Contributor

nicoweidner commented Mar 27, 2023

An issue related to this and #5:
If the target stream is created manually as a change stream, then dbt-decodable deletes the change stream, creates a new append stream and then throws this error.

edit: I suspect this code block is responsible for this behavior:

{% if existing_relation is not none %}
  {% set watermark = config.get('watermark') %}
  {% if adapter.has_changed(sql, target_relation, watermark) or should_full_refresh() %}
    {{ adapter.drop_relation(existing_relation) }}
    {% do should_create.update({'value': true}) %}
  {% endif %}

edit2: This issue is also discussed here: #10 (comment)

@nicoweidner
Contributor

@gunnarmorling This should be fixed by #10 as well. Note that you will need to designate a primary key for the target stream in order to create a change stream, using a line like this at the top of the SQL file:

{{ config(primary_key="order_id") }}
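Applied to the query from the original report, the model file might look roughly like the sketch below. This is only an illustration: the file name is an assumption derived from the sink name in the error message, and whether `order_id` is a suitable primary key for the joined result depends on the data.

```sql
{{ config(primary_key="order_id") }}
{# Assumed file name: gunnar_orders_joined.sql (taken from the sink name in the error). #}
{# The query is unchanged from the report; only the config line above is new. #}

select o.order_id, o.price, p.name, p.category, s.*
from gunnar_orders o
  inner join gunnar_products p on o.product_id = p.product_id
  left join gunnar_shipments s on s.order_id = o.order_id
```

With a primary key set, the target should be created as a change stream, which can accept the update and delete changes produced by the left outer join.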
