Skip to content

Commit

Permalink
🎉 Source Slack migration to low code (#35477)
Browse files Browse the repository at this point in the history
Co-authored-by: darynaishchenko <darina.ishchenko17@gmail.com>
Co-authored-by: Daryna Ishchenko <80129833+darynaishchenko@users.noreply.github.com>
Co-authored-by: Augustin <augustin@airbyte.io>
  • Loading branch information
4 people committed Apr 15, 2024
1 parent 0c49832 commit 714eea3
Show file tree
Hide file tree
Showing 26 changed files with 1,311 additions and 542 deletions.
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-slack/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[run]
omit =
source_slack/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ acceptance_tests:
- spec_path: "source_slack/spec.json"
backward_compatibility_tests_config:
# edited `min`/`max` > `minimum`/`maximum` for `lookback_window` field
disable_for_version: "0.1.26"
#disable_for_version: "0.1.26"
# slight changes: removed doc url, added new null oauth param
disable_for_version: "0.3.10"
connection:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,49 @@
{
"type": "STREAM",
"stream": {
"stream_state": { "float_ts": 7270247822 },
"stream_descriptor": { "name": "channel_messages" }
"stream_descriptor": {
"name": "channel_messages"
},
"stream_state": {
"states": [
{
"partition": {
"channel_id": "C04LTCM2Y56",
"parent_slice": {}
},
"cursor": {
"float_ts": "2534945416"
}
},
{
"partition": {
"channel": "C04KX3KEZ54",
"parent_slice": {}
},
"cursor": {
"float_ts": "2534945416"
}
},
{
"partition": {
"channel": "C04L3M4PTJ6",
"parent_slice": {}
},
"cursor": {
"float_ts": "2534945416"
}
},
{
"partition": {
"channel": "C04LTCM2Y56",
"parent_slice": {}
},
"cursor": {
"float_ts": "2534945416"
}
}
]
}
}
}
]

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions airbyte-integrations/connectors/source-slack/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
dockerImageTag: 0.4.1
dockerImageTag: 1.0.0
dockerRepository: airbyte/source-slack
documentationUrl: https://docs.airbyte.com/integrations/sources/slack
githubIssueLabel: source-slack
Expand All @@ -28,6 +28,19 @@ data:
oss:
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
message:
The source Slack connector is being migrated from the Python CDK to our declarative low-code CDK.
Due to changes in the handling of state format for incremental substreams, this migration constitutes a breaking change for the channel_messages stream.
Users will need to reset source configuration, refresh the source schema and reset the channel_messages stream after upgrading.
For more information, see our migration documentation for source Slack.
upgradeDeadline: "2024-04-29"
scopedImpact:
- scopeType: stream
impactedScopes:
- "channel_messages"
suggestedStreams:
streams:
- users
Expand All @@ -38,5 +51,5 @@ data:
supportLevel: certified
tags:
- language:python
- cdk:python
- cdk:low-code
metadataSpecVersion: "1.0"
138 changes: 76 additions & 62 deletions airbyte-integrations/connectors/source-slack/poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-slack/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.4.1"
version = "1.0.0"
name = "source-slack"
description = "Source implementation for Slack."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -19,6 +19,7 @@ include = "source_slack"
python = "^3.9,<3.12"
pendulum = "==2.1.2"
airbyte-cdk = "^0"
freezegun = "^1.4.0"

[tool.poetry.scripts]
source-slack = "source_slack.run:run"
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from typing import List

import requests
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
from airbyte_cdk.sources.declarative.types import Record


@dataclass
class ChannelMembersExtractor(DpathExtractor):
"""
Transform response from list of strings to list dicts:
from: ['aa', 'bb']
to: [{'member_id': 'aa'}, {{'member_id': 'bb'}]
"""

def extract_records(self, response: requests.Response) -> List[Record]:
records = super().extract_records(response)
return [{"member_id": record} for record in records]
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import logging
from functools import partial
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator

LOGGER = logging.getLogger("airbyte_logger")


class JoinChannelsStream(HttpStream):
"""
This class is a special stream which joins channels because the Slack API only returns messages from channels this bot is in.
Its responses should only be logged for debugging reasons, not read as records.
"""

url_base = "https://slack.com/api/"
http_method = "POST"
primary_key = "id"

def __init__(self, channel_filter: List[str] = None, **kwargs):
self.channel_filter = channel_filter or []
super().__init__(**kwargs)

