S3 and Azure Blob Storage: Update File CDK to support document file types #31904

Merged: 13 commits, Oct 31, 2023
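This PR teaches the File CDK-based S3 and Azure Blob Storage sources to parse document file types: the experimental "Document File Type Format" extracts text from .pdf, .docx, .md, and .pptx files and emits one record per file. Below is a minimal sketch of a stream config that opts into the new format; the bucket name, stream name, and glob are placeholders, and only the `format` block comes from this PR's spec changes.

```python
# Hypothetical S3 source config exercising the new format option.
# "filetype" is a const in the spec, so "unstructured" is the only valid value.
config = {
    "bucket": "my-documents-bucket",  # placeholder, not from this PR
    "streams": [
        {
            "name": "documents",      # placeholder stream name
            "globs": ["**/*.pdf"],    # placeholder glob for which files to pick up
            "format": {"filetype": "unstructured"},
        }
    ],
}
```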
@@ -53,6 +53,11 @@ acceptance_tests:
         expect_records:
           path: integration_tests/expected_records/jsonl_newlines.jsonl
           exact_order: true
+      - config_path: secrets/unstructured_config.json
+        expect_records:
+          path: integration_tests/expected_records/unstructured.jsonl
+          exact_order: true
+        timeout_seconds: 1800
   connection:
     tests:
       - config_path: secrets/config.json
@@ -0,0 +1,2 @@
+{"stream": "airbyte-source-azure-blob-storage-test", "data": {"content": "# Heading\n\nThis is the content which is not just a single word", "document_key": "Testdoc.pdf", "_ab_source_file_last_modified": "2023-10-30T11:38:48.000000Z", "_ab_source_file_url": "Testdoc.pdf"}, "emitted_at": 1698666216334}
+{"stream": "airbyte-source-azure-blob-storage-test", "data": {"content": "This is a test", "document_key": "Testdoc_OCR.pdf", "_ab_source_file_last_modified": "2023-10-30T11:38:48.000000Z", "_ab_source_file_url": "Testdoc_OCR.pdf"}, "emitted_at": 1698666218048}
@@ -268,6 +268,19 @@
             "type": "boolean"
           }
         }
+      },
+      {
+        "title": "Document File Type Format (Experimental)",
+        "type": "object",
+        "properties": {
+          "filetype": {
+            "title": "Filetype",
+            "default": "unstructured",
+            "const": "unstructured",
+            "type": "string"
+          }
+        },
+        "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file."
       }
     ]
   },
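This generated JSON-schema block corresponds to a pydantic model with a `const` filetype field; the connector-level version of that model (`UnstructuredFormat`, deleted from `source_s3/v4/config.py` further down) moves into the CDK in this release. A sketch of the model with the updated title and description:

```python
from pydantic import BaseModel, Field


# Sketch of the format model behind the schema block above, mirroring the
# UnstructuredFormat class removed from the connector later in this diff.
class UnstructuredFormat(BaseModel):
    class Config:
        title = "Document File Type Format (Experimental)"
        schema_extra = {
            "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file."
        }

    # const=True renders as the "const": "unstructured" discriminator above
    filetype: str = Field("unstructured", const=True)
```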
@@ -3,11 +3,11 @@ data:
     ql: 100
     sl: 100
   connectorBuildOptions:
-    baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
+    baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
   connectorSubtype: file
   connectorType: source
   definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
-  dockerImageTag: 0.2.1
+  dockerImageTag: 0.2.2
   dockerRepository: airbyte/source-azure-blob-storage
   documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
   githubIssueLabel: source-azure-blob-storage
@@ -5,7 +5,11 @@
 
 from setuptools import find_packages, setup
 
-MAIN_REQUIREMENTS = ["airbyte-cdk>=0.51.17", "smart_open[azure]", "pytz", "fastavro==1.4.11", "pyarrow"]
+MAIN_REQUIREMENTS = [
+    "airbyte-cdk[file-based]>=0.52.7",
+    "smart_open[azure]",
+    "pytz",
+]
 
 TEST_REQUIREMENTS = ["requests-mock~=1.9.3", "pytest-mock~=3.6.1", "pytest~=6.2"]
 

This file was deleted.

@@ -21,7 +21,7 @@
       "order": 10,
       "type": "array",
       "items": {
-        "title": "S3FileBasedStreamConfig",
+        "title": "FileBasedStreamConfig",
         "type": "object",
         "properties": {
           "name": {
@@ -270,7 +270,7 @@
           }
         },
         {
-          "title": "Markdown/PDF/Docx Format (Experimental)",
+          "title": "Document File Type Format (Experimental)",
           "type": "object",
           "properties": {
             "filetype": {
@@ -280,7 +280,7 @@
               "type": "string"
             }
           },
-          "description": "Extract text from document formats and emit as one record per file."
+          "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file."
         }
       ]
     },
7 changes: 1 addition & 6 deletions airbyte-integrations/connectors/source-s3/main.py
@@ -10,18 +10,13 @@
 
 from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
 from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type
-from airbyte_cdk.sources.file_based.file_types import default_parsers
 from source_s3.v4 import Config, Cursor, SourceS3, SourceS3StreamReader
-from source_s3.v4.config import UnstructuredFormat
-from source_s3.v4.unstructured_parser import UnstructuredParser
 
-parsers = {**default_parsers, UnstructuredFormat: UnstructuredParser()}
-
 
 def get_source(args: List[str]):
     catalog_path = AirbyteEntrypoint.extract_catalog(args)
     try:
-        return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor, parsers=parsers)
+        return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
     except Exception:
         print(
             AirbyteMessage(
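The `parsers` override goes away because, with `airbyte-cdk[file-based]>=0.52.7`, the document parser is part of the CDK's default parser mapping, so `SourceS3` no longer needs it injected. A quick sketch, assuming (this is CDK-internal) that the default registry now covers the unstructured format:

```python
from airbyte_cdk.sources.file_based.file_types import default_parsers

# Assumption: the CDK's own UnstructuredFormat config class is now a key in
# default_parsers, which made the connector-level
# {**default_parsers, UnstructuredFormat: UnstructuredParser()} merge redundant.
print({format_cls.__name__ for format_cls in default_parsers})
```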
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-s3/metadata.yaml
@@ -6,11 +6,11 @@ data:
     hosts:
       - "*.s3.amazonaws.com"
   connectorBuildOptions:
-    baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
+    baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
   connectorSubtype: file
   connectorType: source
   definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
-  dockerImageTag: 4.1.3
+  dockerImageTag: 4.1.4
   dockerRepository: airbyte/source-s3
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   githubIssueLabel: source-s3
12 changes: 1 addition & 11 deletions airbyte-integrations/connectors/source-s3/setup.py
@@ -6,21 +6,12 @@
 from setuptools import find_packages, setup
 
 MAIN_REQUIREMENTS = [
-    "airbyte-cdk>=0.52.0",
-    "pyarrow==12.0.1",
+    "airbyte-cdk[file-based]>=0.52.7",
     "smart-open[s3]==5.1.0",
     "wcmatch==8.4",
     "dill==0.3.4",
     "pytz",
-    "fastavro==1.4.11",
     "python-snappy==0.6.1",
-    "unstructured==0.10.19",
-    "pdf2image==1.16.3",
-    "pdfminer.six==20221105",
-    "unstructured[docx]==0.10.19",
-    "unstructured.pytesseract>=0.3.12",
-    "pytesseract==0.3.10",
-    "markdown",
 ]
 
 TEST_REQUIREMENTS = [
@@ -32,7 +23,6 @@
     "pytest-order",
     "netifaces~=0.11.0",
     "docker",
-    "avro==1.11.0",
 ]
 
 setup(
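The dropped pins (`unstructured`, `pdf2image`, `pdfminer.six`, `pytesseract`, `markdown`, and the avro/parquet stack) are not simply lost; the expectation is that they arrive transitively via the CDK's `file-based` extra. A sanity-check sketch, assuming those import names resolve once `airbyte-cdk[file-based]>=0.52.7` is installed:

```python
import importlib

# Assumption: these are now transitive dependencies of airbyte-cdk[file-based]
# rather than direct pins in the connector's setup.py.
for module in ("unstructured", "pdfminer", "markdown", "fastavro", "pyarrow"):
    importlib.import_module(module)
print("format dependencies resolve via the CDK extra")
```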
33 changes: 2 additions & 31 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
@@ -2,33 +2,10 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
-from typing import List, Optional, Union
+from typing import Optional
 
 from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
-from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
-from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
-from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
-from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
-from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
-from pydantic import AnyUrl, BaseModel, Field, ValidationError, root_validator
-
-
-class UnstructuredFormat(BaseModel):
-    class Config:
-        title = "Markdown/PDF/Docx Format (Experimental)"
-        schema_extra = {"description": "Extract text from document formats and emit as one record per file."}
-
-    filetype: str = Field(
-        "unstructured",
-        const=True,
-    )
-
-
-class S3FileBasedStreamConfig(FileBasedStreamConfig):
-    format: Union[AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat] = Field(
-        title="Format",
-        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
-    )
+from pydantic import AnyUrl, Field, ValidationError, root_validator
 
 
 class Config(AbstractFileBasedSpec):
@@ -65,12 +42,6 @@ def documentation_url(cls) -> AnyUrl:
         "", title="Endpoint", description="Endpoint to an S3 compatible service. Leave empty to use AWS.", order=4
     )
 
-    streams: List[S3FileBasedStreamConfig] = Field(
-        title="The list of streams to sync",
-        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
-        order=10,
-    )
-
     @root_validator
     def validate_optional_args(cls, values):
         aws_access_key_id = values.get("aws_access_key_id")
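With `UnstructuredFormat` and the `streams` override gone, the connector leans entirely on the CDK's `FileBasedStreamConfig` and its built-in format union. A sketch of the replacement imports, assuming the CDK keeps the config-module layout visible in the old import lines above:

```python
# Assumed CDK module path for the relocated format model, mirroring the
# csv_format/jsonl_format modules imported above.
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
```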

This file was deleted.