S3 and Azure Blob Storage: Update File CDK to support document file types #31904

Merged (13 commits, Oct 31, 2023)
6 changes: 6 additions & 0 deletions airbyte-cdk/python/README.md
@@ -150,6 +150,12 @@ HTTP requests to `localhost:8113/data` should now return the body defined in the
1. Open a PR
2. Once it is approved and **merged**, an Airbyte member must run the `Publish CDK Manually` workflow from master using `release-type=major|minor|patch` and setting the changelog message.

#### File-based CDK

A subset of the CDK is dedicated to sources that have the notion of files. It's located in `airbyte-cdk/sources/file_based`. When using this part of the CDK, install the CDK using the `file-based` extra: `pip install airbyte-cdk[file-based]`.

Because the `unstructured` parser of the file-based CDK requires some native dependencies to be installed, link the `file_based_build_customization.py` file into the connector as `build_customization.py`.

## Coming Soon

* Full OAuth 2.0 support \(including refresh token issuing flow via UI or CLI\)
63 changes: 63 additions & 0 deletions airbyte-cdk/python/file_based_build_customization.py
Contributor:

A question about the symlink and git:
Do you see the connector's build_customization.py (a symlink) in the git diff when you modify the root file_based_build_customization.py?

As of today we detect connector changes based on changes in their folder with a git diff command. It conditionally triggers our CI and connector tests. I think we would like to keep this behavior.

In other words:
When you change file_based_build_customization.py, do you want to release a new connector version for all the connectors using it, or do you want to decouple the hook changes from the connector release?

Contributor Author:

@alafanechere CDK changes are currently decoupled from connector changes - if a connector wants to use changed CDK stuff, the CDK needs to be published and a separate PR bumping the version needs to be opened (like what I'm doing here). It seems correct to me to extend this to the build customization script as well.

So no, this shouldn't automatically trigger a connector release. Seems like this makes the symlink approach less attractive.

@@ -0,0 +1,63 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

import textwrap
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from dagger import Container


def setup_nltk(connector_container: Container) -> Container:
    """
    Seeds the connector with nltk data at build time. This is because the nltk data
    is large and takes a long time to download. It runs a python script that downloads
    the data following connector installation.
    """

    nltk_python_script = textwrap.dedent(
        """
        import nltk
        nltk.download('punkt')
        nltk.download('averaged_perceptron_tagger')
        """
    )
    connector_container = (
        connector_container.with_new_file("/tmp/nltk_python_script.py", nltk_python_script)
        .with_exec(["python", "/tmp/nltk_python_script.py"], skip_entrypoint=True)
        .with_exec(["rm", "/tmp/nltk_python_script.py"], skip_entrypoint=True)
    )
Contributor:

Could you comment on which system path the nltk data will be written?

Contributor Author:

Done


    return connector_container


def install_tesseract_and_poppler(connector_container: Container) -> Container:
    """
    Installs Tesseract-OCR and Poppler-utils in the container. These tools are necessary for
    OCR (Optical Character Recognition) processes and working with PDFs, respectively.
    """

    connector_container = connector_container.with_exec(
        ["sh", "-c", "apt-get update && apt-get install -y tesseract-ocr poppler-utils"], skip_entrypoint=True
Contributor:

Can we install specific versions of these utils to maximize build reproducibility?

    )

    return connector_container


async def post_connector_install(connector_container: Container) -> Container:
    """
    Handles post-installation setup for the connector by setting up nltk and
    installing necessary system dependencies such as Tesseract-OCR and Poppler-utils.

    These steps are necessary if the unstructured parser from the file based CDK is exposed in the connector.
    """

    # Setup nltk in the container
    connector_container = setup_nltk(connector_container)
Contributor:

Something interesting to note here, which argues against bundling the nltk data in the base image:

  • this execution would depend on nltk being installed in the base image
  • the result of this execution (the file download) is dynamic; we don't control exactly what's being downloaded

This is why it makes sense for it to be a post_connector_install step.
If we downloaded this data in the base image, we'd have to make sure the nltk version we download it with is the same as the one declared in the CDK...

I'm also a bit concerned that this post_connector_install can't guarantee reproducible results, as we rely on nltk to decide which data gets downloaded.
@flash1293 could we host snapshots of this data in one of our public buckets and make this hook download that data and mount it into the container?

What I mean by "reproducible build" is that we always get the same image when running the build command on the same commit. If nltk's data download is dynamic, we don't have reproducibility.

Contributor Author:

Inlined the nltk data index in the customization script so it points to a raw GitHub URL at that specific commit - I'm fairly confident this is always reproducible (short of the maintainers deleting the repo, and then we have problems anyway).
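
A minimal sketch of that pinning idea, assuming nltk's `Downloader` is pointed at a commit-pinned raw GitHub URL of the nltk_data index (the commit SHA and package list below are placeholders, not necessarily what the PR ships):

```python
from nltk.downloader import Downloader

# Placeholder commit SHA; the real script pins a specific nltk_data commit.
PINNED_INDEX_URL = "https://raw.githubusercontent.com/nltk/nltk_data/<commit-sha>/index.xml"

downloader = Downloader(server_index_url=PINNED_INDEX_URL)
downloader.download("punkt")
downloader.download("averaged_perceptron_tagger")
```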


    # Install Tesseract and Poppler
    connector_container = install_tesseract_and_poppler(connector_container)
Contributor:

Can this be called in a pre_install_hook?
These are system-dependency installations that do not depend on the python package installation, if I'm not mistaken. This would speed up the build, as the apt-get install layers would be cached; with the current implementation we'll re-download and install these tools on any code change.

Contributor Author:

Moved the install to pre_connector_install and pinned the versions
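
A rough sketch of what that change might look like; the hook name follows the pre/post convention discussed in this thread, but the Debian package versions below are illustrative placeholders rather than the pins actually used:

```python
async def pre_connector_install(connector_container: Container) -> Container:
    # Install the OCR/PDF system dependencies before the python package install,
    # so the apt layers stay cached across connector code changes.
    # Version pins are placeholders for illustration only.
    return connector_container.with_exec(
        [
            "sh",
            "-c",
            "apt-get update && apt-get install -y "
            "tesseract-ocr=5.3.0-2 poppler-utils=22.12.0-2+b1",
        ],
        skip_entrypoint=True,
    )
```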


    return connector_container
Contributor:

This build_customization.py is the same as the S3 one, right?

I'd like to think a bit with my team about the best option to avoid code duplication:

A. Install these dependencies in our python-connector-base image. Advantage: simple incremental change. Drawback: all our python connector images will grow in size.
B. Put this build_customization.py file in the file CDK and create symlinks to it in the S3 and Azure connectors. Advantage: code reuse without a base image change. Drawback: it's another edge case for our team to keep in mind.
C. Create a specific base image for file/LLM connectors based on python-connector-base. Advantage: a new, clean, centralized artifact. Drawback: a change in this package means building multiple images for the same connector language.

I'm more inclined toward C, but would recommend B so as not to block you right now.

Contributor Author:

I agree with all of this (B for now, C in a follow-up). We could re-build the base image as part of the CDK publish action (it would still require manually bumping the affected connectors, but I think this would still be helpful).

Contributor:

After thinking a bit more about it, I'm not sure that C is our best long-term bet. I believe it adds complexity, and we could end up in a situation similar to the one we're in today with strict-encrypt connectors. Managing variants is feasible but likely to be cumbersome. The base image would become a dependency of the file base image, and compatibility and version-pinning problems, etc., would come with it.
As of today, I think option A - installing your new system dependencies in the base image - is the one with the lighter long-term maintenance burden, but as I said here, I find it risky to download the nltk data through a python script execution, as we don't have any reproducibility guarantee and would have to maintain nltk version equivalence between the base image and the CDK...

Contributor Author:

Not sure about the best solution, but the symlink thing seems wrong as well. For now I duplicated the file, but happy to go for a better solution

Contributor:

Since build_customization.py is just a py file that can import things, would it be possible for all of the helper methods to live in the file-based CDK and be imported from build_customization.py?

Then pre_connector_install and post_connector_install might be duplicated, but they would be small and declarative: "before installing, `install_tesseract_and_poppler`".
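
A hypothetical sketch of that layout; the CDK module path used in the import is illustrative and does not exist in this PR:

```python
# build_customization.py in the connector: small and declarative,
# delegating to helpers that would ship with the file-based CDK.
from __future__ import annotations

from typing import TYPE_CHECKING

# Illustrative import path; no such module is added by this PR.
from airbyte_cdk.sources.file_based.build_customization import (
    install_tesseract_and_poppler,
    setup_nltk,
)

if TYPE_CHECKING:
    from dagger import Container


async def pre_connector_install(connector_container: Container) -> Container:
    return install_tesseract_and_poppler(connector_container)


async def post_connector_install(connector_container: Container) -> Container:
    return setup_nltk(connector_container)
```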

Contributor:

So here's what we realized could be the best approach, together with @flash1293 (a rough sketch of the compatibility check follows the list):

  • @flash1293's changes mean the CDK now has system dependencies, which is definitely a new thing.
  • These system dependencies can change along with the CDK version.
  • We should bundle the CDK system dependencies in the python connector base image.
  • As system dependencies can change on a CDK version change, we now have a coupling between the CDK version used by a connector and the base image it can use.
  • We should hardcode somewhere a mapping between CDK versions and compatible base image versions.
  • On connector build we should compare the connector's CDK version to the base image version and fail the build if they're not compatible according to our mapping.
  • Connector developers will then have to update the baseImage metadata according to this failure.
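
A minimal sketch of such a check, assuming a hardcoded mapping maintained in airbyte-ci (all version numbers below are illustrative placeholders):

```python
# Illustrative mapping: minimum python-connector-base version required by a CDK version.
CDK_TO_MIN_BASE_IMAGE = {
    "0.52.5": "1.1.0",  # placeholder values
}


def check_base_image_compatibility(cdk_version: str, base_image_version: str) -> None:
    """Fail the connector build if its CDK version requires a newer base image."""
    required = CDK_TO_MIN_BASE_IMAGE.get(cdk_version)
    if required is None:
        return  # no known system-dependency requirement for this CDK version

    def as_tuple(version: str) -> tuple:
        return tuple(int(part) for part in version.split("."))

    if as_tuple(base_image_version) < as_tuple(required):
        raise ValueError(
            f"CDK {cdk_version} requires python-connector-base >= {required}, "
            f"but the connector declares {base_image_version}; update the baseImage in metadata.yaml."
        )
```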

Contributor Author (@flash1293, Oct 27, 2023):

I think @erohmensing's suggestion is interesting. It means we would need to install the CDK, at the version specified in the connector's setup.py, in the python environment that is building the dagger pipeline. If that's possible it might work, although it feels a little crazy.

Contributor:

> Since the build_customization.py is just a py file that can import things, would it be possible for all of the helper methods to live in the file based CDK, imported from build_customization.py?

The build_customization.py is imported at runtime by the build process here. @erohmensing it would mean that airbyte-ci would depend on the CDK if we imported helpers from there. As these helpers can change with the CDK version, we could end up building a connector image that depends on an old version of the CDK but uses helpers from the latest version.

@@ -268,6 +268,19 @@
"type": "boolean"
}
}
},
{
"title": "Document File Type Format (Experimental)",
"type": "object",
"properties": {
"filetype": {
"title": "Filetype",
"default": "unstructured",
"const": "unstructured",
"type": "string"
}
},
"description": "Extract text from document formats (.pdf, .docx, .md) and emit as one record per file."
}
]
},
@@ -7,7 +7,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
dockerRepository: airbyte/source-azure-blob-storage
documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
githubIssueLabel: source-azure-blob-storage
@@ -5,7 +5,20 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk>=0.51.17", "smart_open[azure]", "pytz", "fastavro==1.4.11", "pyarrow"]
MAIN_REQUIREMENTS = [
    "airbyte-cdk>=0.52.5",
    "smart_open[azure]",
    "pytz",
    "fastavro==1.4.11",
    "pyarrow",
    "unstructured==0.10.19",
Contributor Author (@flash1293, Oct 27, 2023):

This is also included in airbyte-cdk[file-based], but then there would be a version mismatch with fastavro 1.4.11, which is listed explicitly here (it's fastavro~=1.8.0 in the file-based extra). Is this a leftover, or is there a strong reason not to rely on the CDK's "standard" dependencies for file-based sources?

Same question applies to S3 (which makes me think that it wasn't a conscious choice).

Do you know where this came from @clnoll ?

Contributor:

I read this comment after I submitted my first review.

Contributor:

Based on your PR title, I would assume that these two connectors would depend on the file-based CDK and that the unstructured lib would be bundled in it. It'd be great if connectors and the CDK could agree on the fastavro version, indeed.

Contributor:

I'm not aware of any reason not to use the file-based CDK's dependencies.

Contributor Author:

Switched it to use the extra

"pdf2image==1.16.3",
"pdfminer.six==20221105",
"unstructured[docx]==0.10.19",
"unstructured.pytesseract>=0.3.12",
"pytesseract==0.3.10",
"markdown",
]

TEST_REQUIREMENTS = ["requests-mock~=1.9.3", "pytest-mock~=3.6.1", "pytest~=6.2"]
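
Following the author's note above about switching to the extra, a hypothetical sketch of what the consolidated requirements could look like (the exact pins that landed are not shown in this hunk):

```python
# Hypothetical: let the CDK's file-based extra own the document-parsing
# dependencies (unstructured, fastavro, pyarrow, ...) instead of pinning
# them individually in the connector.
MAIN_REQUIREMENTS = [
    "airbyte-cdk[file-based]>=0.52.5",
    "smart_open[azure]",
    "pytz",
]
```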


This file was deleted.

@@ -21,7 +21,7 @@
"order": 10,
"type": "array",
"items": {
"title": "S3FileBasedStreamConfig",
"title": "BasedStreamConfig",
Contributor:

Suggested change
"title": "BasedStreamConfig",
"title": "FileBasedStreamConfig",

"type": "object",
"properties": {
"name": {
@@ -270,7 +270,7 @@
        }
      },
      {
        "title": "Markdown/PDF/Docx Format (Experimental)",
        "title": "Document File Type Format (Experimental)",
        "type": "object",
        "properties": {
          "filetype": {
@@ -280,7 +280,7 @@
"type": "string"
}
},
"description": "Extract text from document formats and emit as one record per file."
"description": "Extract text from document formats (.pdf, .docx, .md) and emit as one record per file."
}
]
},
7 changes: 1 addition & 6 deletions airbyte-integrations/connectors/source-s3/main.py
@@ -10,18 +10,13 @@

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type
from airbyte_cdk.sources.file_based.file_types import default_parsers
from source_s3.v4 import Config, Cursor, SourceS3, SourceS3StreamReader
from source_s3.v4.config import UnstructuredFormat
from source_s3.v4.unstructured_parser import UnstructuredParser

parsers = {**default_parsers, UnstructuredFormat: UnstructuredParser()}


def get_source(args: List[str]):
    catalog_path = AirbyteEntrypoint.extract_catalog(args)
    try:
        return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor, parsers=parsers)
        return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
    except Exception:
        print(
            AirbyteMessage(
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 4.1.3
dockerImageTag: 4.1.4
dockerRepository: airbyte/source-s3
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
githubIssueLabel: source-s3
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/setup.py
@@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
    "airbyte-cdk>=0.52.0",
    "airbyte-cdk>=0.52.5",
    "pyarrow==12.0.1",
    "smart-open[s3]==5.1.0",
    "wcmatch==8.4",
33 changes: 2 additions & 31 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
@@ -2,33 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import List, Optional, Union
from typing import Optional

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
from pydantic import AnyUrl, BaseModel, Field, ValidationError, root_validator


class UnstructuredFormat(BaseModel):
    class Config:
        title = "Markdown/PDF/Docx Format (Experimental)"
        schema_extra = {"description": "Extract text from document formats and emit as one record per file."}

    filetype: str = Field(
        "unstructured",
        const=True,
    )


class S3FileBasedStreamConfig(FileBasedStreamConfig):
    format: Union[AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
from pydantic import AnyUrl, Field, ValidationError, root_validator


class Config(AbstractFileBasedSpec):
@@ -65,12 +42,6 @@ def documentation_url(cls) -> AnyUrl:
"", title="Endpoint", description="Endpoint to an S3 compatible service. Leave empty to use AWS.", order=4
)

    streams: List[S3FileBasedStreamConfig] = Field(
        title="The list of streams to sync",
        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
        order=10,
    )

    @root_validator
    def validate_optional_args(cls, values):
        aws_access_key_id = values.get("aws_access_key_id")