def path(self, **kwargs) -> str:
return "conversations.join"

def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable:
"""
Override to simply indicate that the specific channel was joined successfully.
This method should not return any data, but should return an empty iterable.
"""
is_ok = response.json().get("ok", False)
if is_ok:
self.logger.info(f"Successfully joined channel: {stream_slice['channel_name']}")
else:
self.logger.info(f"Unable to joined channel: {stream_slice['channel_name']}. Reason: {response.json()}")
return []

def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[Mapping]:
if stream_slice:
return {"channel": stream_slice.get("channel")}

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
The pagination is not applicable to this Service Stream.
"""
return None


class ChannelsRetriever(SimpleRetriever):
def __post_init__(self, parameters: Mapping[str, Any]):
super().__post_init__(parameters)
self.stream_slicer = SinglePartitionRouter(parameters={})
self.record_selector.transformations = []

def should_join_to_channel(self, config: Mapping[str, Any], record: Record) -> bool:
"""
The `is_member` property indicates whether the API Bot is already assigned / joined to the channel.
https://api.slack.com/types/conversation#booleans
"""
return config["join_channels"] and not record.get("is_member")

def make_join_channel_slice(self, channel: Mapping[str, Any]) -> Mapping[str, Any]:
channel_id: str = channel.get("id")
channel_name: str = channel.get("name")
LOGGER.info(f"Joining Slack Channel: `{channel_name}`")
return {"channel": channel_id, "channel_name": channel_name}

def join_channels_stream(self, config) -> JoinChannelsStream:
token = config["credentials"].get("api_token") or config["credentials"].get("access_token")
authenticator = TokenAuthenticator(token)
channel_filter = config["channel_filter"]
return JoinChannelsStream(authenticator=authenticator, channel_filter=channel_filter)

def join_channel(self, config: Mapping[str, Any], record: Mapping[str, Any]):
list(
self.join_channels_stream(config).read_records(
sync_mode=SyncMode.full_refresh,
stream_slice=self.make_join_channel_slice(record),
)
)

def read_records(
self,
records_schema: Mapping[str, Any],
stream_slice: Optional[StreamSlice] = None,
) -> Iterable[StreamData]:
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check

self._paginator.reset()

most_recent_record_from_slice = None
record_generator = partial(
self._parse_records,
stream_state=self.state or {},
stream_slice=_slice,
records_schema=records_schema,
)

for stream_data in self._read_pages(record_generator, self.state, _slice):
# joining channel logic
if self.should_join_to_channel(self.config, stream_data):
self.join_channel(self.config, stream_data)

current_record = self._extract_record(stream_data, _slice)
if self.cursor and current_record:
self.cursor.observe(_slice, current_record)

most_recent_record_from_slice = self._get_most_recent_record(most_recent_record_from_slice, current_record, _slice)
yield stream_data

if self.cursor:
self.cursor.observe(_slice, most_recent_record_from_slice)
return
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import logging
from typing import Any, List, Mapping

from airbyte_cdk import AirbyteEntrypoint
from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from source_slack import SourceSlack

logger = logging.getLogger("airbyte_logger")


class MigrateLegacyConfig:
message_repository: MessageRepository = InMemoryMessageRepository()

@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
legacy config:
{
"start_date": "2021-07-22T20:00:00Z",
"end_date": "2021-07-23T20:00:00Z",
"lookback_window": 1,
"join_channels": True,
"channel_filter": ["airbyte-for-beginners", "good-reads"],
"api_token": "api-token"
}
api token should be in the credentials object
"""
if config.get("api_token") and not config.get("credentials"):
return True
return False

@classmethod
def _move_token_to_credentials(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
api_token = config["api_token"]
config.update({"credentials": {"api_token": api_token, "option_title": "API Token Credentials"}})
config.pop("api_token")
return config

@classmethod
def _modify_and_save(cls, config_path: str, source: SourceSlack, config: Mapping[str, Any]) -> Mapping[str, Any]:
migrated_config = cls._move_token_to_credentials(config)
# save the config
source.write_config(migrated_config, config_path)
return migrated_config

@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository._message_queue:
print(message.json(exclude_unset=True))

@classmethod
def migrate(cls, args: List[str], source: SourceSlack) -> None:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)
Loading

0 comments on commit 714eea3

Please sign in to comment.