Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into flash1293/pinecon…
Browse files Browse the repository at this point in the history
…e-fix
  • Loading branch information
Joe Reuter committed Sep 26, 2023
2 parents 3ff4322 + 4cc25bc commit 0f1ab49
Show file tree
Hide file tree
Showing 117 changed files with 1,276 additions and 5,288 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.51.19
current_version = 0.51.21
commit = False

[bumpversion:file:setup.py]
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.51.21
Change Error message if Stream is not found

## 0.51.20
Vector DB CDK: Add text splitting options to document processing

## 0.51.19
Ensuring invalid user-provided urls does not generate sentry issues

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.51.19
RUN pip install --prefix=/install airbyte-cdk==0.51.21

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.51.19
LABEL io.airbyte.version=0.51.21
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,79 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import List, Literal, Optional
from typing import List, Literal, Optional, Union

from pydantic import BaseModel, Field


class SeparatorSplitterConfigModel(BaseModel):
mode: Literal["separator"] = Field("separator", const=True)
separators: List[str] = Field(
default=['"\\n\\n"', '"\\n"', '" "', '""'],
title="Separators",
description='List of separator strings to split text fields by. The separator itself needs to be wrapped in double quotes, e.g. to split by the dot character, use ".". To split by a newline, use "\\n".',
)
keep_separator: bool = Field(default=False, title="Keep separator", description="Whether to keep the separator in the resulting chunks")

class Config:
title = "By Separator"
schema_extra = {
"description": "Split the text by the list of separators until the chunk size is reached, using the earlier mentioned separators where possible. This is useful for splitting text fields by paragraphs, sentences, words, etc."
}


class MarkdownHeaderSplitterConfigModel(BaseModel):
mode: Literal["markdown"] = Field("markdown", const=True)
split_level: int = Field(
default=1,
title="Split level",
description="Level of markdown headers to split text fields by. Headings down to the specified level will be used as split points",
le=6,
ge=1,
)

class Config:
title = "By Markdown header"
schema_extra = {
"description": "Split the text by Markdown headers down to the specified header level. If the chunk size fits multiple sections, they will be combined into a single chunk."
}


class CodeSplitterConfigModel(BaseModel):
mode: Literal["code"] = Field("code", const=True)
language: str = Field(
title="Language",
description="Split code in suitable places based on the programming language",
enum=[
"cpp",
"go",
"java",
"js",
"php",
"proto",
"python",
"rst",
"ruby",
"rust",
"scala",
"swift",
"markdown",
"latex",
"html",
"sol",
],
)

class Config:
title = "By Programming Language"
schema_extra = {
"description": "Split the text by suitable delimiters based on the programming language. This is useful for splitting code into chunks."
}


TextSplitterConfigModel = Union[SeparatorSplitterConfigModel, MarkdownHeaderSplitterConfigModel, CodeSplitterConfigModel]


class ProcessingConfigModel(BaseModel):
chunk_size: int = Field(
...,
Expand All @@ -33,6 +101,13 @@ class ProcessingConfigModel(BaseModel):
always_show=True,
examples=["age", "user", "user.name"],
)
text_splitter: TextSplitterConfigModel = Field(
default=None,
title="Text splitter",
discriminator="mode",
type="object",
description="Split text fields into chunks based on the specified method.",
)

class Config:
schema_extra = {"group": "processing"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union

import dpath.util
from airbyte_cdk.destinations.vector_db_based.config import ProcessingConfigModel
from airbyte_cdk.destinations.vector_db_based.config import ProcessingConfigModel, SeparatorSplitterConfigModel, TextSplitterConfigModel
from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode
from airbyte_cdk.utils.traced_exception import AirbyteTracedException, FailureType
from langchain.document_loaders.base import Document
Expand All @@ -26,6 +27,9 @@ class Chunk:
embedding: Optional[List[float]] = None


headers_to_split_on = ["(?:^|\n)# ", "(?:^|\n)## ", "(?:^|\n)### ", "(?:^|\n)#### ", "(?:^|\n)##### ", "(?:^|\n)###### "]


class DocumentProcessor:
"""
DocumentProcessor is a helper class that generates documents from Airbyte records.
Expand All @@ -39,16 +43,52 @@ class DocumentProcessor:
except if you want to implement a custom writer.
The config parameters specified by the ProcessingConfigModel has to be made part of the connector spec to allow the user to configure the document processor.
Calling DocumentProcessor.check_config(config) will validate the config and return an error message if the config is invalid.
"""

streams: Mapping[str, ConfiguredAirbyteStream]

@staticmethod
def check_config(config: ProcessingConfigModel) -> Optional[str]:
if config.text_splitter is not None and config.text_splitter.mode == "separator":
for s in config.text_splitter.separators:
try:
separator = json.loads(s)
if not isinstance(separator, str):
return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
except json.decoder.JSONDecodeError:
return f"Invalid separator: {s}. Separator needs to be a valid JSON string using double quotes."
return None

def _get_text_splitter(self, chunk_size: int, chunk_overlap: int, splitter_config: Optional[TextSplitterConfigModel]):
if splitter_config is None:
splitter_config = SeparatorSplitterConfigModel(mode="separator")
if splitter_config.mode == "separator":
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=[json.loads(s) for s in splitter_config.separators],
keep_separator=splitter_config.keep_separator,
)
if splitter_config.mode == "markdown":
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=headers_to_split_on[: splitter_config.split_level],
is_separator_regex=True,
keep_separator=True,
)
if splitter_config.mode == "code":
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=RecursiveCharacterTextSplitter.get_separators_for_language(splitter_config.language),
)

def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):
self.streams = {self._stream_identifier(stream.stream): stream for stream in catalog.streams}

self.splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=config.chunk_size, chunk_overlap=config.chunk_overlap
)
self.splitter = self._get_text_splitter(config.chunk_size, config.chunk_overlap, config.text_splitter)
self.text_fields = config.text_fields
self.metadata_fields = config.metadata_fields
self.logger = logging.getLogger("airbyte.document_processor")
Expand Down
10 changes: 8 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ def read(
for configured_stream in catalog.streams:
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
if not self.raise_exception_on_missing_stream:
continue
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
f"The stream {configured_stream.stream.name} no longer exists in the configuration. "
f"Refresh the schema in replication settings and remove this stream from future sync attempts."
)

try:
Expand Down Expand Up @@ -144,6 +146,10 @@ def read(

logger.info(f"Finished syncing {self.name}")

@property
def raise_exception_on_missing_stream(self) -> bool:
return True

@property
def per_stream_state_enabled(self) -> bool:
return True
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
version="0.51.19",
version="0.51.21",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 0f1ab49

Please sign in to comment.