Connector builder: Incremental sync documentation (airbytehq#25238)
* authentication documentation

* some fixes

* fix broken link

* remove session token auth

* review comments

* first version of tutorial

* Revert "first version of tutorial"

This reverts commit bbdeb99.

* review comments

* partitioning documentation

* incremental sync

* improvements

* review comments
Joe Reuter authored and btkcodedev committed Apr 26, 2023
1 parent 98ba192 commit e871381
Showing 2 changed files with 134 additions and 1 deletion.
133 changes: 133 additions & 0 deletions docs/connector-development/connector-builder-ui/incremental-sync.md
@@ -0,0 +1,133 @@
# Incremental sync

An incremental sync is a sync that pulls only the data that has changed since the previous sync (as opposed to all of the data available in the data source).

This is especially important if there are a large number of records to sync and/or the API has tight request limits that make a full sync of all records on a regular schedule too expensive or too slow.

Incremental syncs are usually implemented using a cursor value (like a timestamp) that delineates which data was pulled and which data is new. A very common cursor value is an `updated_at` timestamp: records whose `updated_at` value is less than or equal to the cursor value have been synced already, and the next sync should only export records whose `updated_at` value is greater than the cursor value.
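
To make this concrete, here is a minimal Python sketch of the idea, using a hypothetical list of records (this is just an illustration of the cursor concept, not connector builder code):

```python
from datetime import datetime, timezone

def parse(ts: str) -> datetime:
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

# Hypothetical records as returned by an API, each with an "updated_at" cursor field
records = [
    {"id": 1, "updated_at": "2023-04-10T12:00:00Z"},
    {"id": 2, "updated_at": "2023-04-15T08:30:00Z"},
]

# Cursor value saved by the previous sync: everything up to and including
# this timestamp has been synced already
cursor = parse("2023-04-12T00:00:00Z")

# The next sync only exports records whose cursor field is greater than the cursor
new_records = [r for r in records if parse(r["updated_at"]) > cursor]
print(new_records)  # only the record updated on 2023-04-15
```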

To use incremental syncs, the API endpoint needs to fulfill the following requirements:
* Records contain a date/time field that defines when this record was last updated (the "cursor field")
* It's possible to filter/request records by the cursor field

Knowing the cursor value also allows the Airbyte system to automatically keep a history of changes to records in the destination. To learn more about the different modes of incremental sync, check out the [Incremental Sync - Append](/understanding-airbyte/connections/incremental-append/) and [Incremental Sync - Deduped History](/understanding-airbyte/connections/incremental-deduped-history) pages.

## Configuration

To configure incremental syncs for a stream in the connector builder, you have to specify how the records represent the **"last changed" / "updated at" timestamp**, the **initial time range** to fetch records for, and **how to request records for a certain time range**.

In the builder UI, these are specified as follows (a rough code sketch follows the list):
* The "Cursor field" is the property in the record that defines the date and time when the record was changed. It's used to decide which records have already been synced and which records are "new".
* The "Datetime format" specifies the [format](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes) the cursor field uses to express date and time.
* The "Cursor granularity" is the smallest time unit the API supports for specifying the time range to request records for, expressed as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations).
* The "Start datetime" is the initial start date of the time range to fetch records for. When doing incremental syncs, subsequent syncs replace this date with the cursor value of the last record that was synced.
* The "End datetime" is the end date of the time range to fetch records for. In most cases it's set to the current date and time when the sync starts, in order to sync all changes that have happened so far.
* The "Inject start/end time into outgoing HTTP request" settings define how to request the records that changed in the time range to sync. In most cases the start and end time are added as request parameters or body parameters.

## Example

The [API of The Guardian](https://open-platform.theguardian.com/documentation/search) has a `/search` endpoint that can be used to extract a list of articles.

The `/search` endpoint has a `from-date` and a `to-date` query parameter which can be used to only request data for a certain time range.

Content records have the following form:
```
{
  "id": "world/2022/oct/21/russia-ukraine-war-latest-what-we-know-on-day-240-of-the-invasion",
  "type": "article",
  "sectionId": "world",
  "sectionName": "World news",
  "webPublicationDate": "2022-10-21T14:06:14Z",
  "webTitle": "Russia-Ukraine war latest: what we know on day 240 of the invasion",
  // ...
}
```

As this fulfills the requirements for incremental syncs, we can configure the "Incremental sync" section in the following way:
* "Cursor field" is set to `webPublicationDate`
* "Datetime format" is set to `%Y-%m-%dT%H:%M:%SZ`
* "Cursor granularity is set to `PT1S` as this API can handle date/time values on the second level
* "Start datetime" is set to "user input" to allow the user of the connector configuring a Source to specify the time to start syncing
* "End datetime" is set to "now" to fetch all articles up to the current date
* "Inject start time into outgoing HTTP request" is set to `request_parameter` with "Field" set to `from-date`
* "Inject end time into outgoing HTTP request" is set to `request_parameter` with "Field" set to `to-date`

By default, this API orders records from newest to oldest, which is not optimal for a reliable sync: if a sync fails halfway through, the last encountered cursor value will be the most recent date even though some older records were not synced yet. It's better to start with the oldest records and work forward, so that once a certain date is encountered on a record, all older records are guaranteed to be synced already. In this case, the API can be configured to behave this way by setting an additional parameter:
* At the bottom of the stream configuration page, add a new "Request parameter"
* Set the key to `order-by`
* Set the value to `oldest`

Setting the start date in the "Testing values" to a date in the past like **2023-04-09T00:00:00Z** results in the following request:
<pre>
curl 'https://content.guardianapis.com/search?order-by=oldest&from-date=<b>2023-04-09T00:00:00Z</b>&to-date={`<now>`}'
</pre>
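
For reference, the same request can be reproduced outside of the builder, for example with Python's `requests` library (note that real calls to this API also require an `api-key` parameter, which is omitted here just like in the curl example above):

```python
from datetime import datetime, timezone

import requests

params = {
    "order-by": "oldest",
    "from-date": "2023-04-09T00:00:00Z",  # the start date from the "Testing values"
    "to-date": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),  # "now"
    # "api-key": "...",  # a real call additionally needs an API key
}
response = requests.get("https://content.guardianapis.com/search", params=params)
print(response.json())
```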

The last encountered date is saved as part of the connection; when the next sync runs, it picks up from the last record. Let's assume the last encountered article looked like this:
<pre>
{`{
"id": "business/live/2023/apr/15/uk-bosses-more-optimistic-energy-prices-fall-ai-spending-boom-economics-business-live",
"type": "liveblog",
"sectionId": "business",
"sectionName": "Business",
"webPublicationDate": `}<b>"2023-04-15T07:30:58Z"</b>{`,
}`}
</pre>

Then when a sync is triggered for the same connection the next day, the following request is made:
<pre>
curl 'https://content.guardianapis.com/search?order-by=oldest&from-date=<b>2023-04-15T07:30:58Z</b>&to-date={`<now>`}'
</pre>

The `from-date` is set to the cutoff date of articles synced already and the `to-date` is set to the current date.
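
Conceptually, the connector tracks the largest cursor value it has seen and persists it as the new cutoff, roughly like this (a simplified sketch, not the actual implementation):

```python
from datetime import datetime, timezone

def parse(ts: str) -> datetime:
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

# Cutoff saved by the previous sync
state = {"webPublicationDate": "2023-04-09T00:00:00Z"}

# Records returned by the current sync (shortened to the cursor field)
synced_records = [
    {"webPublicationDate": "2023-04-14T10:00:00Z"},
    {"webPublicationDate": "2023-04-15T07:30:58Z"},
]

for record in synced_records:
    if parse(record["webPublicationDate"]) > parse(state["webPublicationDate"]):
        state["webPublicationDate"] = record["webPublicationDate"]

# "state" is stored with the connection and becomes the next sync's from-date
print(state)  # {'webPublicationDate': '2023-04-15T07:30:58Z'}
```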

:::info
In some cases, it's helpful to reference the start and end date of the interval that's currently synced, for example if it needs to be injected into the URL path of the current stream. In these cases it can be referenced using the `{{ stream_interval.start_date }}` and `{{ stream_interval.end_date }}` placeholders. Check out [the tutorial](./tutorial.mdx#adding-incremental-reads) for such a case.
:::

## Advanced settings

The description above is sufficient for a lot of APIs. However, there are some subtler configurations that sometimes become relevant.

### Step

When incremental syncs are enabled, the connector does not fetch all records since the cutoff date at once. Instead, it splits the time range between the cutoff date and the desired end date into intervals, based on the "Step" configuration (one month by default), expressed as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations).

For example, if the "Step" is set to 10 days (`P10D`) for the Guardian articles stream described above and the time range is longer than that, the following requests are performed:
<pre>
curl 'https://content.guardianapis.com/search?order-by=oldest&from-date=<b>2023-01-01T00:00:00Z</b>&to-date=<b>2023-01-10T00:00:00Z</b>'{`\n`}
curl 'https://content.guardianapis.com/search?order-by=oldest&from-date=<b>2023-01-10T00:00:00Z</b>&to-date=<b>2023-01-20T00:00:00Z</b>'{`\n`}
curl 'https://content.guardianapis.com/search?order-by=oldest&from-date=<b>2023-01-20T00:00:00Z</b>&to-date=<b>2023-01-30T00:00:00Z</b>'{`\n`}
...
</pre>

After an interval is processed, the cursor value of the last record will be saved as part of the connection as the new cutoff date.
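
A simplified Python sketch of this interval splitting (using a fixed `timedelta` in place of a full ISO 8601 duration parser):

```python
from datetime import datetime, timedelta, timezone

step = timedelta(days=10)  # "P10D"
start = datetime(2023, 1, 1, tzinfo=timezone.utc)  # last saved cutoff date
end = datetime(2023, 2, 1, tzinfo=timezone.utc)    # desired end date

# Split the overall time range into step-sized intervals
interval_start = start
while interval_start < end:
    interval_end = min(interval_start + step, end)
    print(f"from-date={interval_start:%Y-%m-%dT%H:%M:%SZ}&to-date={interval_end:%Y-%m-%dT%H:%M:%SZ}")
    interval_start = interval_end
```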

In most cases, the default step size is fine, but there are two reasons to change it:
* **To protect a connection against intermittent failures** - if the "Step" size is one day, the cutoff date is saved after all records associated with that day are processed. If a sync fails halfway through because the API, the Airbyte system, the destination, or the network between these components fails, at most one day's worth of data needs to be resynced. However, a smaller step size may mean more requests to the API and more load on the system. The optimal step size depends on the expected amount of data and the load characteristics of the API, but for a lot of applications the default of one month is a good starting point.
* **The API requires the connector to fetch data in pre-specified chunks** - for example, the [Exchange Rates API](https://exchangeratesapi.io/documentation/) makes the date to fetch data for part of the URL path and only allows fetching data for a single day at a time

### Lookback window

The "Lookback window" specifies a duration that is subtracted from the last cutoff date before starting to sync.

Some APIs update records over time but do not allow filtering or searching by modification date, only by creation date. For example, the API of The Guardian might change the title of an article after it was published, but the `webPublicationDate` still shows the date the article was first published.

In these cases, there are two options:
* **Do not use incremental sync** and always sync the full set of records in order to always have a consistent state, losing the advantages of reduced load and [automatic history keeping in the destination](/understanding-airbyte/connections/incremental-deduped-history)
* **Configure the "Lookback window"** so that, in addition to new records, some portion of records before the cutoff date is resynced to catch changes made to existing records, trading off data consistency against the number of synced records. For the API of The Guardian, news articles tend to only be updated for a few days after their initial release, so this strategy should catch most updates without having to resync all articles.

Reiterating the example from above with a "Lookback window" of 2 days configured, let's assume the last encountered article looked like this:
<pre>
{`{
"id": "business/live/2023/apr/15/uk-bosses-more-optimistic-energy-prices-fall-ai-spending-boom-economics-business-live",
"type": "liveblog",
"sectionId": "business",
"sectionName": "Business",
"webPublicationDate": `}<b>{`"2023-04-15T07:30:58Z"`}</b>{`,
}`}
</pre>

Then when a sync is triggered for the same connection the next day, the following request is made:
<pre>
curl 'https://content.guardianapis.com/search?order-by=oldest&from-date=<b>2023-04-13T07:30:58Z</b>&to-date={`<now>`}'
</pre>
2 changes: 1 addition & 1 deletion docs/connector-development/connector-builder-ui/tutorial.mdx
@@ -158,7 +158,7 @@ In this section, we'll update the source to read historical data instead of only
According to the API documentation, we can read the exchange rate for a specific date range by querying the `"/exchangerates_data/{date}"` endpoint instead of `"/exchangerates_data/latest"`.

To configure your connector to request every day individually, follow these steps:
- * On top of the form, change the "Path URL" input to `/exchangerates_data/{{ stream_slice.start_time }}` to inject the date to fetch data for into the path of the request
+ * On top of the form, change the "Path URL" input to `/exchangerates_data/{{ stream_interval.start_time }}` to inject the date to fetch data for into the path of the request
* Enable "Incremental sync" for the Rates stream
* Set the "Cursor field" to `date` - this is the property in our records to check what date got synced last
* Set the "Datetime format" to `%Y-%m-%d` to match the format of the date in the record returned from the API
