Skip to content

Commit

Permalink
Source File: Support JsonL format (#2118)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada committed Feb 18, 2021
1 parent fd01651 commit d426781
Show file tree
Hide file tree
Showing 9 changed files with 6,537 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "778daa7c-feaf-4db6-96f3-70fd645acc77",
"name": "File",
"dockerRepository": "airbyte/source-file",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-file"
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
- sourceDefinitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
name: File
dockerRepository: airbyte/source-file
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://hub.docker.com/r/airbyte/source-file
- sourceDefinitionId: fdc8b827-3257-4b33-83cc-106d234c34d4
name: Google Adwords
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install ".[main]"

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-file
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ def check_read(config, expected_columns=10, expected_rows=42):


@pytest.mark.parametrize(
"file_format, extension, expected_columns, expected_rows",
"file_format, extension, expected_columns, expected_rows, filename",
[
("csv", "csv", 8, 5000),
("json", "json", 2, 1),
("excel", "xls", 8, 50),
("excel", "xlsx", 8, 50),
("feather", "feather", 9, 3),
("parquet", "parquet", 9, 3),
("csv", "csv", 8, 5000, "demo"),
("json", "json", 2, 1, "demo"),
("jsonl", "jsonl", 2, 10, "jsonl_nested"),
("jsonl", "jsonl", 2, 6492, "jsonl"),
("excel", "xls", 8, 50, "demo"),
("excel", "xlsx", 8, 50, "demo"),
("feather", "feather", 9, 3, "demo"),
("parquet", "parquet", 9, 3, "demo"),
],
)
def test_local_file_read(file_format, extension, expected_columns, expected_rows):
def test_local_file_read(file_format, extension, expected_columns, expected_rows, filename):
file_directory = SAMPLE_DIRECTORY.joinpath(file_format)
file_path = str(file_directory.joinpath(f"demo.{extension}"))
file_path = str(file_directory.joinpath(f"{filename}.{extension}"))
configs = {"dataset_name": "test", "format": file_format, "url": file_path, "provider": {"storage": "local"}}
check_read(configs, expected_columns, expected_rows)

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"article_id_a":"art14980","recommendations": {"listing":"art10316"}}
{"article_id_a":"art149801","recommendations": {"listing":"art1031126"}}
{"article_id_a":"art149802","recommendations": {"listing":"art103126"}}
{"article_id_a":"art149803","recommendations": {"listing":"art10316"}}
{"article_id_a":"art149804","recommendations": {"listing":"art10316"}}
{"article_id_a":"art149805","recommendations": {"listing":"art10316"}}
{"article_id_a":"art149806","recommendations": {"listing":"art10316"}}
{"article_id_a":"art1498078","recommendations": {"listing":"art10316"}}
{"article_id_a":"art149809","recommendations": {"listing":"art10316"}}
{"article_id_a":"art14980000","recommendations": {"listing":"art10316"}}
30 changes: 20 additions & 10 deletions airbyte-integrations/connectors/source-file/source_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,31 @@ def stream_name(self) -> str:
return self._dataset_name
return f"file_{self._provider['storage']}.{self._reader_format}"

@staticmethod
def load_nested_json_schema(fp) -> dict:
def load_nested_json_schema(self, fp) -> dict:
# Use Genson Library to take JSON objects and generate schemas that describe them,
builder = SchemaBuilder()
builder.add_object(json.load(fp))
if self._reader_format == "jsonl":
for o in self.read():
builder.add_object(o)
else:
builder.add_object(json.load(fp))

result = builder.to_schema()
if "items" in result and "properties" in result["items"]:
result = result["items"]["properties"]
return result

@staticmethod
def load_nested_json(fp) -> list:
result = json.load(fp)
if not isinstance(result, list):
result = [result]
def load_nested_json(self, fp) -> list:
if self._reader_format == "jsonl":
result = []
line = fp.readline()
while line:
result.append(json.loads(line))
line = fp.readline()
else:
result = json.load(fp)
if not isinstance(result, list):
result = [result]
return result

def load_dataframes(self, fp, skip_data=False) -> List:
Expand Down Expand Up @@ -309,7 +319,7 @@ def binary_source(self):
def read(self, fields: Iterable = None) -> Iterable[dict]:
"""Read data from the stream"""
with self.reader.open(binary=self.binary_source) as fp:
if self._reader_format == "json":
if self._reader_format == "json" or self._reader_format == "jsonl":
yield from self.load_nested_json(fp)
else:
for df in self.load_dataframes(fp):
Expand All @@ -319,7 +329,7 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:

def _stream_properties(self):
with self.reader.open(binary=self.binary_source) as fp:
if self._reader_format == "json":
if self._reader_format == "json" or self._reader_format == "jsonl":
return self.load_nested_json_schema(fp)

df_list = self.load_dataframes(fp, skip_data=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"format": {
"type": "string",
"enum": ["csv", "json", "excel", "feather", "parquet"],
"enum": ["csv", "json", "jsonl", "excel", "feather", "parquet"],
"default": "csv",
"description": "File Format of the file to be replicated (Warning: some format may be experimental, please refer to docs)."
},
Expand Down

0 comments on commit d426781

Please sign in to comment.