Skip to content

Commit

Permalink
🚨🚨 Source JIRA: Save state for Boards Issues per board (#33715)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem1205 authored Jan 10, 2024
1 parent c084212 commit 1ac5029
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@
"name": "board_issues"
},
"stream_state": {
"updated": "2122-01-01T00:00:00Z"
"1": {
"updated": "2122-01-01T00:00:00Z"
},
"17": {
"updated": "2122-01-01T00:00:00Z"
},
"58": {
"updated": "2122-01-01T00:00:00Z"
}
}
}
},
Expand Down
10 changes: 9 additions & 1 deletion airbyte-integrations/connectors/source-jira/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 68e63de2-bb83-4c7e-93fa-a8a9051e3993
dockerImageTag: 0.14.1
dockerImageTag: 1.0.0
dockerRepository: airbyte/source-jira
documentationUrl: https://docs.airbyte.com/integrations/sources/jira
githubIssueLabel: source-jira
Expand All @@ -24,6 +24,14 @@ data:
oss:
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
message: "Stream state will be saved for every board in stream `Board Issues`. Customers who use stream `Board Issues` in Incremental Sync mode must take action with their connections."
upgradeDeadline: "2024-01-25"
scopedImpact:
- scopeType: stream
impactedScopes: ["board_issues"]
suggestedStreams:
streams:
- issues
Expand Down
59 changes: 47 additions & 12 deletions airbyte-integrations/connectors/source-jira/source_jira/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,19 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
return record


class BoardIssues(IncrementalJiraStream):
class BoardIssues(StartDateJiraStream):
"""
https://developer.atlassian.com/cloud/jira/software/rest/api-group-board/#api-rest-agile-1-0-board-boardid-issue-get
"""

cursor_field = "updated"
extract_field = "issues"
api_v1 = True
state_checkpoint_interval = 50 # default page size is 50

def __init__(self, **kwargs):
    """Initialise the stream and the helpers it needs for per-board syncing.

    All keyword arguments are forwarded unchanged to the parent constructor.
    """
    super().__init__(**kwargs)
    # Filled by get_starting_point: board id (as str) -> {cursor_field: starting point}.
    self._starting_point_cache = {}
    # Parent stream whose records are read to enumerate the boards.
    boards = Boards(authenticator=self.authenticator, domain=self._domain, projects=self._projects)
    self.boards_stream = boards

