diff --git a/api-reference/ingest/destination-connector/kafka.mdx b/api-reference/ingest/destination-connector/kafka.mdx
index 63f0b8eb..d8795dd3 100644
--- a/api-reference/ingest/destination-connector/kafka.mdx
+++ b/api-reference/ingest/destination-connector/kafka.mdx
@@ -15,9 +15,11 @@ import SharedAPIKeyURL from '/snippets/general-shared-text/api-key-url.mdx';
Now call the Unstructured CLI or Python SDK. The source connector can be any of the ones supported. This example uses the local source connector:
import KafkaAPISh from '/snippets/destination_connectors/kafka.sh.mdx';
+import KafkaAPIPyV2 from '/snippets/destination_connectors/kafka.v2.py.mdx';
import KafkaAPIPyV1 from '/snippets/destination_connectors/kafka.v1.py.mdx';
+
\ No newline at end of file
diff --git a/api-reference/ingest/source-connectors/kafka.mdx b/api-reference/ingest/source-connectors/kafka.mdx
index f74a688a..e106bcab 100644
--- a/api-reference/ingest/source-connectors/kafka.mdx
+++ b/api-reference/ingest/source-connectors/kafka.mdx
@@ -15,9 +15,11 @@ import SharedAPIKeyURL from '/snippets/general-shared-text/api-key-url.mdx';
Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The destination connector can be any of the ones supported. This example uses the local destination connector:
import KafkaAPISh from '/snippets/source_connectors/kafka.sh.mdx';
+import KafkaAPIPyV2 from '/snippets/source_connectors/kafka.v2.py.mdx';
import KafkaAPIPyV1 from '/snippets/source_connectors/kafka.v1.py.mdx';
+
\ No newline at end of file
diff --git a/open-source/ingest/destination-connectors/kafka.mdx b/open-source/ingest/destination-connectors/kafka.mdx
index dce049e9..f2340e46 100644
--- a/open-source/ingest/destination-connectors/kafka.mdx
+++ b/open-source/ingest/destination-connectors/kafka.mdx
@@ -15,10 +15,12 @@ Now call the Unstructured CLI or Python. The source connector can be any of the
This example sends files to Unstructured API services for processing by default. To process files locally instead, see the instructions at the end of this page.
import KafkaAPISh from '/snippets/destination_connectors/kafka.sh.mdx';
+import KafkaAPIPyV2 from '/snippets/destination_connectors/kafka.v2.py.mdx';
import KafkaAPIPyV1 from '/snippets/destination_connectors/kafka.v1.py.mdx';
+
diff --git a/open-source/ingest/source-connectors/kafka.mdx b/open-source/ingest/source-connectors/kafka.mdx
index 645f2fad..b6c406e9 100644
--- a/open-source/ingest/source-connectors/kafka.mdx
+++ b/open-source/ingest/source-connectors/kafka.mdx
@@ -15,10 +15,12 @@ Now call the Unstructured CLI or Python. The destination connector can be any of
This example sends data to Unstructured API services for processing by default. To process data locally instead, see the instructions at the end of this page.
import KafkaSh from '/snippets/source_connectors/kafka.sh.mdx';
+import KafkaPyV2 from '/snippets/source_connectors/kafka.v2.py.mdx';
import KafkaPyV1 from '/snippets/source_connectors/kafka.v1.py.mdx';
+
diff --git a/snippets/destination_connectors/kafka.sh.mdx b/snippets/destination_connectors/kafka.sh.mdx
index eace311a..b4da3301 100644
--- a/snippets/destination_connectors/kafka.sh.mdx
+++ b/snippets/destination_connectors/kafka.sh.mdx
@@ -22,7 +22,7 @@ unstructured-ingest \
--topic $KAFKA_TOPIC \
--kafka-api-key $KAFKA_API_KEY \
- --secret $KAFKA_API_KEY \
+ --secret $KAFKA_SECRET \
- --confluent false \
+ --confluent true \
--num-messages-to-consume 1 \
--timeout 1.0
```
\ No newline at end of file
diff --git a/snippets/destination_connectors/kafka.v2.py.mdx b/snippets/destination_connectors/kafka.v2.py.mdx
new file mode 100644
index 00000000..c3277708
--- /dev/null
+++ b/snippets/destination_connectors/kafka.v2.py.mdx
@@ -0,0 +1,57 @@
+```python Python Ingest v2
+import os
+
+from unstructured_ingest.v2.pipeline.pipeline import Pipeline
+from unstructured_ingest.v2.interfaces import ProcessorConfig
+
+from unstructured_ingest.v2.processes.connectors.local import (
+ LocalIndexerConfig,
+ LocalDownloaderConfig,
+ LocalConnectionConfig
+)
+
+from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
+from unstructured_ingest.v2.processes.chunker import ChunkerConfig
+from unstructured_ingest.v2.processes.embedder import EmbedderConfig
+
+from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
+ CloudKafkaConnectionConfig,
+ CloudKafkaAccessConfig,
+ CloudKafkaUploaderConfig
+)
+
+# Chunking and embedding are optional.
+
+if __name__ == "__main__":
+ Pipeline.from_configs(
+ context=ProcessorConfig(),
+ indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
+ downloader_config=LocalDownloaderConfig(),
+ source_connection_config=LocalConnectionConfig(),
+ partitioner_config=PartitionerConfig(
+ partition_by_api=True,
+ api_key=os.getenv("UNSTRUCTURED_API_KEY"),
+ partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
+ additional_partition_args={
+ "split_pdf_page": True,
+ "split_pdf_allow_failed": True,
+ "split_pdf_concurrency_level": 15
+ }
+ ),
+ chunker_config=ChunkerConfig(chunking_strategy="by_title"),
+ embedder_config=EmbedderConfig(embedding_provider="huggingface"),
+ destination_connection_config=CloudKafkaConnectionConfig(
+ access_config=CloudKafkaAccessConfig(
+ kafka_api_key=os.getenv("KAFKA_API_KEY"),
+ secret=os.getenv("KAFKA_SECRET")
+ ),
+ bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
+ port=os.getenv("KAFKA_PORT")
+ ),
+ uploader_config=CloudKafkaUploaderConfig(
+ batch_size=100,
+ topic=os.getenv("KAFKA_TOPIC"),
+ timeout=10
+ )
+ ).run()
+```
\ No newline at end of file
diff --git a/snippets/source_connectors/kafka.sh.mdx b/snippets/source_connectors/kafka.sh.mdx
index 5bf37545..033c23b3 100644
--- a/snippets/source_connectors/kafka.sh.mdx
+++ b/snippets/source_connectors/kafka.sh.mdx
@@ -10,16 +10,15 @@ unstructured-ingest \
--topic $KAFKA_TOPIC \
--kafka-api-key $KAFKA_API_KEY \
- --secret $KAFKA_API_KEY \
+ --secret $KAFKA_SECRET \
- --confluent false \
+ --confluent true \
+ --batch-size 100 \
--num-messages-to-consume 1 \
- --timeout 1.0
+ --timeout 1.0 \
--output-dir $LOCAL_FILE_OUTPUT_DIR \
- --chunk-elements \
+ --chunking-strategy by_title \
--embedding-provider huggingface \
- --num-processes 2 \
- --verbose \
--partition-by-api \
- --api-key $UNSTRUCTURED_API_KEY\
+ --api-key $UNSTRUCTURED_API_KEY \
--partition-endpoint $UNSTRUCTURED_API_URL \
--strategy hi_res \
--additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
diff --git a/snippets/source_connectors/kafka.v2.py.mdx b/snippets/source_connectors/kafka.v2.py.mdx
new file mode 100644
index 00000000..dc079d18
--- /dev/null
+++ b/snippets/source_connectors/kafka.v2.py.mdx
@@ -0,0 +1,52 @@
+```python Python Ingest v2
+import os
+
+from unstructured_ingest.v2.pipeline.pipeline import Pipeline
+from unstructured_ingest.v2.interfaces import ProcessorConfig
+
+from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
+ CloudKafkaIndexerConfig,
+ CloudKafkaDownloaderConfig,
+ CloudKafkaConnectionConfig,
+ CloudKafkaAccessConfig
+)
+
+from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
+from unstructured_ingest.v2.processes.chunker import ChunkerConfig
+from unstructured_ingest.v2.processes.embedder import EmbedderConfig
+from unstructured_ingest.v2.processes.connectors.local import LocalUploaderConfig
+
+# Chunking and embedding are optional.
+
+if __name__ == "__main__":
+ Pipeline.from_configs(
+ context=ProcessorConfig(),
+ indexer_config=CloudKafkaIndexerConfig(
+ topic=os.getenv("KAFKA_TOPIC"),
+ num_messages_to_consume=100,
+        timeout=1.0
+ ),
+ downloader_config=CloudKafkaDownloaderConfig(download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")),
+ source_connection_config=CloudKafkaConnectionConfig(
+ access_config=CloudKafkaAccessConfig(
+ kafka_api_key=os.getenv("KAFKA_API_KEY"),
+ secret=os.getenv("KAFKA_SECRET")
+ ),
+ bootstrap_server=os.getenv("KAFKA_BOOTSTRAP_SERVER"),
+ port=os.getenv("KAFKA_PORT")
+ ),
+ partitioner_config=PartitionerConfig(
+ partition_by_api=True,
+ api_key=os.getenv("UNSTRUCTURED_API_KEY"),
+ partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
+ additional_partition_args={
+ "split_pdf_page": True,
+ "split_pdf_allow_failed": True,
+ "split_pdf_concurrency_level": 15
+ }
+ ),
+ chunker_config=ChunkerConfig(chunking_strategy="by_title"),
+ embedder_config=EmbedderConfig(embedding_provider="huggingface"),
+ uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
+ ).run()
+```
\ No newline at end of file