Amplitude source has errors: "Failed to apply RFC3339 pattern on .." for "Events" stream with "Incremental | Append" sync mode #13057

Closed
mohitreddy1996 opened this issue May 20, 2022 · 7 comments · Fixed by #13074

Labels: autoteam, community, team/connectors-python, team/tse (Technical Support Engineers), type/bug (Something isn't working)

Comments

@mohitreddy1996 (Contributor)

Environment

  • Airbyte version: 0.38.4-alpha
  • OS Version / Instance: Ubuntu 20.04 docker image
  • Deployment: EKS, K8S stable deployment
  • Source Connector and version: Amplitude, BETA
  • Destination Connector and version: Not relevant (see this issue for any destination), using kafka
  • Severity: High
  • Step where error happened: Setup new connection

Current Behavior

Ingesting data from Amplitude produces log entries such as:

2022-05-20 14:24:26 replication-orchestrator > Failed to apply RFC3339 pattern on 2022-05-16 19:13:09.369000
2022-05-20 14:24:26 replication-orchestrator > Failed to apply RFC3339 pattern on 2022-05-18 02:18:54.351000
2022-05-20 14:24:26 replication-orchestrator > Failed to apply RFC3339 pattern on 2022-05-18 02:18:54.349000
2022-05-20 14:24:26 replication-orchestrator > Failed to apply RFC3339 pattern on 2022-05-18 02:18:57.113881
2022-05-20 14:24:26 replication-orchestrator > Failed to apply RFC3339 pattern on 2022-05-18 02:18:54.349000
2022-05-20 14:24:26 replication-orchestrator > Failed to apply RFC3339 pattern on 2022-05-18 02:18:54.349000
2022-05-20 14:24:26 replication-orchestrator > Failed to apply RFC3339 pattern on 2022-05-18 02:18:54.349000

I do see the data getting exported:

{
  "_airbyte_ab_id": "xxxx",
  "_airbyte_stream": "events",
  "_airbyte_emitted_at": 1652788324908,
  "_airbyte_data": {
    "$insert_id": "xxxx",
    "$insert_key": "xxxx",
    "$schema": xxxx,
    "adid": null,
    "amplitude_attribution_ids": null,
    "amplitude_event_type": null,
    "amplitude_id": xxxx,
    "app": xxxx,
    "city": null,
    "client_event_time": "2022-05-16 19:13:09.369000",
    "client_upload_time": "2022-05-16 19:13:09.369000",
    "country": "United States",
    "data": {
      "group_first_event": {

      },
      "group_ids": {

      }
    },
    "data_type": "event",
    "device_brand": null,
    "device_carrier": null,
    "device_family": null,
    "device_id": "xxxxx",
    "device_manufacturer": null,
    "device_model": null,
    "device_type": null,
    "dma": null,
    "event_id": 929601028,
    "event_properties": {

    },
    "event_time": "2022-05-16 19:13:09.369000",
    "event_type": "watch_tutorial",
    "global_user_properties": {

    },
    "group_properties": {

    },
    "groups": {

    },
    "idfa": null,
    "ip_address": "127.0.0.1",
    "is_attribution_event": false,
    "language": null,
    "library": "http/2.0",
    "location_lat": null,
    "location_lng": null,
    "os_name": null,
    "os_version": null,
    "partner_id": null,
    "paying": null,
    "plan": {

    },
    "platform": null,
    "processed_time": "2022-05-16 19:13:12.299914",
    "region": null,
    "sample_rate": null,
    "server_received_time": "2022-05-16 19:13:09.369000",
    "server_upload_time": "2022-05-16 19:13:09.375000",
    "session_id": -1,
    "start_version": null,
    "user_creation_time": "2022-05-16 19:13:09.369000",
    "user_id": "john_doe@gmail.com",
    "user_properties": {
      "Cohort": "Test A"
    },
    "uuid": "3aa93d06-d54c-11ec-9cec-b7b7bb95f4cc",
    "version_name": null
  }
}

Setting up the connection with the Incremental | Append sync mode does not seem to work as expected; we see duplicated entries in the destination.

Could it be because event_time, which is the cursor for the Amplitude Events stream, is not in the RFC3339 pattern but is a "UTC ISO-8601 formatted timestamp"? See https://developers.amplitude.com/docs/export-api
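For illustration, here is a minimal Python sketch of that hypothesis: the exported value uses a space separator and carries no UTC offset, so a strict RFC 3339 parser would reject it. The normalization helper below is only an assumption about what the pipeline expects, not the connector's actual code.

from datetime import datetime, timezone

# Timestamp as it appears in the exported record (space separator, no offset):
amplitude_ts = "2022-05-16 19:13:09.369000"

# RFC 3339 expects a "T" separator and an explicit offset, e.g.
# "2022-05-16T19:13:09.369000+00:00", which would explain the log message.

def to_rfc3339(value):
    # Hypothetical normalization: parse the Amplitude-style UTC timestamp
    # and re-serialize it with a "T" separator and a +00:00 offset.
    parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc).isoformat()

print(to_rfc3339(amplitude_ts))  # -> 2022-05-16T19:13:09.369000+00:00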

Expected Behavior

  • No errors in the logs
  • Incremental | Append should not produce duplicated entries, given that event_time has fine enough granularity that syncs a few hours apart should not overlap

Logs

Output pasted above

Steps to Reproduce

  1. Setup Amplitude source
  2. Setup any destination (Kafka or S3 preferable - I tested on both)
  3. Replicate "events" stream with "Incremental | Append" sync mode

Are you willing to submit a PR?

I can take a look at the error, but I am not sure where it is surfacing from right now. With some guidance, I am more than happy to contribute :)

@bazarnov (Collaborator)

@mohitreddy1996

Setting up the connection with sync mode as Incremental | Append does not seem to work, we do see duplicated entries in the destination.

Since you're using the Kafka or S3 destination: neither of those destinations supports the append_dedup sync mode. To have deduplication work correctly, you need to use a destination that supports the append_dedup sync mode.

About the data being exposed: where exactly do you see this?

Could it be because event_time which is the cursor for Amplitude Events stream is not RFC3339 pattern but "UTC ISO-8601 formatted timestamp" - https://developers.amplitude.com/docs/export-api

This could be related; I haven't faced this issue yet.

Could you please share the complete sync log for examination?

Thanks.

@mohitreddy1996 (Contributor, Author)

@bazarnov

Since you're using the Kafka or S3 destination: neither of those destinations supports the append_dedup sync mode. To have deduplication work correctly, you need to use a destination that supports the append_dedup sync mode.

That makes sense for that mode. But correct me if I am wrong: in incremental sync with append mode, only the new or updated records are appended, based on the cursor field? With Amplitude as the source, this is event_time. So between two syncs, only the records with event_time >= the latest event_time observed previously would be synced? This is my understanding from https://docs.airbyte.com/understanding-airbyte/connections/incremental-append, specifically from the example there.

I believe that in append_dedup mode, an intermediate table needs to be created with a primary key specification, which is used to dedup at the destination? This is definitely not something we are looking for. We are specifically looking to append only the new events since the last sync, which Incremental | Append could satisfy, provided our understanding aligns.

About the data being exposed: where exactly do you see this?

We see this in the Kafka topic that was configured as the destination. We had configured a replication period of 5 minutes, so we saw duplicated records (which was not what we expected, given our understanding of the cursor field). Later, looking at the code [1], it seems that the connector tries to fetch data from the last 6 hours on every run, and the number of duplicated entries we saw made sense; the sketch below illustrates the overlap.
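To make the overlap concrete, here is a hypothetical Python sketch of a fixed 6-hour lookback applied on every run. It is an illustration of the behavior described above, not the connector's actual code.

from datetime import datetime, timedelta

LOOKBACK = timedelta(hours=6)

def request_window(run_time):
    # Illustrative only: start 6 hours before "now" on every run,
    # regardless of the cursor value saved from the previous sync.
    return run_time - LOOKBACK, run_time

first = request_window(datetime(2022, 5, 16, 20, 0))   # 14:00 - 20:00
second = request_window(datetime(2022, 5, 16, 21, 0))  # 15:00 - 21:00

# The two windows overlap between 15:00 and 20:00, so every event in that
# span is fetched and appended twice.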

Could you please share the complete sync log for examination?

We have lost the logs since we were prototyping locally. I will post one once I am able to reproduce this with the Kafka destination specifically.

@bazarnov (Collaborator)

bazarnov commented May 20, 2022

@mohitreddy1996

That makes sense for that mode. But correct me if I am wrong: in incremental sync with append mode, only the new or updated records are appended, based on the cursor field? With Amplitude as the source, this is event_time. So between two syncs, only the records with event_time >= the latest event_time observed previously would be synced? This is my understanding from https://docs.airbyte.com/understanding-airbyte/connections/incremental-append, specifically from the example there.

Correct; the connector should return at least 1 record message when using Incremental Append mode. This is covered by https://docs.airbyte.com/understanding-airbyte/connections/incremental-append

I believe that in append_dedup mode, an intermediate table needs to be created with a primary key specification, which is used to dedup at the destination? This is definitely not something we are looking for. We are specifically looking to append only the new events since the last sync, which Incremental | Append could satisfy, provided our understanding aligns.

In this case, you should consider removing the duplicates in any other way available to you. My suggestion here is to use a Postgres DB on a remote cluster or on premises.
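If the destination cannot dedup, one option is to drop repeats on the consumer side. A minimal Python sketch, assuming a stable record key such as the uuid field shown in the sample record above (whether uuid is stable across re-fetches is an assumption; $insert_id would be an alternative):

def dedupe(records, key="uuid"):
    # Keep only the first record seen for each key value (consumer-side dedup).
    seen = set()
    for record in records:
        record_id = record.get(key)
        if record_id in seen:
            continue
        seen.add(record_id)
        yield record

events = [
    {"uuid": "3aa93d06-d54c-11ec-9cec-b7b7bb95f4cc", "event_type": "watch_tutorial"},
    {"uuid": "3aa93d06-d54c-11ec-9cec-b7b7bb95f4cc", "event_type": "watch_tutorial"},
]
assert len(list(dedupe(events))) == 1  # the duplicate is dropped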

We see this in the Kafka topic that was configured as the destination. We had configured a replication period of 5 minutes, so we saw duplicated records (which was not what we expected, given our understanding of the cursor field). Later, looking at the code [1], it seems that the connector tries to fetch data from the last 6 hours on every run, and the number of duplicated entries we saw made sense.

Exactly. I'm not aware of how the Kafka connector works at this very moment, but I think there is no way to avoid duplicate records because append_dedup is missing, and I believe it would be almost impossible to implement. If you plan to use Kafka as a destination, please make sure the description of the issue reflects this; our dedicated Java team will take a look at what we can come up with.

I'll investigate the Failed to apply RFC3339 pattern issue and, if needed, will make a PR to fix it.

@mohitreddy1996 (Contributor, Author)

mohitreddy1996 commented May 20, 2022

@bazarnov

I'll investigate the Failed to apply RFC3339 pattern issue and, if needed, will make a PR to fix it.

Awesome, thank you! Please do keep this thread posted as and when it happens.

Coming back to

Correct; the connector should return at least 1 record message when using Incremental Append mode. This is covered by https://docs.airbyte.com/understanding-airbyte/connections/incremental-append

I think I am still failing to understand the "Incremental Append" mode and how it fetches only the "updated" records since the last sync.

Consider this example:
As part of the first sync (since the connector fetches data for the past 6 hours, assume that the data returned covers 14:00 - 20:00), Amplitude returns:

[{
....
event_time: "XXXX-XX-XX 14:01:00"
},
{
....
event_time: "XXXX-XX-XX 14:05:00"
},
{
....
event_time: "XXXX-XX-XX 15:01:00"
},
....
{
....
event_time: "XXXX-XX-XX 19:01:00"
},
{
....
event_time: "XXXX-XX-XX 19:55:00"
},
]

Since this is the first sync, it is essentially a full refresh, so all the data is written to the destination (it does not matter which destination).

In the next sync, say it gets kicked off at 21:00, the data the connector will try to fetch is from 15:00 - 21:00; say we get:

[{
....
event_time: "XXXX-XX-XX 15:01:00"
},
....
{
....
event_time: "XXXX-XX-XX 19:01:00"
},
{
....
event_time: "XXXX-XX-XX 19:55:00"
},

// We have seen the above results

{
....
event_time: "XXXX-XX-XX 20:01:00"
},
{
....
event_time: "XXXX-XX-XX 20:05:00"
},
]

Given the definition of the cursor field, which is event_time here, this sync should only emit the records with event_time "XXXX-XX-XX 20:01:00" and "XXXX-XX-XX 20:05:00". Please correct me if I am wrong, but it should not matter which destination is used, as the cursor is a property of the connection and not of the destination?

The behavior we see right now is that all the records in this batch are emitted.

The problem is not about de-duplication; it's about redundant records being sent, which by the definition of the cursor field should not happen.
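For reference, a minimal Python sketch of the cursor behavior described above: records at or below the saved cursor are skipped, and the cursor advances to the newest value emitted. This is an illustration of the expectation, not Airbyte's actual implementation.

def incremental_append(records, state, cursor_field="event_time"):
    # Emit only records newer than the saved cursor, then advance the cursor.
    # Illustrative only; real connectors may re-emit the boundary record.
    latest = state.get(cursor_field)
    for record in sorted(records, key=lambda r: r[cursor_field]):
        if latest is None or record[cursor_field] > latest:
            yield record
            latest = record[cursor_field]
    state[cursor_field] = latest

state = {"event_time": "2022-05-16 19:55:00"}
batch = [
    {"event_time": "2022-05-16 19:01:00"},
    {"event_time": "2022-05-16 20:01:00"},
    {"event_time": "2022-05-16 20:05:00"},
]
print([r["event_time"] for r in incremental_append(batch, state)])
# -> ['2022-05-16 20:01:00', '2022-05-16 20:05:00']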

@bazarnov (Collaborator)

Given the definition of the cursor field, which is event_time here, this sync should only emit the records with event_time "XXXX-XX-XX 20:01:00" and "XXXX-XX-XX 20:05:00". Please correct me if I am wrong, but it should not matter which destination is used, as the cursor is a property of the connection and not of the destination?

The behavior we see right now is that all the records in this batch are emitted.

The problem is not about de-duplication; it's about redundant records being sent, which by the definition of the cursor field should not happen.

Yes, you're right: there is a 6-hour offset from the cursor_field value every time a sync starts. This causes the data overlap. I will also check whether we can call the API without this offset.
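As a rough sketch of the direction such a change could take (the actual fix in #13074 may differ), the request window could start from the saved cursor when one exists and fall back to a lookback only on the first sync:

from datetime import datetime, timedelta

def window_start(saved_cursor, now, first_sync_lookback=timedelta(hours=6)):
    # Hypothetical: resume from the saved cursor instead of always
    # rewinding 6 hours; only the very first sync uses the lookback.
    if saved_cursor is not None:
        return saved_cursor
    return now - first_sync_lookback

now = datetime(2022, 5, 16, 21, 0)
print(window_start(datetime(2022, 5, 16, 19, 55), now))  # -> 2022-05-16 19:55:00
print(window_start(None, now))                           # -> 2022-05-16 15:00:00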

@bazarnov (Collaborator)

bazarnov commented May 21, 2022

@mohitreddy1996
Regarding Failed to apply RFC3339 pattern on ...., this is not related to the Amplitude source but to the S3 destination. I'll raise the issue with our dev team for further investigation, along with the examples. Thanks for pointing this out.
Here is the follow-up issue: #13075

Regarding the time offset and duplicates:

The fix: #13074

Please be aware that after #13074 is merged, this issue will be closed automatically.
I hope this will help you adapt the Amplitude source to your needs.

@mohitreddy1996 (Contributor, Author)

@bazarnov thank you so much! We will definitely try it and let you know if things work for us :)
