can a schema refresh reset only the affected stream #752

Closed
fatchat opened this issue Jul 11, 2024 · 13 comments · Fixed by #835

@fatchat
Contributor

fatchat commented Jul 11, 2024

In airbytehelpers.update_connection_schema we are setting skipReset=True. We did this because resets and syncs need to be run through Prefect; we trigger them in airbyte_service.update_schema_change using trigger_reset_and_sync_workflow right after the schema changes are approved

The problem is that trigger_reset_and_sync_workflow applies to the entire connection and not to the individual streams whose schemas changed

Proposing the following solution: we run update_connection_schema with skipReset=False from a Prefect deployment

@fatchat fatchat self-assigned this Jul 29, 2024
@fatchat fatchat assigned Ishankoradia and unassigned fatchat Aug 9, 2024
@Ishankoradia
Contributor

Ishankoradia commented Aug 9, 2024

So skipReset=False will reset the affected streams, right? (I ran an experiment on Google Sheets: when I check the box in the Airbyte UI that says "Reset only affected streams", it sends skipReset as False to the update connection api.)

Why Prefect and not Celery? Is it because we want to show the connection as locked and block users from clicking things, which might be more work in Celery?

@fatchat
Contributor Author

fatchat commented Aug 10, 2024

yes skipReset=False will reset only the affected streams

prefect because we want all airbyte tasks to run through prefect so that they don't run in parallel and overload the machine

@Ishankoradia
Contributor

> yes skipReset=False will reset only the affected streams
>
> prefect because we want all airbyte tasks to run through prefect so that they don't run in parallel and overload the machine

I see, got it. Okay, I will take this up.
This also means Prefect needs to poll the reset connection task that starts as soon as we update the connection with skipReset=False.

@fatchat
Contributor Author

fatchat commented Aug 10, 2024

Yes. The entry point is when the user approves the changes in the dialog, and we currently show a spinner while updating the connection. Once updated the dialog should close and the spinner should be showing on that connection in the list view

@Ishankoradia
Contributor

Ishankoradia commented Aug 14, 2024

The schema change prefect deployment will need to do the following

  • Update the connection with the new catalog with skipReset=True
  • Fetch the affected streams from the catalogDiff. So basically all the deployment needs is a connection_id and the catalogDiff that we want to update the connection with. (In future, users can select which changes they want.)
  • Reset only the affected streams manually
  • Sync the connection again

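A minimal sketch of the "fetch the affected streams from the catalogDiff" step. It assumes the diff is shaped like `{"transforms": [{"transformType": ..., "streamDescriptor": {"name": ..., "namespace": ...}}]}`; that shape and the helper name `affected_streams` are assumptions for illustration and should be checked against the Airbyte version in use.

```python
from typing import Any, Optional


def affected_streams(catalog_diff: dict[str, Any]) -> list[dict[str, Optional[str]]]:
    """Return one descriptor per stream mentioned in the diff, in first-seen order.

    The "transforms" / "streamDescriptor" keys are assumed, not confirmed here.
    """
    seen: set = set()
    streams: list = []
    for transform in catalog_diff.get("transforms", []):
        descriptor = transform.get("streamDescriptor") or {}
        name = descriptor.get("name")
        if not name:
            # Skip malformed entries rather than guessing a stream name.
            continue
        key = (name, descriptor.get("namespace"))
        if key not in seen:
            seen.add(key)
            streams.append({"name": name, "namespace": descriptor.get("namespace")})
    return streams
```

The result is what the later "reset only the affected streams" step would take as input.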
@fatchat
Contributor Author

fatchat commented Aug 14, 2024

but how do we reset affected streams manually? i don't think there's an api for that, which is why we need to use skipReset=False

@Ishankoradia
Contributor

Ishankoradia commented Aug 14, 2024

> but how do we reset affected streams manually? i don't think there's an api for that, which is why we need to use skipReset=False

There is an api for this. In the same way that we (or Prefect) trigger a sync, we can trigger a reset and also pass the streams we want to reset.

The api is /v1/connections/reset/stream; you can pass the specific streams that you want to reset.
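A rough sketch of building the request body for that endpoint. The field names (`connectionId`, `streams`, `streamName`, `streamNamespace`) are assumptions and should be verified against the Airbyte API reference; the helper name is made up for illustration, and the actual POST call is left out.

```python
from typing import Any, Optional


def build_stream_reset_payload(
    connection_id: str, streams: list[dict[str, Optional[str]]]
) -> dict[str, Any]:
    """Build a body for POST /v1/connections/reset/stream (field names assumed)."""
    payload_streams = []
    for stream in streams:
        entry = {"streamName": stream["name"]}
        # Only include the namespace when the stream actually has one.
        if stream.get("namespace"):
            entry["streamNamespace"] = stream["namespace"]
        payload_streams.append(entry)
    return {"connectionId": connection_id, "streams": payload_streams}
```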

@fatchat
Contributor Author

fatchat commented Aug 15, 2024

ohhhhhhhhh

well then this gets a lot easier

can we just pass the specific streams to trigger_reset_and_sync_workflow

@Ishankoradia
Contributor

Yes we can. Basically we update the reset connection flow in proxy to take a list of streams, and then pass that list as a param in the big config dictionary.

Also, we currently do sync + reset via the reset deployment, and I don't know why. I don't see the point of having one deployment for reset and another for sync. Ideally we want just one deployment per connection to do any connection-related work.

@fatchat
Contributor Author

fatchat commented Aug 15, 2024

oh, because locking is by deployment? that's a good point, yes then we can have one (manual) deployment per connection and we pass parameters to tell it what to do

that's what you're saying?

@Ishankoradia
Contributor

Ishankoradia commented Aug 15, 2024

Yes locking is by deployment (for a connection_id)

That's what I meant. And then we can use deployment concurrency once it is available (the Prefect team said it will be out in a month), which will make sure that for any connection only 1 job is running (sync, reset or sync+reset).
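To make the "one deployment per connection, parameters tell it what to do" idea concrete, here is a small sketch. The parameter key `job` and its values are hypothetical; the real deployment config may name these differently.

```python
# Hypothetical mapping from a "job" parameter to the ordered steps to run.
JOB_STEPS = {
    "sync": ["sync"],
    "reset": ["reset"],
    "reset_and_sync": ["reset", "sync"],
}


def plan_connection_job(params: dict) -> list:
    """Return the ordered steps the single manual deployment should run."""
    job = params.get("job", "sync")  # default to a plain sync
    if job not in JOB_STEPS:
        raise ValueError(f"unknown job type: {job!r}")
    # Return a copy so callers cannot mutate the shared table.
    return list(JOB_STEPS[job])
```

Because every job type goes through the same deployment, a per-deployment lock (or concurrency limit) would cover sync, reset and sync+reset together.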

@Ishankoradia
Contributor

Ishankoradia commented Aug 15, 2024

> Yes locking is by deployment (for a connection_id)
>
> That's what I meant. And then we can use deployment concurrency once it is available (the Prefect team said it will be out in a month), which will make sure that for any connection only 1 job is running (sync, reset or sync+reset).

I will also make these changes in this issue/PR itself.

@Ishankoradia
Contributor

Ishankoradia commented Aug 16, 2024

Spec (also includes #815). @fatchat, let me know what you think. We can discuss on Discord.

  • Make sure the reset flow takes in the list of streams to reset
    • [Prefect-airbyte] Create a new reset connection flow that takes a list of stream names. It should use the /v1/connections/reset/stream api. If the list is empty, it runs a sync of the connection (weird)
      -> We will have two reset flows: the one we already have, which does a full reset, and a new one that resets only certain streams. If the list of streams is empty we call the full reset; if it is non-empty we call the new reset stream flow. This logic can be handled in [Prefect proxy]
    • [Prefect-proxy] For the reset job in the deployment, assume we have a list of streams in that big config dictionary and pass that to the reset connection flow
    • [Prefect-proxy] If the list of streams is empty we call the full reset flow; if it is non-empty we call the new reset stream flow.
    • [HOLD] [DDP backend] We need a way to control which changes we propagate. I don’t see any apis that would let us send a partial catalogDiff and generate a new catalog to be saved, or just apply schema changes based on this partial catalogDiff. I found this api, /v1/sources/apply_schema_changes, but I don’t understand what it means by “list of diff” because I don’t see it taking any catalogDiff in the docs. Maybe it means we apply the (partial) catalogDiff and then generate the streams that need to be replaced using this api (not completely sure). If we don’t find an api (or if there isn’t one), we will need to write a custom function that takes this partial diff and applies it to the original catalog to generate a new one (we could look at airbyte’s source code to figure out the logic; I think it is in airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/CatalogDiffConverters.java)
    • [HOLD] [DDP backend] Send a list of the streams to the reset deployment flow_run. The list of streams should be based on the catalogDiff that the client/user approves. If the list is empty, proxy will do a full reset.
    • [HOLD] [Webapp] The schema changes modal will now have a tick mark in front of each change that the user might approve. There should be an approve all.
    • [DDP backend] Airbyte doesn't have fine-grained control over which changes to accept (opened a discussion here: “Is there a way to have fine grain control over the schema changes i approve via apis”, airbytehq/airbyte#44175). So the way we will do it for now is: once the user accepts and approves the schema changes, we schedule (maybe for the coming Sunday, or on any configurable schedule) a deployment run that does the following
      - Fetch the catalogDiff to compute the affected streams
      - Update the connection with skipReset=True
      - Reset the affected streams
      - Sync the connection
      Note: If the deployment is scheduled for the coming Sunday, the catalogDiff may be different from the one they accepted previously.
    • [Prefect-proxy] Create a flow that does the jobs above
    • Test this much
  • Refactor to have a single manual deployment for a connection. The idea is that manual deployments don’t have a schedule and run on demand, so we can always use the config to run a particular job related to the connection (sync, reset or sync + reset).
    • [DDP backend] Currently manual syncs occur via “v1/flows/{deployment_id}/flow_run/”; change this to run via a new api “/v1/connections/{connection_id}/sync”
    • [DDP backend] Similarly, manual reset will have a new api “/v1/connections/{connection_id}/reset”
    • [Webapp] Update the client to call the new reset and sync apis
    • [DDP backend] Remove/refactor all references to reset_dataflow
    • Test and make sure the locks etc work as before.
  • Cleanup for the refactor of having a single manual deployment for a connection. This should happen once the above changes are merged and running fine with no errors
    • [DDP backend] Write a cleanup script that removes all the reset deployments
    • [DDP backend] Migration to drop the column reset_conn_dataflow_id in OrgDataFlowv1
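
The schema-change run described in the spec (update with skipReset=True, reset, then sync, with a full reset when the stream list is empty) could be sketched roughly as below. `ConnectionClient` and its method names are hypothetical stand-ins for whatever the Prefect proxy actually exposes; this is not the project's implementation.

```python
from typing import Protocol, Sequence


class ConnectionClient(Protocol):
    """Hypothetical interface; the real proxy/airbyte calls will differ."""

    def update_connection(self, connection_id: str, skip_reset: bool) -> None: ...
    def reset_connection(self, connection_id: str) -> None: ...
    def reset_streams(self, connection_id: str, streams: Sequence[str]) -> None: ...
    def sync_connection(self, connection_id: str) -> None: ...


def run_schema_change(
    client: ConnectionClient, connection_id: str, streams: Sequence[str]
) -> None:
    """Apply the schema change, reset what is needed, then sync."""
    # Update with skipReset=True so that the reset is triggered explicitly below.
    client.update_connection(connection_id, skip_reset=True)
    if streams:
        # Non-empty list: reset only the affected streams.
        client.reset_streams(connection_id, list(streams))
    else:
        # Empty list: fall back to a full reset, as the spec describes.
        client.reset_connection(connection_id)
    client.sync_connection(connection_id)
```

Passing the client in keeps the ordering logic testable without a running Airbyte or Prefect instance.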
