# Where Airflow, Spark, Snowflake & Databricks Fit in Document Processing

These enterprise-grade platforms complement a LlamaIndex pipeline by handling **orchestration**, **scale**, **storage**, and **governance**. Here's where each fits in the document processing architecture:



## 🔄 **Apache Airflow - Workflow Orchestration**

**Role**: End-to-end pipeline orchestration and scheduling[1][2][3]

**Key Benefits**:
- **Dependency Management**: Ensures tasks run in correct order[2][4]
- **Error Handling**: Automatic retries and failure notifications[3]
- **Monitoring**: Web UI for pipeline visibility and debugging[5]
- **Scheduling**: Cron-based or event-driven execution[6]

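Dependency management amounts to running tasks in a topological order of the dependency graph. The sketch below illustrates that idea with Python's standard-library `graphlib`; it is not Airflow's scheduler, and the task names are illustrative labels for the four pipeline stages.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract_documents": set(),
    "process_documents": {"extract_documents"},
    "store_embeddings": {"process_documents"},
    "update_vector_index": {"store_embeddings"},
}


def execution_order(deps):
    """Return a run order in which every task follows its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because this graph is a simple chain, only one order is valid. A cyclic graph makes `static_order()` raise `graphlib.CycleError`, which is the stdlib analogue of an orchestrator rejecting a cyclic DAG.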







In [None]:

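from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Note: the callables passed to the operators below (extract_from_sources,
# run_spark_llamaindex_pipeline, store_vectors_snowflake, refresh_pinecone_index)
# are assumed to be defined or imported elsewhere in the project.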

def document_ingestion_dag():
    """Complete document processing pipeline with Airflow"""

    dag = DAG(
        'document_processing_pipeline',
        default_args={
            'owner': 'data-team',
            'depends_on_past': False,
            'start_date': datetime(2025, 1, 1),
            'retries': 2,  # Retry a failed task up to 2 times
            'retry_delay': timedelta(minutes=5)
        },
        description='End-to-end document processing with LlamaIndex',
        schedule_interval='@daily',  # Run daily (this argument is named `schedule` from Airflow 2.4 onward)
        catchup=False,
        max_active_runs=1
    )

    # Task 1: Extract documents from various sources
    extract_documents = PythonOperator(
        task_id='extract_documents',
        python_callable=extract_from_sources,
        dag=dag
    )

    # Task 2: Process with LlamaIndex + Spark for large datasets
    process_documents = PythonOperator(
        task_id='process_with_llamaindex_spark',
        python_callable=run_spark_llamaindex_pipeline,
        dag=dag
    )

    # Task 3: Store embeddings in Snowflake
    store_embeddings = PythonOperator(
        task_id='store_in_snowflake',
        python_callable=store_vectors_snowflake,
        dag=dag
    )

    # Task 4: Update vector index
    update_vector_index = PythonOperator(
        task_id='update_vector_index',
        python_callable=refresh_pinecone_index,
        dag=dag
    )

    # Task dependencies
    extract_documents >> process_documents >> store_embeddings >> update_vector_index

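    return dag


# Airflow only registers DAG objects it finds at module level, so build one here
dag = document_ingestion_dag()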


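Airflow's `retries` and `retry_delay` task settings mean that a failing task is attempted again after a pause, up to `retries` additional times. The following is a minimal stdlib sketch of that behavior, not Airflow's implementation; `run_with_retries` and `flaky_extract` are hypothetical names used only for illustration.

```python
import time


def run_with_retries(task, retries=2, retry_delay_seconds=0.0, sleep=time.sleep):
    """Call `task` until it succeeds, allowing `retries` extra attempts after the first."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise  # retries exhausted: surface the failure
            sleep(retry_delay_seconds)


# Hypothetical flaky task: fails twice, then succeeds.
calls = {"count": 0}


def flaky_extract():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("source temporarily unavailable")
    return "documents extracted"


result, attempts = run_with_retries(flaky_extract, retries=2)
print(result, attempts)
```

With `retries=2` the task may run three times in total: the first attempt plus two retries.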


## ⚡ **Apache Spark - Large-Scale Processing**

**Role**: Distributed processing for massive document datasets[7][8][9]

**Key Benefits**:
- **Horizontal Scaling**: Process TB+ of documents across clusters[10]
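- **In-Memory Processing**: Keeps intermediate data in memory; the Spark project has cited speedups of up to 100x over disk-based Hadoop MapReduce for some workloads[7]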
- **Fault Tolerance**: Automatic recovery from node failures[11]
- **Integration**: Works with Hadoop, S3, databases, streaming sources[8]





In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, udf
import sparknlp  # Spark NLP's Python package is named `sparknlp`

def spark_document_processing():
    """Process millions of documents using Spark + LlamaIndex"""

    # Initialize Spark with NLP capabilities
    spark = SparkSession.builder \
        .appName("LlamaIndex-Spark-Pipeline") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0") \
        .getOrCreate()

    # Load documents from data lake (S3, HDFS, etc.)
    documents_df = spark.read \
        .format("parquet") \
        .load("s3a://company-datalake/documents/") \
        .filter(col("document_type").isin("pdf", "docx", "txt"))

    # Spark NLP preprocessing
    from pyspark.ml import Pipeline
    from sparknlp.base import DocumentAssembler
    from sparknlp.annotator import Tokenizer, Normalizer

    document_assembler = DocumentAssembler() \
        .setInputCol("content") \
        .setOutputCol("document")

    tokenizer = Tokenizer() \
        .setInputCols(["document"]) \
        .setOutputCol("tokens")

    normalizer = Normalizer() \
        .setInputCols(["tokens"]) \
        .setOutputCol("normalized") \
        .setLowercase(True)

    # Create Spark NLP pipeline
    spark_nlp_pipeline = Pipeline(stages=[
        document_assembler,
        tokenizer,
        normalizer
    ])

    # Process documents in parallel across cluster
    processed_df = spark_nlp_pipeline.fit(documents_df).transform(documents_df)

    # Custom UDF to integrate with LlamaIndex
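    def llamaindex_chunk_and_embed(content):
        """Chunk and embed one document with LlamaIndex"""
        import json

        from llama_index.core import Document
        from llama_index.core.node_parser import SentenceSplitter
        from llama_index.embeddings.openai import OpenAIEmbedding

        # `normalized.result` arrives as an array of tokens; rebuild a single string
        text = " ".join(content) if isinstance(content, (list, tuple)) else content

        # Create LlamaIndex document
        doc = Document(text=text)

        # Chunk with LlamaIndex
        splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=200)
        nodes = splitter.get_nodes_from_documents([doc])

        # Generate embeddings in one batched call (needs OPENAI_API_KEY on the executors)
        embed_model = OpenAIEmbedding()
        embeddings = embed_model.get_text_embedding_batch([node.text for node in nodes])

        # Serialize metadata to JSON so it matches the StringType field in the UDF schema
        return [{"text": node.text, "embedding": embedding, "metadata": json.dumps(node.metadata)}
                for node, embedding in zip(nodes, embeddings)]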

    # Apply LlamaIndex processing across partitions
    from pyspark.sql.types import ArrayType, FloatType, StructType, StructField, StringType

    llamaindex_udf = udf(llamaindex_chunk_and_embed, ArrayType(StructType([
        StructField("text", StringType()),
        StructField("embedding", ArrayType(FloatType())),
        StructField("metadata", StringType())
    ])))

    # Process and flatten results
    final_df = processed_df \
        .withColumn("chunks", llamaindex_udf(col("normalized.result"))) \
        .select(explode(col("chunks")).alias("chunk")) \
        .select(
            col("chunk.text"),
            col("chunk.embedding"),
            col("chunk.metadata")
        )

    # Write to the data lake as a Delta table (requires Delta Lake on the cluster)
    final_df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save("s3a://company-datalake/processed-embeddings/")

    return final_df

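The `chunk_size` and `chunk_overlap` settings passed to the splitter control how much text each chunk holds and how much neighboring chunks share. The sketch below shows the sliding-window idea on a plain token list. It is a simplification: LlamaIndex's `SentenceSplitter` also tries to respect sentence boundaries, which this hypothetical `chunk_with_overlap` helper ignores.

```python
def chunk_with_overlap(tokens, chunk_size, chunk_overlap):
    """Split a token list into windows of `chunk_size` that share `chunk_overlap` tokens."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # the last window already reaches the end of the text
    return chunks


words = "a b c d e f g h i j".split()
chunks = chunk_with_overlap(words, chunk_size=4, chunk_overlap=1)
for chunk in chunks:
    print(" ".join(chunk))
```

Each chunk starts with the last token of the previous one, so context that straddles a boundary appears in both chunks. Larger overlaps improve recall at the cost of more embeddings to compute and store.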
## 🏔️ **Snowflake - Data Warehouse & RAG Storage**

**Role**: Centralized storage, governance, and native RAG capabilities[12][13][14]

**Key Benefits**:
- **Native Vector Support**: Built-in VECTOR data type and similarity functions[14]
- **Cortex AI**: Serverless LLM and embedding services[15][16]
- **Zero-ETL**: Direct integration with data lakes and warehouses[13]
- **Governance**: Row-level security, data masking, audit trails[17]

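Vector similarity search ranks stored embeddings by how closely they point in the same direction as the query embedding. Below is a minimal pure-Python sketch of cosine similarity and top-k ranking. It only illustrates the math; the helper names and the two-dimensional toy vectors are assumptions for the example, not part of any Snowflake API.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


def top_k(query, rows, k):
    """Rank (chunk_id, embedding) rows by similarity to `query`, highest first."""
    scored = [(chunk_id, cosine_similarity(query, emb)) for chunk_id, emb in rows]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


rows = [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [1.0, 1.0])]
ranked = top_k([1.0, 0.0], rows, k=2)
print(ranked)
```

The dimension check mirrors a practical constraint: query and stored embeddings must come from the same model, since vectors of different lengths (for example 768 versus 1536) cannot be compared.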
In [None]:


import os

import snowflake.connector

def snowflake_rag_integration():
    """Store and query document embeddings in Snowflake"""

    # Connect to Snowflake; credentials are read from environment variables
    # (the variable names are illustrative) instead of being hard-coded
    conn = snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse='COMPUTE_WH',
        database='DOCUMENT_DB',
        schema='RAG_SCHEMA'
    )

    # Create tables for document storage and embeddings
    cursor = conn.cursor()

    # Document metadata table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS DOCUMENT_METADATA (
            document_id STRING PRIMARY KEY,
            source_path STRING,
            document_type STRING,
            processed_date TIMESTAMP_NTZ,
            metadata VARIANT
        )
    """)

    # Embeddings table with Snowflake's VECTOR type
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS DOCUMENT_EMBEDDINGS (
            chunk_id STRING PRIMARY KEY,
            document_id STRING,
            chunk_text STRING,
            embedding VECTOR(FLOAT, 768),  -- must match the query-time model (e5-base-v2 returns 768 dimensions)
            chunk_metadata VARIANT,
            created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
        )
    """)

    # Use Snowflake Cortex for native RAG
    def create_cortex_search_service():
        """Create Snowflake Cortex Search service"""
        cursor.execute("""
            CREATE OR REPLACE CORTEX SEARCH SERVICE DOCUMENT_SEARCH
            ON chunk_text
            WAREHOUSE = COMPUTE_WH
            TARGET_LAG = '1 minute'
            AS (
                SELECT chunk_id, chunk_text, chunk_metadata, embedding
                FROM DOCUMENT_EMBEDDINGS
            )
        """)

    # Query using Snowflake's native vector similarity
    def vector_search(query_text, limit=5):
        """Search documents using Snowflake vector similarity"""

        # Embed the query with Snowflake Cortex and rank chunks in a single statement.
        # Bind parameters (%s) keep user input out of the SQL text. The stored
        # embeddings must have the same dimension as the query embedding (768 here).
        similarity_search = """
            SELECT
                chunk_text,
                chunk_metadata,
                VECTOR_COSINE_SIMILARITY(
                    embedding,
                    SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', %s)
                ) AS similarity_score
            FROM DOCUMENT_EMBEDDINGS
            ORDER BY similarity_score DESC
            LIMIT %s
        """

        cursor.execute(similarity_search, (query_text, int(limit)))
        return cursor.fetchall()

    # Use Snowflake Cortex for complete RAG
    def snowflake_rag_query(question):
        """Complete RAG using Snowflake Cortex"""

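        # The question is passed as a named bind parameter rather than
        # interpolated into the SQL, which avoids quoting bugs and SQL injection
        rag_sql = """
            SELECT SNOWFLAKE.CORTEX.COMPLETE(
                'mistral-7b',
                CONCAT(
                    'Context: ',
                    (SELECT LISTAGG(chunk_text, '\\n')
                     FROM (
                         SELECT chunk_text,
                         VECTOR_COSINE_SIMILARITY(
                             embedding,
                             SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', %(question)s)
                         ) as similarity
                         FROM DOCUMENT_EMBEDDINGS
                         ORDER BY similarity DESC
                         LIMIT 3
                     )
                    ),
                    '\\n\\nQuestion: ', %(question)s, '\\n\\nAnswer:'
                )
            ) as response
        """

        cursor.execute(rag_sql, {"question": question})
        row = cursor.fetchone()
        # fetchone() returns a one-element tuple; unwrap the response text
        return row[0] if row else None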

    return {
        'search': vector_search,
        'rag_query': snowflake_rag_query
    }

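The RAG query above concatenates the top-ranked chunks into a context block and appends the question before calling the LLM. The same prompt assembly can be sketched in plain Python, which is handy for unit-testing the prompt format outside the warehouse. `build_rag_prompt` and the sample chunks are hypothetical and exist only for this illustration.

```python
def build_rag_prompt(question, chunks, max_chunks=3):
    """Join the top-ranked chunks into a context block followed by the question."""
    context = "\n".join(chunks[:max_chunks])
    return f"Context: {context}\n\nQuestion: {question}\n\nAnswer:"


# Hypothetical chunks, already ordered from most to least similar.
sample_chunks = [
    "Invoices are kept for 7 years.",
    "Contracts are kept for 10 years.",
    "Emails are kept for 1 year.",
    "Logs are kept for 30 days.",
]

prompt = build_rag_prompt("How long are invoices kept?", sample_chunks)
print(prompt)
```

Only the first three chunks reach the prompt, matching the `LIMIT 3` in the SQL. Raising that limit adds context but also increases prompt length and cost.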


## 🧱 **Databricks - Unified AI Platform**

**Role**: End-to-end MLOps/LLMOps with integrated vector search[18][19][20]

**Key Benefits**:
- **Unity Catalog**: Centralized governance for data, models, and vectors[21][22]
- **Auto-Sync Indexes**: Vector indexes automatically update with source data[23][24]
- **MLOps Integration**: End-to-end model lifecycle with MLflow[20][18]
- **Foundation Models**: Pre-trained LLMs and embedding models[25]

In [None]:


from databricks.vector_search.client import VectorSearchClient

def databricks_unified_rag_pipeline():
    """Complete MLOps pipeline with Databricks Unity Catalog"""

    # Initialize Databricks Vector Search
    vsc = VectorSearchClient()

    # Create vector search endpoint
    vsc.create_endpoint(
        name="document-processing-endpoint",
        endpoint_type="STANDARD"
    )

    # Create Unity Catalog managed vector index
    index = vsc.create_delta_sync_index(
        endpoint_name="document-processing-endpoint",
        source_table_name="main.documents.processed_chunks",
        index_name="main.documents.vector_index",
        pipeline_type="CONTINUOUS",  # Auto-sync with source table
        primary_key="chunk_id",
        embedding_source_column="chunk_text",
        embedding_model_endpoint_name="databricks-bge-large-en"  # Managed embedding model
    )

    # MLflow integration for model tracking
    import mlflow
    from mlflow.deployments import get_deploy_client

    def register_rag_model():
        """Register complete RAG model in MLflow"""

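        class RAGModel(mlflow.pyfunc.PythonModel):
            def __init__(self, vector_index_name, endpoint_name="document-processing-endpoint"):
                # Keep only plain strings on the instance so the model pickles cleanly
                self.vector_index_name = vector_index_name
                self.endpoint_name = endpoint_name

            def load_context(self, context):
                # Create clients when the model is loaded instead of pickling them
                self.index = VectorSearchClient().get_index(
                    endpoint_name=self.endpoint_name,
                    index_name=self.vector_index_name
                )
                self.deploy_client = get_deploy_client("databricks")

            def predict(self, context, model_input):
                """RAG inference with vector search + LLM"""

                query = model_input["query"]
                if not isinstance(query, str):
                    # e.g. a pandas Series when the input is a DataFrame; use the first row
                    query = list(query)[0]

                # Vector search (similarity_search is a method of the index object)
                search_results = self.index.similarity_search(
                    query_text=query,
                    columns=["chunk_text", "metadata"],
                    num_results=5
                )

                # Each row in data_array is a list ordered like `columns`, so
                # chunk_text is the first element
                context_text = "\n".join(
                    row[0] for row in search_results["result"]["data_array"]
                )

                prompt = f"""
                Context: {context_text}

                Question: {query}

                Answer based on the context provided:
                """

                # Chat endpoints expect a list of messages; endpoint availability
                # varies by workspace, so substitute one that exists in yours
                response = self.deploy_client.predict(
                    endpoint="databricks-llama-2-70b-chat",
                    inputs={"messages": [{"role": "user", "content": prompt}]}
                )

                # `choices` is a list; take the first completion
                return response["choices"][0]["message"]["content"]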

        # Log model with MLflow
        with mlflow.start_run() as run:
            rag_model = RAGModel("main.documents.vector_index")

            mlflow.pyfunc.log_model(
                "rag_model",
                python_model=rag_model,
                registered_model_name="document_rag_model"
            )

        return run.info.run_id

    # Databricks Workflows for automation
    def create_document_processing_workflow():
        """Databricks Workflow for end-to-end processing"""

        workflow_spec = {
            "name": "Document Processing Pipeline",
            "job_clusters": [{
                "job_cluster_key": "processing_cluster",
                "new_cluster": {
                    "spark_version": "13.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 4,
                    "spark_conf": {
                        "spark.databricks.delta.preview.enabled": "true"
                    }
                }
            }],
            "tasks": [
                {
                    "task_key": "extract_documents",
                    "job_cluster_key": "processing_cluster",
                    "notebook_task": {
                        "notebook_path": "/Repos/main/document-pipeline/extract_documents"
                    }
                },
                {
                    "task_key": "process_with_llamaindex",
                    "depends_on": [{"task_key": "extract_documents"}],
                    "job_cluster_key": "processing_cluster",
                    "notebook_task": {
                        "notebook_path": "/Repos/main/document-pipeline/process_llamaindex"
                    }
                },
                {
                    "task_key": "update_vector_index",
                    "depends_on": [{"task_key": "process_with_llamaindex"}],
                    "job_cluster_key": "processing_cluster",
                    "notebook_task": {
                        "notebook_path": "/Repos/main/document-pipeline/update_vectors"
                    }
                }
            ]
        }

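        # Create workflow using Databricks SDK. jobs.create() takes typed objects
        # rather than raw dicts, so convert the spec with the SDK's from_dict helpers
        from databricks.sdk import WorkspaceClient
        from databricks.sdk.service import jobs

        w = WorkspaceClient()
        job = w.jobs.create(
            name=workflow_spec["name"],
            job_clusters=[jobs.JobCluster.from_dict(c) for c in workflow_spec["job_clusters"]],
            tasks=[jobs.Task.from_dict(t) for t in workflow_spec["tasks"]]
        )

        return job.job_id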

    return {
        'vector_index': index,
        'model_registration': register_rag_model,
        'workflow_creation': create_document_processing_workflow
    }

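A workflow spec like the one above encodes task ordering through `depends_on` entries that refer to other tasks by `task_key`, so a typo in a key only surfaces when the job is submitted. The sketch below is a small pre-flight check in plain Python. `validate_task_dependencies` is a hypothetical helper, not part of the Databricks SDK, and the trimmed spec contains a deliberate typo.

```python
def validate_task_dependencies(spec):
    """Return a list of problems found in the `depends_on` entries of a job spec."""
    tasks = spec.get("tasks", [])
    task_keys = [task["task_key"] for task in tasks]
    known = set(task_keys)
    problems = []
    if len(task_keys) != len(known):
        problems.append("duplicate task_key values")
    for task in tasks:
        for dep in task.get("depends_on", []):
            if dep["task_key"] == task["task_key"]:
                problems.append(f"{task['task_key']} depends on itself")
            elif dep["task_key"] not in known:
                problems.append(
                    f"{task['task_key']} depends on unknown task {dep['task_key']}"
                )
    return problems


spec = {
    "name": "Document Processing Pipeline",
    "tasks": [
        {"task_key": "extract_documents"},
        {"task_key": "process_with_llamaindex",
         "depends_on": [{"task_key": "extract_documents"}]},
        {"task_key": "update_vector_index",
         "depends_on": [{"task_key": "process_llamaindex"}]},  # deliberate typo
    ],
}

problems = validate_task_dependencies(spec)
print(problems)
```

An empty list means every dependency points at a task defined in the same spec.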


## 🏗️ **Enterprise Architecture Integration**

Here's how they work together in a production document processing pipeline:

```mermaid
graph TD
    A[Document Sources] --> B[Airflow Orchestrator]
    B --> C[Spark Cluster Processing]
    C --> D[LlamaIndex Integration]
    D --> E[Snowflake Data Warehouse]
    D --> F[Databricks Vector Search]
    E --> G[Snowflake Cortex RAG]
    F --> H[Databricks MLOps]
    G --> I[Production Applications]
    H --> I
```

**Recommended Architecture**:

1. **Airflow** orchestrates the entire pipeline with scheduling and monitoring[1][2]
2. **Spark** handles large-scale document processing and preprocessing[9][7]
3. **LlamaIndex** provides sophisticated chunking, metadata extraction, and embedding generation
4. **Snowflake** stores structured metadata and enables native RAG with Cortex[12][14]
5. **Databricks** manages vector indexes, model deployment, and MLOps workflows[18][20]

This combination gives you:
- **Enterprise scale** (millions of documents)
- **Production reliability** (monitoring, retries, governance)
- **Cost optimization** (auto-scaling, serverless options)
- **Security compliance** (unified access control, audit trails)
- **Operational simplicity** (managed services, unified platforms)

Each tool covers the layer it is strongest at (orchestration, distributed compute, storage, or serving), while LlamaIndex remains the core document-intelligence component that handles chunking, metadata extraction, and embedding.
