
Source Zuora: fix decimals in stringified datetimes #24460

Closed
wants to merge 14 commits

Conversation

@frans-k commented Mar 24, 2023

What

This change outputs timestamps in a format that Zuora can cast to TIMESTAMP.

Today Zuora accepts 6 decimal places for fractional seconds when casting to TIMESTAMP, but if such a value appears in a WHERE filter on, for example, updateddate, that filter is ignored for input rows.

So if the table has more than 10 million rows, the query hits the Data Query 10-million input row limit. I haven't tested whether it affects output rows.

How

Changed the fractional seconds from 6 decimal places to 3. Instead of strftime, it now uses Pendulum's format().
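
For illustration, here's a minimal sketch of the difference (the exact format string used in the connector may differ slightly):

import pendulum

dt = pendulum.datetime(2023, 2, 1, 2, 1, 29, 123456, tz="UTC")

# strftime's %f always emits 6 fractional digits, which Zuora ignores in WHERE filters:
print(dt.strftime("%Y-%m-%d %H:%M:%S.%f %z"))  # 2023-02-01 02:01:29.123456 +0000

# Pendulum's format() can emit exactly 3 fractional digits, which Zuora casts and filters correctly:
print(dt.format("YYYY-MM-DD HH:mm:ss.SSS Z"))  # 2023-02-01 02:01:29.123 +00:00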

Recommended reading order

  1. x.java
  2. y.python

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed

@CLAassistant commented Mar 24, 2023

CLA assistant check
All committers have signed the CLA.

@frans-k changed the title from "Source Zoura: fix decimals in stringified datetimes" to "Source Zuora: fix decimals in stringified datetimes" Mar 27, 2023
@sh4sh (Contributor) commented Apr 18, 2023

/test connector=connectors/source-zuora

🕑 connectors/source-zuora https://github.com/airbytehq/airbyte/actions/runs/4734069600
❌ connectors/source-zuora https://github.com/airbytehq/airbyte/actions/runs/4734069600
🐛

Build Failed

Test summary info:

Could not find result summary

@sh4sh (Contributor) commented Apr 18, 2023

The tests came up with this error:

BUILD FAILED in 22s
> Task :airbyte-integrations:connectors:source-zuora:flakeCheck FAILED
[python] .venv/bin/python -m pflake8 --config /actions-runner/_work/airbyte/airbyte/pyproject.toml ./
	 ./unit_tests/unit_test.py:6:1: F401 'datetime.datetime' imported but unused

@frans-k before I remove datetime.datetime I want to double-check with you, since it was added in this PR: is it supposed to be in use somewhere? If not, removing it should get us past the error.

@sh4sh (Contributor) commented Apr 19, 2023

/test connector=connectors/source-zuora

🕑 connectors/source-zuora https://github.com/airbytehq/airbyte/actions/runs/4744494361
❌ connectors/source-zuora https://github.com/airbytehq/airbyte/actions/runs/4744494361
🐛 https://gradle.com/s/t67ex5uarmlbq

Build Failed

Test summary info:

=========================== short test summary info ============================
FAILED test_core.py::TestConnection::test_check[inputs0] - AssertionError: as...
FAILED test_core.py::TestDiscovery::test_discover[inputs0] - docker.errors.Co...
ERROR test_core.py::TestDiscovery::test_defined_cursors_exist_in_schema[inputs0]
ERROR test_core.py::TestDiscovery::test_defined_refs_exist_in_schema[inputs0]
ERROR test_core.py::TestDiscovery::test_defined_keyword_exist_in_schema[inputs0-allOf]
ERROR test_core.py::TestDiscovery::test_defined_keyword_exist_in_schema[inputs0-not]
ERROR test_core.py::TestDiscovery::test_primary_keys_exist_in_schema[inputs0]
ERROR test_core.py::TestDiscovery::test_streams_has_sync_modes[inputs0] - doc...
ERROR test_core.py::TestDiscovery::test_additional_properties_is_true[inputs0]
ERROR test_core.py::TestDiscovery::test_backward_compatibility[inputs0] - doc...
ERROR test_core.py::TestBasicRead::test_read[inputs0] - docker.errors.Contain...
ERROR test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
ERROR test_incremental.py::TestIncremental::test_two_sequential_reads[inputs0]
ERROR test_incremental.py::TestIncremental::test_read_sequential_slices[inputs0]
ERROR test_incremental.py::TestIncremental::test_state_with_abnormally_large_values[inputs0]
============ 2 failed, 25 passed, 40 warnings, 13 errors in 36.32s =============

@sh4sh (Contributor) commented Apr 19, 2023

It looks like the Zuora integration test credentials need to be refreshed. I've submitted a request to get it investigated.

@frans-k (Author) commented Apr 24, 2023

Just let me know if there's anything I can do, @sh4sh.

@jetvp commented Apr 26, 2023

Hi @frans-k and @sh4sh,

The "TypeError: 'NoneType' object does not support item assignment" error is caused by a bug in zuora_auth.py not an authentication failure (code is trying to edit a null object to remove a field, instead of building the body and removing the items).

The following version of zuora_auth.py should fix the error:

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from typing import Any, Dict, Mapping

from airbyte_cdk.sources.streams.http.requests_native_auth.oauth import Oauth2Authenticator

from .zuora_endpoint import get_url_base


class OAuth(Oauth2Authenticator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    
    def build_refresh_request_body(self) -> Mapping[str, Any]:
        payload = super().build_refresh_request_body()
        payload.pop("refresh_token")  # Zuora doesn't have Refresh Token parameter
        return payload

class ZuoraAuthenticator:
    def __init__(self, config: Dict):
        self.config = config

    @property
    def url_base(self) -> str:
        return get_url_base(self.config["tenant_endpoint"])

    def get_auth(self) -> OAuth:
        return OAuth(
            token_refresh_endpoint=f"{self.url_base}/oauth/token",
            client_id=self.config["client_id"],
            client_secret=self.config["client_secret"],
            refresh_token='refresh_token',
            grant_type='client_credentials'
        )

@frans-k (Author) commented Apr 27, 2023

Good call @jetvp, fixed the OAuth issue.

Now I'm getting a new issue though. Zuora gets spammed with hundreds of DESCRIBE queries for the single stream I synced. Zuora only keeps a history of 100 queries, so I don't know how many it's done nor how many it could run if I let it.

Posting a short excerpt from the logs; I can't see anything that stands out, however.

2023-04-27 14:22:57 INFO i.a.w.i.s.SyncPersistenceImpl(startBackgroundFlushStateTask):168 - starting state flush thread for connectionId 8eb62d62-5ebc-49f1-bb9a-f0ff2df01994
2023-04-27 14:22:57 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):317 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@366e837a[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@37b5274b[type=STREAM,stream=io.airbyte.protocol.models.AirbyteStreamState@325523d9[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@6f5d2204[name=revenueeventitem,namespace=<null>,additionalProperties={}],streamState={"updateddate":"2023-02-01T02:01:29+01:00"},additionalProperties={}],global=<null>,data={"revenueeventitem":{"updateddate":"2023-02-01T02:01:29+01:00"}},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
2023-04-27 14:23:04 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):317 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@2d6a09c6[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@33fbfc5f[type=STREAM,stream=io.airbyte.protocol.models.AirbyteStreamState@28ab5a12[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@2e601001[name=revenueeventitem,namespace=<null>,additionalProperties={}],streamState={"updateddate":"2023-02-01T02:01:29+01:00"},additionalProperties={}],global=<null>,data={"revenueeventitem":{"updateddate":"2023-02-01T02:01:29+01:00"}},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
2023-04-27 14:23:04 WARN i.a.w.i.b.DefaultSyncStatsTracker(updateDestinationStateStats):195 - The message tracker encountered an issue that prevents committed record counts from being reliably computed. This only impacts metadata and does not indicate a problem with actual sync data.
io.airbyte.workers.internal.book_keeping.StateDeltaTracker$StateDeltaTrackerException: State hash -1215753501 was already committed, likely indicating a state hash collision
	at io.airbyte.workers.internal.book_keeping.StateDeltaTracker.commitStateHash(StateDeltaTracker.java:121) ~[io.airbyte-airbyte-commons-worker-0.43.1.jar:?]
	at io.airbyte.workers.internal.book_keeping.DefaultSyncStatsTracker.updateDestinationStateStats(DefaultSyncStatsTracker.java:192) ~[io.airbyte-airbyte-commons-worker-0.43.1.jar:?]
	at io.airbyte.workers.internal.sync_persistence.SyncPersistenceImpl.updateDestinationStateStats(SyncPersistenceImpl.java:426) ~[io.airbyte-airbyte-commons-worker-0.43.1.jar:?]
	at io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker.handleDestinationEmittedState(AirbyteMessageTracker.java:137) ~[io.airbyte-airbyte-commons-worker-0.43.1.jar:?]
	at io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker.acceptFromDestination(AirbyteMessageTracker.java:103) ~[io.airbyte-airbyte-commons-worker-0.43.1.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$readFromDstRunnable$4(DefaultReplicationWorker.java:319) ~[io.airbyte-airbyte-commons-worker-0.43.1.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-04-27 14:23:04 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):317 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@53391832[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@3fcc1f0e[type=STREAM,stream=io.airbyte.protocol.models.AirbyteStreamState@4dc5a92f[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@638b588e[name=revenueeventitem,namespace=<null>,additionalProperties={}],streamState={"updateddate":"2023-02-01T02:01:30+01:00"},additionalProperties={}],global=<null>,data={"revenueeventitem":{"updateddate":"2023-02-01T02:01:30+01:00"}},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
2023-04-27 14:23:10 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):317 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@78683445[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@27037029[type=STREAM,stream=io.airbyte.protocol.models.AirbyteStreamState@47ef3937[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@7e19466c[name=revenueeventitem,namespace=<null>,additionalProperties={}],streamState={"updateddate":"2023-02-01T02:01:33+01:00"},additionalProperties={}],global=<null>,data={"revenueeventitem":{"updateddate":"2023-02-01T02:01:33+01:00"}},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
2023-04-27 14:23:10 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):317 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@476a9ff6[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@7b543d0a[type=STREAM,stream=io.airbyte.protocol.models.AirbyteStreamState@45cc95d6[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@71528125[name=revenueeventitem,namespace=<null>,additionalProperties={}],streamState={"updateddate":"2023-02-01T02:01:33+01:00"},additionalProperties={}],global=<null>,data={"revenueeventitem":{"updateddate":"2023-02-01T02:01:33+01:00"}},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
2023-04-27 14:23:18 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):317 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@625435d6[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@3832f8fb[type=STREAM,stream=io.airbyte.protocol.models.AirbyteStreamState@19f1b80e[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@7c4f59bc[name=revenueeventitem,namespace=<null>,additionalProperties={}],streamState={"updateddate":"2023-02-01T02:01:33+01:00"},additionalProperties={}],global=<null>,data={"revenueeventitem":{"updateddate":"2023-02-01T02:01:33+01:00"}},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
2023-04-27 14:23:18 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):317 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@3ff62966[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@10acfb43[type=STREAM,stream=io.airbyte.protocol.models.AirbyteStreamState@4891090[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@20d3b04[name=revenueeventitem,namespace=<null>,additionalProperties={}],streamState={"updateddate":"2023-02-01T02:01:33+01:00"},additionalProperties={}],global=<null>,data={"revenueeventitem":{"updateddate":"2023-02-01T02:01:33+01:00"}},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]

Those last messages are repeated; it's unclear whether they're related to the issue. data={"revenueeventitem":{"updateddate":"2023-02-01T02:07:29+01:00"}} seems to be incremented ever so slightly with each log message.

@jetvp commented Apr 28, 2023

I came across this issue the other day.

The ZuoraObjectsBase > get_json_schema(self) method should have a cache decorator above it:

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Override get_json_schema CDK method to retrieve the schema information for Zuora Object dynamically.
        """
        schema = list(ZuoraDescribeObject(self.name, config=self._config).read_records(sync_mode=None))
        return {"type": "object", "properties": {key: d[key] for d in schema for key in d}}

You'll also need to import:

from functools import lru_cache

That means it will only call describe once and reuse that result (instead of firing off an API call for every row!).

I'm currently fixing it to make sure it does not return more than 50k rows and iterates from the last date (but that can go in a separate patch).
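
For anyone following along, here's a tiny standalone sketch of why lru_cache helps here (placeholder class, not the connector's actual code): repeated calls hit the cache instead of re-running the expensive describe lookup.

from functools import lru_cache

class DemoStream:
    describe_calls = 0

    @lru_cache(maxsize=None)
    def get_json_schema(self):
        # Stand-in for the expensive ZuoraDescribeObject read_records() call.
        DemoStream.describe_calls += 1
        return {"type": "object", "properties": {}}

stream = DemoStream()
for _ in range(100):
    stream.get_json_schema()
print(DemoStream.describe_calls)  # 1 — the describe lookup ran only once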

@jetvp commented May 1, 2023

@frans-k , let me know if that works for you.

@frans-k (Author) commented May 19, 2023

@jetvp Yes, that part seems to work now, which of course means I'm running into new issues.

Unrelated to this PR, but just to share the first new issue:

select *
            from orderaction where
            updateddate >= TIMESTAMP '2023-02-16 00:54:54.000 +01:00' and
            updateddate <= TIMESTAMP '2023-02-20 23:54:54.000 +00:00'
            order by updateddate asc

Fails on dbValue 3 not supported for coreEnumClass CancellationPolicy. Guessing that #2227 is required to fix this on the client side.

Even a table that doesn't have this issue is failing on something else:

  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py", line 439, in _read_pages
    request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py", line 462, in _fetch_next_page
    response = self._send_request(request, request_kwargs)
  File "/airbyte/integration_code/source_zuora/source.py", line 452, in _send_request
    status = job_check["data"]["queryStatus"]
KeyError: 'data'
,retryable=<null>,timestamp=1684489330161], io.airbyte.config.FailureReason@34348470[failureOrigin=source,failureType=<null>,internalMessage=Source process exited with non-zero exit code 1,externalMessage=Something went wrong within the source connector,metadata=io.airbyte.config.Metadata@454b1d9c[additionalProperties={attemptNumber=0, jobId=120, connector_command=read}],stacktrace=io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$readFromSrcAndWriteToDstRunnable$7(DefaultReplicationWorker.java:457)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)
,retryable=<null>,timestamp=1684489330327]],commitStateAsap=true]

Don't have the time today to try and debug this any further.
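
In case someone picks this up later, here's a rough, untested sketch of a defensive check around that lookup (the helper name and the error-payload shape are hypothetical; only the happy path is confirmed by the traceback):

# Hypothetical helper; the real response shape for failed Zuora query jobs still needs verification.
def extract_query_status(job_check: dict) -> str:
    data = job_check.get("data")
    if data is None:
        # Surface the raw payload so the actual Zuora error message ends up in the sync logs
        # instead of an opaque KeyError: 'data'.
        raise ValueError(f"Unexpected response from Zuora query job status endpoint: {job_check}")
    return data["queryStatus"]

print(extract_query_status({"data": {"queryStatus": "completed"}}))  # completed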

@jrolom jrolom removed the bounty label Jun 9, 2023
@marcosmarxm marcosmarxm added team/tse Technical Support Engineers community and removed community-legacy labels Jun 27, 2023
@sh4sh sh4sh mentioned this pull request Jul 18, 2023
@marcosmarxm (Member) commented

Sorry for the long delay in getting this reviewed and merged. Our team is finishing the August Hackathon contribution reviews over the next two weeks, and after that we'll return to the community backlog.

@sh4sh (Contributor) commented Oct 3, 2023

@frans-k circling back to this PR and hoping to get it merged soon. Since it's been a while, I wanted to confirm - are you still interested in continuing this work? Thanks in advance!

@marcosmarxm (Member) commented

Closed because the user didn't respond to the last comment. @frans-k, let us know if you want to continue the work here.
