Skip to content

Commit

Permalink
🐛 🎉 Source PayPal Transaction: added OAuth2.0, fixed bug with norma…
Browse files Browse the repository at this point in the history
…lization (#15000)
  • Loading branch information
bazarnov committed Jul 27, 2022
1 parent bbd6540 commit eb6fa3b
Show file tree
Hide file tree
Showing 15 changed files with 353 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -691,11 +691,11 @@
- name: Paypal Transaction
sourceDefinitionId: d913b0f2-cc51-4e55-a44c-8ba1697b9239
dockerRepository: airbyte/source-paypal-transaction
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/sources/paypal-transaction
icon: paypal.svg
sourceType: api
releaseStage: alpha
releaseStage: beta
- name: Paystack
sourceDefinitionId: 193bdcb8-1dd9-48d1-aade-91cadfd74f9b
dockerRepository: airbyte/source-paystack
Expand Down
72 changes: 60 additions & 12 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6708,29 +6708,66 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-paypal-transaction:0.1.7"
- dockerImage: "airbyte/source-paypal-transaction:0.1.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/paypal-transactions"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Paypal Transaction Search"
type: "object"
required:
- "client_id"
- "secret"
- "start_date"
- "is_sandbox"
additionalProperties: true
properties:
client_id:
title: "Client ID"
type: "string"
description: "The Client ID of your Paypal developer application."
secret:
title: "Client Secret"
type: "string"
description: "The Client Secret of your Paypal developer application."
airbyte_secret: true
credentials:
title: "Authenticate using"
type: "object"
oneOf:
- type: "object"
title: "OAuth2.0"
required:
- "auth_type"
- "refresh_token"
properties:
auth_type:
type: "string"
const: "oauth2.0"
client_id:
type: "string"
title: "Client ID"
description: "The Client ID of your Paypal developer application."
airbyte_secret: true
client_secret:
type: "string"
title: "Client secret"
description: "The Client Secret of your Paypal developer application."
airbyte_secret: true
refresh_token:
type: "string"
title: "Refresh token"
description: "The key to refresh the expired access token."
airbyte_secret: true
- title: "Private OAuth Creds"
type: "object"
required:
- "auth_type"
- "client_id"
- "client_secret"
properties:
auth_type:
type: "string"
const: "private_oauth"
client_id:
type: "string"
title: "Client ID"
description: "The Client ID of your Paypal developer application."
airbyte_secret: true
client_secret:
type: "string"
title: "Client secret"
description: "The Client Secret of your Paypal developer application."
airbyte_secret: true
start_date:
type: "string"
title: "Start Date"
Expand All @@ -6748,6 +6785,17 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
authSpecification:
auth_type: "oauth2.0"
oauth2Specification:
rootObject:
- "credentials"
- "0"
oauthFlowInitParameters:
- - "client_id"
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-paystack:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/paystack"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-paypal-transaction
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
pip install '.[tests]'
```
### Unit Tests
To run unit tests locally, from the connector directory run:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ tests:
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config_oauth.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
"name": "transactions",
"json_schema": {},
"source_defined_cursor": true,
"default_cursor_field": [
"transaction_info",
"transaction_initiation_date"
],
"default_cursor_field": ["transaction_initiation_date"],
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
"name": "transactions",
"json_schema": {},
"source_defined_cursor": true,
"default_cursor_field": [
"transaction_info",
"transaction_initiation_date"
],
"default_cursor_field": ["transaction_initiation_date"],
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"credentials": {
"auth_type": "oauth2.0",
"client_id": "AWA__",
"secret": "ENC__",
"refresh_token": "__"
},
"start_date": "2021-07-03T00:00:00+00:00",
"end_date": "2021-07-04T23:59:59+00:00",
"is_sandbox": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@
}
}
},
"transaction_id": {
"type": ["null", "string"],
"maxLength": 24
},
"transaction_initiation_date": {
"type": ["null", "string"],
"format": "date-time"
},
"payer_info": {
"type": ["null", "object"],
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import base64
import json
import logging
import time
Expand Down Expand Up @@ -51,10 +52,9 @@ def __repr__(self):

def get_endpoint(is_sandbox: bool = False) -> str:
if is_sandbox:
endpoint = "https://api-m.sandbox.paypal.com"
else:
endpoint = "https://api-m.paypal.com"
return endpoint
return "https://api-m.sandbox.paypal.com"

return "https://api-m.paypal.com"


class PaypalTransactionStream(HttpStream, ABC):
Expand Down Expand Up @@ -82,6 +82,10 @@ class PaypalTransactionStream(HttpStream, ABC):
stream_slice_period: Mapping[str, int] = {"days": 15} # max period is 31 days (API limit)

requests_per_minute: int = 30 # API limit is 50 reqs/min from 1 IP to all endpoints, otherwise IP is banned for 5 mins
# if the stream has nested cursor_field, we should trry to unnest it once parsing the recods to avoid normalization conflicts.
unnest_cursor: bool = False
unnest_pk: bool = False
nested_object: str = None

def __init__(
self,
Expand Down Expand Up @@ -156,11 +160,25 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
# In order to support direct datetime string comparison (which is performed in incremental acceptance tests)
# convert any date format to python iso format string for date based cursors
self.update_field(record, self.cursor_field, lambda date: isoparse(date).isoformat())
# unnest cursor_field to handle normalization correctly
if self.unnest_cursor:
self.unnest_field(record, self.nested_object, self.cursor_field)
# unnest primary_key to handle normalization correctly
if self.unnest_pk:
self.unnest_field(record, self.nested_object, self.primary_key)
yield record

# sleep for 1-2 secs to not reach rate limit: 50 requests per minute
time.sleep(60 / self.requests_per_minute)

@staticmethod
def unnest_field(record: Mapping[str, Any], unnest_from: Dict, cursor_field: str):
"""
Unnest cursor_field to the root level of the record.
"""
if unnest_from in record:
record[cursor_field] = record.get(unnest_from).get(cursor_field)

@staticmethod
def update_field(record: Mapping[str, Any], field_path: Union[List[str], str], update: Callable[[Any], None]):
if not isinstance(field_path, List):
Expand Down Expand Up @@ -332,8 +350,14 @@ class Transactions(PaypalTransactionStream):
"""

data_field = "transaction_details"
primary_key = [["transaction_info", "transaction_id"]]
cursor_field = ["transaction_info", "transaction_initiation_date"]
nested_object = "transaction_info"

primary_key = "transaction_id"
cursor_field = "transaction_initiation_date"

unnest_cursor = True
unnest_pk = True

transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization)

# TODO handle API error when 1 request returns more than 10000 records.
Expand Down Expand Up @@ -402,39 +426,67 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,

class PayPalOauth2Authenticator(Oauth2Authenticator):
"""Request example for API token extraction:
curl -v POST https://api-m.sandbox.paypal.com/v1/oauth2/token \
-H "Accept: application/json" \
-H "Accept-Language: en_US" \
-u "CLIENT_ID:SECRET" \
-d "grant_type=client_credentials"
For `old_config` scenario:
curl -v POST https://api-m.sandbox.paypal.com/v1/oauth2/token \
-H "Accept: application/json" \
-H "Accept-Language: en_US" \
-u "CLIENT_ID:SECRET" \
-d "grant_type=client_credentials"
"""

def __init__(self, config):
super().__init__(
token_refresh_endpoint=f"{get_endpoint(config['is_sandbox'])}/v1/oauth2/token",
client_id=config["client_id"],
client_secret=config["secret"],
refresh_token="",
)
def __init__(self, config: Dict):
self.old_config: bool = False
# default auth args
self.auth_args: Dict = {
"token_refresh_endpoint": f"{get_endpoint(config['is_sandbox'])}/v1/oauth2/token",
"refresh_token": "",
}
# support old configs
if "client_id" and "secret" in config.keys():
self.old_config = True
self.auth_args.update(**{"client_id": config["client_id"], "client_secret": config["secret"]})
# new configs
if "credentials" in config.keys():
credentials = config.get("credentials")
auth_type = credentials.get("auth_type")
self.auth_args.update(**{"client_id": credentials["client_id"], "client_secret": credentials["client_secret"]})
if auth_type == "oauth2.0":
self.auth_args["refresh_token"] = credentials["refresh_token"]
elif auth_type == "private_oauth":
self.old_config = True

self.config = config
super().__init__(**self.auth_args)

def get_headers(self):
# support old configs
if self.old_config:
return {"Accept": "application/json", "Accept-Language": "en_US"}
# new configs
basic_auth = base64.b64encode(bytes(f"{self.client_id}:{self.client_secret}", "utf-8")).decode("utf-8")
return {"Authorization": f"Basic {basic_auth}"}

def get_refresh_request_body(self) -> Mapping[str, Any]:
return {"grant_type": "client_credentials"}
# support old configs
if self.old_config:
return {"grant_type": "client_credentials"}
# new configs
return {"grant_type": "refresh_token", "refresh_token": self.refresh_token}

def refresh_access_token(self) -> Tuple[str, int]:
"""
returns a tuple of (access_token, token_lifespan_in_seconds)
"""
request_args = {
"url": self.token_refresh_endpoint,
"data": self.get_refresh_request_body(),
"headers": self.get_headers(),
}
try:
data = "grant_type=client_credentials"
headers = {"Accept": "application/json", "Accept-Language": "en_US"}
auth = (self.client_id, self.client_secret)
response = requests.request(
method="POST",
url=self.token_refresh_endpoint,
data=data,
headers=headers,
auth=auth,
)
# support old configs
if self.old_config:
request_args["auth"] = (self.client_id, self.client_secret)
response = requests.post(**request_args)
response.raise_for_status()
response_json = response.json()
return response_json["access_token"], response_json["expires_in"]
Expand Down

0 comments on commit eb6fa3b

Please sign in to comment.