Skip to content

Commit

Permalink
dont keep records in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed May 10, 2024
1 parent 6f89fba commit 0aa79ec
Showing 1 changed file with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,34 @@
from io import StringIO

import requests
from typing import Iterable
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState


@dataclass
class XJsonRecordExtractor(DpathExtractor):
def extract_records(self, response: requests.Response) -> list[Record]:
return [json.loads(record) for record in response.iter_lines()]
for record in response.iter_lines():
yield json.loads(record)


@dataclass
class ListUsersRecordExtractor(DpathExtractor):
def extract_records(self, response: requests.Response) -> list[Record]:
return [{"email": record.decode()} for record in response.iter_lines()]
def extract_records(self, response: requests.Response) -> Iterable[Record]:
for record in response.iter_lines():
yield {"email": record.decode()}


@dataclass
class EventsRecordExtractor(DpathExtractor):
common_fields = ("itblInternal", "_type", "createdAt", "email")

def extract_records(self, response: requests.Response) -> list[Record]:
def extract_records(self, response: requests.Response) -> Iterable[Record]:
jsonl_records = StringIO(response.text)
records = []
for record in jsonl_records:
record_dict = json.loads(record)
record_dict_common_fields = {}
for field in self.common_fields:
record_dict_common_fields[field] = record_dict.pop(field, None)

records.append({**record_dict_common_fields, "data": record_dict})

return records
yield {**record_dict_common_fields, "data": record_dict}

0 comments on commit 0aa79ec

Please sign in to comment.