def path(self, stream_slice: Mapping[str, Any], **kwargs) -> str:
Expand All @@ -270,11 +272,17 @@ def request_params(
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
params["fields"] = ["key", "created", "updated"]
jql = self.jql_compare_date(stream_state)
jql = self.jql_compare_date(stream_state, stream_slice)
if jql:
params["jql"] = jql
return params

def jql_compare_date(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> Optional[str]:
    """Build the JQL date filter for the board described by ``stream_slice``.

    Returns a clause of the form ``<cursor_field> >= 'YYYY/MM/DD HH:MM'`` based on
    the value of ``get_starting_point``, or ``None`` when there is no starting point.
    """
    starting_point = self.get_starting_point(stream_state, stream_slice)
    if not starting_point:
        # No lower bound available: the caller then sends no "jql" parameter.
        return None
    formatted = starting_point.strftime("%Y/%m/%d %H:%M")
    return f"{self.cursor_field} >= '{formatted}'"

def _is_board_error(self, response):
"""Check if board has error and should be skipped"""
if response.status_code == 500:
Expand All @@ -288,17 +296,44 @@ def should_retry(self, response: requests.Response) -> bool:
# for all other HTTP errors the default handling is applied
return super().should_retry(response)

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
    """Yield one slice per record obtained by a full-refresh read of the boards stream.

    The keyword arguments are accepted for interface compatibility and are not used.
    """
    for board in read_full_refresh(self.boards_stream):
        yield board

def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
for board in read_full_refresh(self.boards_stream):
try:
yield from super().read_records(stream_slice={"board_id": board["id"]}, **kwargs)
except HTTPError as e:
if self._is_board_error(e.response):
# Wrong board is skipped
self.logger.warning(f"Board {board['id']} has no columns with a mapped status. Skipping.")
continue
else:
raise
try:
yield from super().read_records(stream_slice={"board_id": stream_slice["id"]}, **kwargs)
except HTTPError as e:
if self._is_board_error(e.response):
# Wrong board is skipped
self.logger.warning(f"Board {stream_slice['id']} has no columns with a mapped status. Skipping.")
else:
raise

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
    """Advance the saved cursor of the board that ``latest_record`` belongs to.

    State is nested per board: ``{"<board id>": {cursor_field: value}}``. The stored
    value becomes the larger of the record's cursor and the previously saved one.
    ``current_stream_state`` is updated in place and the same mapping is returned.
    """
    cursor = self.cursor_field
    record_cursor = latest_record[cursor]
    board_key = str(latest_record["boardId"])  # state keys are strings
    board_state = current_stream_state.setdefault(board_key, {})
    saved_cursor = board_state.get(cursor)
    # Never move the cursor backwards when an older record arrives.
    board_state[cursor] = max(record_cursor, saved_cursor) if saved_cursor else record_cursor
    return current_stream_state

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> Optional[pendulum.DateTime]:
    """Return the cached starting point for the board in ``stream_slice``.

    The value is computed once per board by ``_get_starting_point`` and then served
    from ``self._starting_point_cache`` (board id as str -> {cursor_field: value}).

    The cache lookup must be made against the board's own entry. Checking the
    cursor field against the top-level cache (whose keys are board ids) never
    matches, which made every call recompute the value. Because
    ``get_updated_state`` mutates the state mapping in place, a recomputed value
    can be later than the one the board's sync started with.
    NOTE(review): confirm how the framework passes state between page requests.

    :param stream_state: per-board stream state as saved by ``get_updated_state``.
    :param stream_slice: slice carrying the ``board_id`` being read.
    :return: the starting point for the board, or ``None`` when there is none.
    """
    board_id = str(stream_slice["board_id"])
    board_cache = self._starting_point_cache.setdefault(board_id, {})
    if self.cursor_field not in board_cache:
        board_cache[self.cursor_field] = self._get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
    return board_cache[self.cursor_field]

def _get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> Optional[pendulum.DateTime]:
    """Compute the starting point for the board in ``stream_slice`` without caching.

    When the state holds a cursor value for this board, that value is parsed, moved
    back by ``self._lookback_window_minutes`` and combined with ``self._start_date``
    through ``safe_max``. In every other case ``self._start_date`` is returned.
    """
    if not stream_state:
        return self._start_date
    board_key = str(stream_slice["board_id"])  # state is keyed by the board id as a string
    saved_cursor = stream_state.get(board_key, {}).get(self.cursor_field)
    if not saved_cursor:
        # State exists, but holds nothing for this board.
        return self._start_date
    rewound = pendulum.parse(saved_cursor) - self._lookback_window_minutes
    return safe_max(rewound, self._start_date)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
record["boardId"] = stream_slice["board_id"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def test_board_issues_stream(config, mock_board_response, board_issues_response)
authenticator = SourceJira().get_authenticator(config=config)
args = {"authenticator": authenticator, "domain": config["domain"], "projects": config.get("projects", [])}
stream = BoardIssues(**args)
records = [r for r in stream.read_records(sync_mode=SyncMode.incremental)]
records = list(read_full_refresh(stream))
assert len(records) == 1
assert len(responses.calls) == 4

Expand All @@ -391,10 +391,10 @@ def test_stream_updated_state(config):
args = {"authenticator": authenticator, "domain": config["domain"], "projects": config.get("projects", [])}
stream = BoardIssues(**args)

current_stream_state = {"updated": "09.11.2023"}
latest_record = {"updated": "10.11.2023"}
current_stream_state = {"22": {"updated": "2023-10-01T00:00:00Z"}}
latest_record = {"boardId": 22, "updated": "2023-09-01T00:00:00Z"}

assert {"updated": "10.11.2023"} == stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record)
assert {"22": {"updated": "2023-10-01T00:00:00Z"}} == stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record)


@responses.activate
Expand Down
27 changes: 27 additions & 0 deletions docs/integrations/sources/jira-migrations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Jira Migration Guide

## Upgrading to 1.0.0

Note: this change is only breaking if you are using the `Board Issues` stream in Incremental Sync mode.

This is a breaking change because the stream state for `Board Issues` is now saved separately for each board, so state saved by earlier versions no longer matches the new format. Please follow the instructions below to migrate to version 1.0.0:

1. Select **Connections** in the main navbar.
1.1 Select the connection(s) affected by the update.
2. Select the **Replication** tab.
2.1 Select **Refresh source schema**.
```note
Any detected schema changes will be listed for your review.
```
2.2 Select **OK**.
3. Select **Save changes** at the bottom of the page.
3.1 Ensure the **Reset affected streams** option is checked.
```note
Depending on the destination type, you may not be prompted to reset your data.
```
4. Select **Save connection**.
```note
This will reset the data in your destination and initiate a fresh sync.
```

For more information on resetting your data in Airbyte, see [this page](https://docs.airbyte.com/operator-guides/reset).
1 change: 1 addition & 0 deletions docs/integrations/sources/jira.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ The Jira connector should not run into Jira API limitations under normal usage.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------|
| 1.0.0 | 2024-01-01 | [33682](https://github.com/airbytehq/airbyte/pull/33682) | Save state for stream `Board Issues` per `board` |
| 0.14.1 | 2023-12-19 | [33625](https://github.com/airbytehq/airbyte/pull/33625) | Skip 404 error |
| 0.14.0 | 2023-12-15 | [33532](https://github.com/airbytehq/airbyte/pull/33532) | Add lookback window |
| 0.13.0 | 2023-12-12 | [33353](https://github.com/airbytehq/airbyte/pull/33353) | Fix check command to check access for all available streams |
Expand Down

0 comments on commit 1ac5029

Please sign in to comment.