Skip to content

Commit

Permalink
✨ Source OpsGenie - Migrate Python CDK to Low Code CDK (#31552)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: sajarin <sajarindider@gmail.com>
  • Loading branch information
4 people committed Nov 10, 2023
1 parent 1ab67da commit 633cdd1
Show file tree
Hide file tree
Showing 14 changed files with 258 additions and 921 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-opsgenie/Dockerfile
Expand Up @@ -34,5 +34,5 @@ COPY source_opsgenie ./source_opsgenie
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-opsgenie
27 changes: 5 additions & 22 deletions airbyte-integrations/connectors/source-opsgenie/README.md
@@ -1,34 +1,17 @@
# Opsgenie Source

This is the repository for the Opsgenie source connector, written in Python.
This is the repository for the Opsgenie source connector, written in low-code configuration based source connector.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/opsgenie).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**
#### Building via Gradle
You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow.

#### Minimum Python version required `= 3.9.0`

#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
To build using Gradle, from the Airbyte repository root, run:
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
./gradlew :airbyte-integrations:connectors:source-opsgenie:build
```
source .venv/bin/activate
pip install -r requirements.txt
pip install '.[tests]'
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/opsgenie)
Expand Down
Expand Up @@ -14,21 +14,11 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams:
[
"alerts",
"alert_recipients",
"alert_logs",
"incidents",
"integrations",
"teams",
"user_teams",
"services",
]
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
empty_streams: ["incidents", "services"]
# incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-opsgenie/metadata.yaml
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 06bdb480-2598-40b8-8b0f-fc2e2d2abdda
dockerImageTag: 0.2.0
dockerImageTag: 0.3.0
dockerRepository: airbyte/source-opsgenie
githubIssueLabel: source-opsgenie
license: MIT
Expand All @@ -15,7 +15,7 @@ data:
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/sources/opsgenie
tags:
- language:python
- language:lowcode
ab_internal:
sl: 100
ql: 100
Expand Down
@@ -0,0 +1,232 @@
version: 0.51.16
type: DeclarativeSource
definitions:
selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path:
- data
auth:
type: ApiKeyAuthenticator
api_token: "GenieKey {{ config['api_token'] }}"
inject_into:
type: RequestOption
field_name: Authorization
inject_into: header
on_error:
type: CompositeErrorHandler
error_handlers:
- type: DefaultErrorHandler
backoff_strategies:
- type: WaitUntilTimeFromHeader
header: X-RateLimit-Period-In-Sec
pagination:
type: DefaultPaginator
page_token_option:
type: RequestPath
pagination_strategy:
type: CursorPagination
cursor_value: '{{ response.get("paging", {}).get("next", {}) }}'
stop_condition: '{{ not response.get("paging", {}).get("next", {}) }}'

requester:
type: HttpRequester
url_base: "https://{{ config['endpoint'] }}"
http_method: GET
request_parameters: {}
request_headers:
Accept: application/json
authenticator:
$ref: "#/definitions/auth"
error_handler:
$ref: "#/definitions/on_error"
request_body_json: {}

retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/requester"
record_selector:
$ref: "#/definitions/selector"
paginator:
$ref: "#/definitions/pagination"

base_stream:
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_opsgenie/schemas/{{ parameters['name'] }}.json"
retriever:
$ref: "#/definitions/retriever"

users_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "users"
primary_key: "id"
path: "v2/users"

teams_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "teams"
primary_key: "id"
path: "v2/teams"

services_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "services"
primary_key: "id"
path: "v1/services"

integrations_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "integrations"
primary_key: "id"
path: "v2/integrations"

incidents_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "incidents"
primary_key: "id"
path: "v1/incidents"

alerts_stream:
$ref: "#/definitions/base_stream"
retriever:
$ref: "#/definitions/base_stream/retriever"
requester:
$ref: "#/definitions/requester"
request_parameters:
order: "asc"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: updatedAt
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S.%fZ"
datetime_format: "%s"
start_datetime:
datetime: "{{ config['start_date'] }}"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
$parameters:
name: "alerts"
primary_key: "id"
path: "v2/alerts"

user_teams_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: user_teams
primary_key: "id"
retriever:
$ref: "#/definitions/base_stream/retriever"
requester:
$ref: "#/definitions/requester"
path: "v2/users/{{ stream_partition['user_id'] }}/teams"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: user_id
stream:
$ref: "#/definitions/users_stream"
transformations:
- type: AddFields
fields:
- type: AddedFieldDefinition
path: ["user_id"]
value: "{{ stream_partition['id'] }}"

alert_recipients_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "alert_recipients"
primary_key: "user_id"
path: "v2/alerts/{{ record }}/recipients"
retriever:
$ref: "#/definitions/base_stream/retriever"
requester:
$ref: "#/definitions/requester"
path: "v2/alerts/{{ stream_partition['alert_id'] }}/recipients"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: alert_id
stream:
$ref: "#/definitions/alerts_stream"
transformations:
- type: AddFields
fields:
- type: AddedFieldDefinition
path: ["alert_id"]
value: "{{ stream_partition['id'] }}"
- type: AddFields
fields:
- type: AddedFieldDefinition
path: ["user_id"]
value: "{{ record['user']['id'] }}"
- type: AddFields
fields:
- type: AddedFieldDefinition
path: ["user_username"]
value: "{{ record['user']['username'] }}"
- type: RemoveFields
field_pointers:
- ["user"]

alert_logs_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "alert_logs"
primary_key: "offset"
path: "v2/alerts/{{ stream_partition.alert_id }}/logs"
retriever:
$ref: "#/definitions/base_stream/retriever"
requester:
$ref: "#/definitions/requester"
request_parameters:
order: asc
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: alert_id
stream:
$ref: "#/definitions/alerts_stream"
transformations:
- type: AddFields
fields:
- type: AddedFieldDefinition
path: ["alert_id"]
value: "{{ stream_partition['id'] }}"

check:
type: CheckStream
stream_names:
- users
# - teams
# - services
# - incidents
# - integrations
# - alerts
# - user_teams
# - alert_recipients
# - alert_logs

streams:
- "#/definitions/users_stream"
- "#/definitions/teams_stream"
- "#/definitions/services_stream"
- "#/definitions/integrations_stream"
- "#/definitions/incidents_stream"
- "#/definitions/alerts_stream"
- "#/definitions/user_teams_stream"
- "#/definitions/alert_recipients_stream"
- "#/definitions/alert_logs_stream"
Expand Up @@ -2,54 +2,16 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

from typing import Any, List, Mapping, Tuple
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
WARNING: Do not modify this file.
"""

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .streams import AlertLogs, AlertRecipients, Alerts, Incidents, Integrations, Services, Teams, Users, UserTeams


# Source
class SourceOpsgenie(AbstractSource):
@staticmethod
def get_authenticator(config: Mapping[str, Any]):
return TokenAuthenticator(config["api_token"], auth_method="GenieKey")

def check_connection(self, logger, config) -> Tuple[bool, any]:

try:
auth = self.get_authenticator(config)
api_endpoint = f"https://{config['endpoint']}/v2/account"

response = requests.get(
api_endpoint,
headers=auth.get_auth_header(),
)

return response.status_code == requests.codes.ok, None

except Exception as error:
return False, f"Unable to connect to Opsgenie API with the provided credentials - {repr(error)}"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = self.get_authenticator(config)
args = {"authenticator": auth, "endpoint": config["endpoint"]}
incremental_args = {**args, "start_date": config.get("start_date", "")}

users = Users(**args)
alerts = Alerts(**incremental_args)
return [
alerts,
AlertRecipients(parent_stream=alerts, **args),
AlertLogs(parent_stream=alerts, **args),
Incidents(**incremental_args),
Integrations(**args),
Services(**args),
Teams(**args),
users,
UserTeams(parent_stream=users, **args),
]
# Declarative Source
class SourceOpsgenie(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "manifest.yaml"})
@@ -1,4 +1,4 @@
documentationUrl: https://docsurl.com
documentationUrl: https://docs.airbyte.com/integrations/sources/opsgenie
connectionSpecification:
$schema: http://json-schema.org/draft-07/schema#
title: Opsgenie Spec
Expand Down

0 comments on commit 633cdd1

Please sign in to comment.