Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api-reference/ingest/destination-connector/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import SharedAPIKeyURL from '/snippets/general-shared-text/api-key-url.mdx';
Now call the Unstructured CLI or Python SDK. The source connector can be any of the ones supported. This example uses the local source connector:

import KafkaAPISh from '/snippets/destination_connectors/kafka.sh.mdx';
import KafkaAPIPyV2 from '/snippets/destination_connectors/kafka.v2.py.mdx';
import KafkaAPIPyV1 from '/snippets/destination_connectors/kafka.v1.py.mdx';

<CodeGroup>
<KafkaAPISh />
<KafkaAPIPyV2 />
<KafkaAPIPyV1 />
</CodeGroup>
2 changes: 2 additions & 0 deletions api-reference/ingest/source-connectors/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import SharedAPIKeyURL from '/snippets/general-shared-text/api-key-url.mdx';
Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The destination connector can be any of the ones supported. This example uses the local destination connector:

import KafkaAPISh from '/snippets/source_connectors/kafka.sh.mdx';
import KafkaAPIPyV2 from '/snippets/source_connectors/kafka.v2.py.mdx';
import KafkaAPIPyV1 from '/snippets/source_connectors/kafka.v1.py.mdx';

<CodeGroup>
<KafkaAPISh />
<KafkaAPIPyV2 />
<KafkaAPIPyV1 />
</CodeGroup>
2 changes: 2 additions & 0 deletions open-source/ingest/destination-connectors/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ Now call the Unstructured CLI or Python. The source connector can be any of the
This example sends files to Unstructured API services for processing by default. To process files locally instead, see the instructions at the end of this page.

import KafkaAPISh from '/snippets/destination_connectors/kafka.sh.mdx';
import KafkaAPIPyV2 from '/snippets/destination_connectors/kafka.v2.py.mdx';
import KafkaAPIPyV1 from '/snippets/destination_connectors/kafka.v1.py.mdx';

<CodeGroup>
<KafkaAPISh />
<KafkaAPIPyV2 />
<KafkaAPIPyV1 />
</CodeGroup>

Expand Down
2 changes: 2 additions & 0 deletions open-source/ingest/source-connectors/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ Now call the Unstructured CLI or Python. The destination connector can be any of
This example sends data to Unstructured API services for processing by default. To process data locally instead, see the instructions at the end of this page.

import KafkaSh from '/snippets/source_connectors/kafka.sh.mdx';
import KafkaPyV2 from '/snippets/source_connectors/kafka.v2.py.mdx';
import KafkaPyV1 from '/snippets/source_connectors/kafka.v1.py.mdx';

<CodeGroup>
<KafkaSh />
<KafkaPyV2 />
<KafkaPyV1 />
</CodeGroup>

Expand Down
2 changes: 1 addition & 1 deletion snippets/destination_connectors/kafka.sh.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ unstructured-ingest \
--topic $KAFKA_TOPIC \
--kafka-api-key $KAFKA_API_KEY \
--secret $KAFKA_API_KEY \
--confluent false \
--confluent true \
--num-messages-to-consume 1 \
--timeout 1.0
```
57 changes: 57 additions & 0 deletions snippets/destination_connectors/kafka.v2.py.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
```python Python Ingest v2
import os

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig

from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig
)

from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.embedder import EmbedderConfig

from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
    CloudKafkaConnectionConfig,
    CloudKafkaAccessConfig,
    CloudKafkaUploaderConfig
)

# Chunking and embedding are optional.

if __name__ == "__main__":
    # Source: read input files from a local directory.
    local_indexer = LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR"))
    local_downloader = LocalDownloaderConfig()
    local_connection = LocalConnectionConfig()

    # Partition remotely through Unstructured API services; PDFs are split
    # into pages and processed concurrently (up to 15 at a time), and a
    # failed page does not fail the whole document.
    partitioner = PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15
        }
    )

    # Destination: a cloud-hosted Kafka topic, authenticated with an
    # API key/secret pair.
    kafka_connection = CloudKafkaConnectionConfig(
        access_config=CloudKafkaAccessConfig(
            kafka_api_key=os.getenv("KAFKA_API_KEY"),
            secret=os.getenv("KAFKA_SECRET")
        ),
        bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
        port=os.getenv("KAFKA_PORT")
    )
    kafka_uploader = CloudKafkaUploaderConfig(
        batch_size=100,
        topic=os.getenv("KAFKA_TOPIC"),
        timeout=10
    )

    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=local_indexer,
        downloader_config=local_downloader,
        source_connection_config=local_connection,
        partitioner_config=partitioner,
        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
        embedder_config=EmbedderConfig(embedding_provider="huggingface"),
        destination_connection_config=kafka_connection,
        uploader_config=kafka_uploader
    ).run()
```
11 changes: 5 additions & 6 deletions snippets/source_connectors/kafka.sh.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ unstructured-ingest \
--topic $KAFKA_TOPIC \
--kafka-api-key $KAFKA_API_KEY \
--secret $KAFKA_API_KEY \
--confluent false \
--confluent true \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when this is set to false?

--batch-size 100 \
--num-messages-to-consume 1 \
--timeout 1.0
--timeout 1.0 \
--output-dir $LOCAL_FILE_OUTPUT_DIR \
--chunk-elements \
--chunking-strategy by_title \
--embedding-provider huggingface \
--num-processes 2 \
--verbose \
--partition-by-api \
--api-key $UNSTRUCTURED_API_KEY\
--api-key $UNSTRUCTURED_API_KEY \
--partition-endpoint $UNSTRUCTURED_API_URL \
--strategy hi_res \
--additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
Expand Down
52 changes: 52 additions & 0 deletions snippets/source_connectors/kafka.v2.py.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
```python Python Ingest v2
import os

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig

from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
    CloudKafkaIndexerConfig,
    CloudKafkaDownloaderConfig,
    CloudKafkaConnectionConfig,
    CloudKafkaAccessConfig
)

from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.embedder import EmbedderConfig
from unstructured_ingest.v2.processes.connectors.local import LocalUploaderConfig

# Chunking and embedding are optional.

if __name__ == "__main__":
    # Source: consume messages from a cloud-hosted Kafka topic,
    # authenticated with an API key/secret pair.
    kafka_indexer = CloudKafkaIndexerConfig(
        topic=os.getenv("KAFKA_TOPIC"),
        num_messages_to_consume=100,
        timeout=1
    )
    kafka_downloader = CloudKafkaDownloaderConfig(
        download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")
    )
    kafka_connection = CloudKafkaConnectionConfig(
        access_config=CloudKafkaAccessConfig(
            kafka_api_key=os.getenv("KAFKA_API_KEY"),
            secret=os.getenv("KAFKA_SECRET")
        ),
        bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
        port=os.getenv("KAFKA_PORT")
    )

    # Partition remotely through Unstructured API services; PDFs are split
    # into pages and processed concurrently (up to 15 at a time), and a
    # failed page does not fail the whole document.
    partitioner = PartitionerConfig(
        partition_by_api=True,
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
        additional_partition_args={
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15
        }
    )

    # Destination: write the processed output to a local directory.
    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=kafka_indexer,
        downloader_config=kafka_downloader,
        source_connection_config=kafka_connection,
        partitioner_config=partitioner,
        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
        embedder_config=EmbedderConfig(embedding_provider="huggingface"),
        uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
    ).run()
```