Skip to content

Commit

Permalink
added stream_slices
Browse files Browse the repository at this point in the history
  • Loading branch information
darynaishchenko committed Jun 26, 2023
1 parent cc6ef09 commit 89e3baa
Showing 1 changed file with 15 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
Expand Down Expand Up @@ -84,6 +85,20 @@ def should_retry(self, response: requests.Response) -> bool:

return super().should_retry(response)

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
start_date = (stream_state or {}).get(self.cursor_field, self._start_date) if self.cursor_field else self._start_date

now = pendulum.now()

start_date = pendulum.parse(start_date)

while start_date <= now:
end_date = start_date.add(months=self.period_in_months)
yield {"start_date": start_date.strftime("%Y-%m-%d %H:%M:%S"), "end_date": end_date.strftime("%Y-%m-%d %H:%M:%S")}
start_date = end_date


class IncrementalRechargeStream(RechargeStream, ABC):
cursor_field = "updated_at"
Expand Down

0 comments on commit 89e3baa

Please sign in to comment.