Skip to content

Commit

Permalink
🎉 Source Pipedrive: add oAuth support (#6821)
Browse files Browse the repository at this point in the history
* Add oauth support

* Upd changelog

* Update airbyte-integrations/connectors/source-pipedrive/source_pipedrive/spec.json

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
gaart and sherifnada committed Oct 12, 2021
1 parent 6fb989f commit 99b1e91
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 37 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ jobs:
MYSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MYSQL_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS_OAUTH }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS_OLD: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS_OLD }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
QUICKBOOKS_TEST_CREDS: ${{ secrets.QUICKBOOKS_TEST_CREDS }}
SALESFORCE_BULK_INTEGRATION_TESTS_CREDS: ${{ secrets.SALESFORCE_BULK_INTEGRATION_TESTS_CREDS }}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ jobs:
MYSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MYSQL_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS_OAUTH }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS_OLD: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS_OLD }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
QUICKBOOKS_TEST_CREDS: ${{ secrets.QUICKBOOKS_TEST_CREDS }}
SALESFORCE_BULK_INTEGRATION_TESTS_CREDS: ${{ secrets.SALESFORCE_BULK_INTEGRATION_TESTS_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "d8286229-c680-4063-8c59-23b9b391c700",
"name": "Pipedrive",
"dockerRepository": "airbyte/source-pipedrive",
"dockerImageTag": "0.1.5",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/pipedrive"
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
- sourceDefinitionId: d8286229-c680-4063-8c59-23b9b391c700
name: Pipedrive
dockerRepository: airbyte/source-pipedrive
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/sources/pipedrive
sourceType: api
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/source-pipedrive
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "secrets/oauth_config.json"
status: "succeed"
- config_path: "secrets/old_config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"api_token": "wrong-api-token",
"replication_start_date": "2021-06-01T10:10:10Z"
}
"authorization": {
"auth_type": "Token",
"api_token": "wrong-api-token"
},
"replication_start_date": "2021-01-01T10:10:10Z"
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator
from source_pipedrive.streams import Activities, ActivityFields, Deals, Leads, Organizations, Persons, Pipelines, Stages, Users


class SourcePipedrive(AbstractSource):
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
deals = Deals(api_token=config["api_token"], replication_start_date=pendulum.parse(config["replication_start_date"]))
stream_kwargs = self.get_stream_kwargs(config)
deals = Deals(**stream_kwargs)
deals_gen = deals.read_records(sync_mode=SyncMode.full_refresh)
next(deals_gen)
return True, None
Expand All @@ -27,17 +29,41 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
stream_kwargs = {"api_token": config["api_token"]}
incremental_stream_kwargs = {**stream_kwargs, "replication_start_date": pendulum.parse(config["replication_start_date"])}
stream_kwargs = self.get_stream_kwargs(config)
incremental_kwargs = {**stream_kwargs, "replication_start_date": pendulum.parse(config["replication_start_date"])}
streams = [
Activities(**incremental_stream_kwargs),
Activities(**incremental_kwargs),
ActivityFields(**stream_kwargs),
Deals(**incremental_stream_kwargs),
Deals(**incremental_kwargs),
Leads(**stream_kwargs),
Organizations(**incremental_stream_kwargs),
Persons(**incremental_stream_kwargs),
Pipelines(**incremental_stream_kwargs),
Stages(**incremental_stream_kwargs),
Users(**incremental_stream_kwargs),
Organizations(**incremental_kwargs),
Persons(**incremental_kwargs),
Pipelines(**incremental_kwargs),
Stages(**incremental_kwargs),
Users(**incremental_kwargs),
]
return streams

@staticmethod
def get_stream_kwargs(config: Mapping[str, Any]) -> Mapping[str, Any]:
authorization = config.get("authorization", {})
stream_kwargs = dict()

auth_type = authorization.get("auth_type")
if auth_type == "Client":
stream_kwargs["authenticator"] = Oauth2Authenticator(
token_refresh_endpoint="https://oauth.pipedrive.com/oauth/token",
client_secret=authorization.get("client_secret"),
client_id=authorization.get("client_id"),
refresh_token=authorization.get("refresh_token"),
)
elif auth_type == "Token":
stream_kwargs["authenticator"] = {"api_token": authorization.get("api_token")}
# backward compatibility
else:
if config.get("api_token"):
stream_kwargs["authenticator"] = {"api_token": config.get("api_token")}
else:
raise Exception(f"Invalid auth type: {auth_type}")

return stream_kwargs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,77 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Pipedrive Spec",
"type": "object",
"required": ["api_token", "replication_start_date"],
"additionalProperties": false,
"required": ["replication_start_date"],
"additionalProperties": true,
"properties": {
"api_token": {
"title": "API Token",
"description": "Pipedrive API Token",
"airbyte_secret": true,
"type": "string"
"authorization": {
"type": "object",
"title": "Authentication Type",
"oneOf": [
{
"title": "Sign in via Pipedrive (OAuth)",
"type": "object",
"required": [
"auth_type",
"client_id",
"client_secret",
"refresh_token"
],
"properties": {
"auth_type": {
"type": "string",
"const": "Client",
"enum": ["Client"],
"default": "Client",
"order": 0
},
"client_id": {
"title": "Client ID",
"type": "string",
"description": "The Client ID of your developer application",
"airbyte_secret": true
},
"client_secret": {
"title": "Client Secret",
"type": "string",
"description": "The client secret of your developer application",
"airbyte_secret": true
},
"access_token": {
"title": "Access Token",
"type": "string",
"description": "An access token generated using the above client ID and secret",
"airbyte_secret": true
},
"refresh_token": {
"title": "Refresh Token",
"type": "string",
"description": "A refresh token generated using the above client ID and secret",
"airbyte_secret": true
}
}
},
{
"type": "object",
"title": "API Key Authentication",
"required": ["auth_type", "api_token"],
"properties": {
"auth_type": {
"type": "string",
"const": "Token",
"enum": ["Token"],
"default": "Token",
"order": 0
},
"api_token": {
"title": "API Token",
"type": "string",
"description": "Pipedrive API Token",
"airbyte_secret": true
}
}
}
]
},
"replication_start_date": {
"title": "Replication Start Date",
Expand Down
16 changes: 12 additions & 4 deletions airbyte-integrations/connectors/source-pipedrive/source_pipedrive/streams.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pendulum
import requests
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import NoAuth, Oauth2Authenticator

PIPEDRIVE_URL_BASE = "https://api.pipedrive.com/v1/"

Expand All @@ -18,9 +19,13 @@ class PipedriveStream(HttpStream, ABC):
data_field = "data"
page_size = 50

def __init__(self, api_token: str, replication_start_date: pendulum.datetime = None, **kwargs):
super().__init__(**kwargs)
self._api_token = api_token
def __init__(self, authenticator, replication_start_date=None, **kwargs):
if isinstance(authenticator, Oauth2Authenticator):
super().__init__(authenticator=authenticator, **kwargs)
else:
super().__init__(**kwargs)
self._api_token = authenticator["api_token"]

self._replication_start_date = replication_start_date

@property
Expand Down Expand Up @@ -56,7 +61,10 @@ def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
next_page_token = next_page_token or {}
params = {"api_token": self._api_token, "limit": self.page_size, **next_page_token}
params = {"limit": self.page_size, **next_page_token}

if isinstance(self.authenticator, NoAuth):
params["api_token"] = self._api_token

replication_start_date = self._replication_start_date
if replication_start_date:
Expand Down
18 changes: 9 additions & 9 deletions docs/integrations/sources/pipedrive.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ See [How to find the API token](https://pipedrive.readme.io/docs/how-to-find-the

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.5 | 2021-09-27 | [6441](https://github.com/airbytehq/airbyte/pull/6441) | Fix normalization error |
| 0.1.4 | 2021-08-26 | [5943](https://github.com/airbytehq/airbyte/pull/5943) | Add organizations stream |
| 0.1.3 | 2021-08-26 | [5642](https://github.com/airbytehq/airbyte/pull/5642) | Remove date-time from deals stream |
| 0.1.2 | 2021-07-23 | [4912](https://github.com/airbytehq/airbyte/pull/4912) | Update money type to support floating point |
| 0.1.1 | 2021-07-19 | [4686](https://github.com/airbytehq/airbyte/pull/4686) | Update spec.json |
| 0.1.0 | 2021-07-19 | [4686](https://github.com/airbytehq/airbyte/pull/4686) | Release Pipedrive connector! |

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.6 | 2021-10-05 | [6821](https://github.com/airbytehq/airbyte/pull/6821) | Add OAuth support |
| 0.1.5 | 2021-09-27 | [6441](https://github.com/airbytehq/airbyte/pull/6441) | Fix normalization error |
| 0.1.4 | 2021-08-26 | [5943](https://github.com/airbytehq/airbyte/pull/5943) | Add organizations stream |
| 0.1.3 | 2021-08-26 | [5642](https://github.com/airbytehq/airbyte/pull/5642) | Remove date-time from deals stream |
| 0.1.2 | 2021-07-23 | [4912](https://github.com/airbytehq/airbyte/pull/4912) | Update money type to support floating point |
| 0.1.1 | 2021-07-19 | [4686](https://github.com/airbytehq/airbyte/pull/4686) | Update spec.json |
| 0.1.0 | 2021-07-19 | [4686](https://github.com/airbytehq/airbyte/pull/4686) | Release Pipedrive connector! |
4 changes: 3 additions & 1 deletion tools/bin/ci_credentials.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ write_standard_creds source-paypal-transaction "$PAYPAL_TRANSACTION_CREDS"
write_standard_creds source-mysql "$MYSQL_SSH_KEY_TEST_CREDS" "ssh-key-config.json"
write_standard_creds source-mysql "$MYSQL_SSH_PWD_TEST_CREDS" "ssh-pwd-config.json"
write_standard_creds source-posthog "$POSTHOG_TEST_CREDS"
write_standard_creds source-pipedrive "$PIPEDRIVE_INTEGRATION_TESTS_CREDS"
write_standard_creds source-pipedrive "$PIPEDRIVE_INTEGRATION_TESTS_CREDS" "config.json"
write_standard_creds source-pipedrive "$PIPEDRIVE_INTEGRATION_TESTS_CREDS_OAUTH" "oauth_config.json"
write_standard_creds source-pipedrive "$PIPEDRIVE_INTEGRATION_TESTS_CREDS_OLD" "old_config.json"
write_standard_creds source-quickbooks-singer "$QUICKBOOKS_TEST_CREDS"
write_standard_creds source-recharge "$RECHARGE_INTEGRATION_TEST_CREDS"
write_standard_creds source-recurly "$SOURCE_RECURLY_INTEGRATION_TEST_CREDS"
Expand Down

0 comments on commit 99b1e91

Please sign in to comment.