Skip to content

Commit

Permalink
removed stream slices
Browse files Browse the repository at this point in the history
  • Loading branch information
darynaishchenko committed Jun 23, 2023
1 parent fdca322 commit cc6ef09
Showing 1 changed file with 1 addition and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
Expand Down Expand Up @@ -61,7 +59,7 @@ def request_params(
if next_page_token:
params.update(next_page_token)
else:
params.update({"updated_at_min": (stream_slice or {}).get("start_date", self._start_date)})
params.update({"updated_at_min": (stream_state or {}).get("updated_at", self._start_date)})

return params

Expand All @@ -86,18 +84,6 @@ def should_retry(self, response: requests.Response) -> bool:

return super().should_retry(response)

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
start_date = (stream_state or {}).get(self.cursor_field, self._start_date) if self.cursor_field else self._start_date

now = pendulum.now()

start_date = pendulum.parse(start_date)

while start_date <= now:
yield {"start_date": start_date.strftime("%Y-%m-%d %H:%M:%S")}


class IncrementalRechargeStream(RechargeStream, ABC):
cursor_field = "updated_at"
Expand Down

0 comments on commit cc6ef09

Please sign in to comment.