# Apache Airflow DAG for Movie Data Processing

This Apache Airflow DAG (Directed Acyclic Graph) processes movie data in several steps to produce searchable vector embeddings. Its main components are:

## 1. Data Pipeline Overview
- Loads movie data from both CSV and JSON files stored in Google Cloud Storage (GCS).
- Processes movie titles and overviews.
- Generates embeddings using OpenAI's `text-embedding-ada-002` model.
- Stores the embeddings in Pinecone (a vector database).
- Saves results and metadata to BigQuery tables.
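The DAG joins each movie's title and overview into a single string of the form `"<title>. <overview>"` before embedding it. The exact preprocessing applied afterwards is not spelled out here. The sketch below assumes lowercasing, alphanumeric tokenization and stopword removal, and uses a tiny hypothetical stopword list so that it runs without downloading NLTK data. `combine_title_and_overview` and `normalize` are illustrative names, not functions from the DAG.

```python
import re
from typing import Optional

# Hypothetical stopword list, kept tiny for illustration (NLTK ships a full English list)
STOP_WORDS = {"a", "an", "and", "the", "of", "to", "in", "is"}


def combine_title_and_overview(title: Optional[str], overview: Optional[str]) -> str:
    """Join title and overview as "<title>. <overview>", treating missing values as empty."""
    return f"{title or ''}. {overview or ''}"


def normalize(text: str) -> str:
    """Lowercase, keep runs of letters and digits, and drop stopwords."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    return " ".join(token for token in tokens if token not in STOP_WORDS)


print(normalize(combine_title_and_overview("The Matrix", "A hacker learns the truth about reality.")))
# → matrix hacker learns truth about reality
```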

## 2. Key Features
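- Handles large datasets by splitting them into chunks that are processed independently.
- Parallelizes work with `ThreadPoolExecutor` (I/O-bound API calls) and `ProcessPoolExecutor` (CPU-bound text preprocessing).
- Retries failed OpenAI and Pinecone calls with exponential backoff.
- Splits texts longer than 4,096 tokens into overlapping chunks and averages the chunk embeddings.
- Batches API calls (100 texts per embedding request, 100 vectors per Pinecone upsert) to reduce request overhead.
- Uses Dask to partition the dataset and process the partitions concurrently.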
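Token-based splitting operates on token ids rather than characters. The sliding window below illustrates the idea on a plain list of integers, so no tokenizer download is needed. It is a simplified sketch: unlike the DAG, it does not search backwards for a sentence boundary. `split_token_ids` is a hypothetical name. The explicit `break` after the final window matters. If the loop instead stepped back by `overlap` after reaching the end of the text, it would keep producing the same last window forever.

```python
from typing import List, Sequence


def split_token_ids(tokens: Sequence[int], max_tokens: int, overlap: int) -> List[List[int]]:
    """Split token ids into windows of at most max_tokens, each overlapping the previous one."""
    if overlap >= max_tokens:
        raise ValueError("overlap must be smaller than max_tokens")
    if len(tokens) <= max_tokens:
        return [list(tokens)]

    chunks = []
    start = 0
    while True:
        end = min(start + max_tokens, len(tokens))
        chunks.append(list(tokens[start:end]))
        if end == len(tokens):
            break  # final window reached; stepping back by `overlap` here would never terminate
        start = end - overlap  # next window repeats the last `overlap` tokens
    return chunks


print(split_token_ids(list(range(10)), max_tokens=4, overlap=1))
# → [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]
```

The overlap carries some context across each boundary, so a sentence cut at the edge of one window still appears whole, or nearly whole, in the next.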

## 3. Main Processing Steps
- **Text Preprocessing** (in parallel)
- **Embedding Generation** with OpenAI
- **Vector Storage** in Pinecone
- **Data Storage** in BigQuery (partitioned and clustered tables)
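When a long text is embedded in several chunks, the per-chunk vectors must be merged back into one vector per movie. The sketch below uses mean pooling with NumPy. It assumes that a text with any failed chunk is treated as failed rather than averaged from its surviving chunks. That is a policy choice made for this sketch, not something the pipeline description states. `combine_chunk_embeddings` is a hypothetical name.

```python
from typing import Dict, List, Optional, Sequence

import numpy as np


def combine_chunk_embeddings(
    chunk_owner: Sequence[int],
    chunk_embeddings: Sequence[Optional[List[float]]],
    num_texts: int,
) -> List[Optional[List[float]]]:
    """Average the embeddings of all chunks cut from the same original text.

    chunk_owner[i] is the index of the original text that chunk i came from.
    A text with a failed chunk (None) gets None, so it can be routed to a "dropped" table.
    """
    grouped: Dict[int, List[List[float]]] = {}
    failed = set()
    for owner, embedding in zip(chunk_owner, chunk_embeddings):
        if embedding is None:
            failed.add(owner)
        else:
            grouped.setdefault(owner, []).append(embedding)

    result: List[Optional[List[float]]] = [None] * num_texts
    for owner, embeddings in grouped.items():
        if owner not in failed:
            result[owner] = np.mean(embeddings, axis=0).tolist()
    return result


print(combine_chunk_embeddings([0, 0, 1], [[1.0, 0.0], [3.0, 2.0], None], num_texts=2))
# → [[2.0, 1.0], None]
```

The mean of unit-length vectors is generally not unit length. That does not matter for an index that uses cosine similarity, which ignores vector magnitude.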

## 4. Infrastructure
- Runs on **Apache Airflow**.
- Uses **Google Cloud Platform** (GCS and BigQuery).
- Integrates with **OpenAI** for embeddings.
- Uses **Pinecone** for vector storage.
- Implements configuration management through **Airflow Variables**.
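The configuration derives fully qualified BigQuery table IDs (`project.dataset.table`) from the `GCP_PROJECT_ID` and `BQ_DATASET` variables. The sketch below shows that derivation. It takes the lookup function as a parameter so it can be exercised without an Airflow metadata database; inside a DAG you would pass `Variable.get`. `TableNames` and `table_names_from_variables` are hypothetical names, and the injected lookup is a design choice of this sketch.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class TableNames:
    full_text: str
    metadata: str
    dropped: str


def table_names_from_variables(get: Callable[[str], str]) -> TableNames:
    """Build fully qualified BigQuery table IDs from variable lookups."""
    prefix = f"{get('GCP_PROJECT_ID')}.{get('BQ_DATASET')}"
    return TableNames(
        full_text=f"{prefix}.full_text",
        metadata=f"{prefix}.metadata",
        dropped=f"{prefix}.dropped",
    )


# Outside Airflow, a plain dict stands in for the variable store
variables = {"GCP_PROJECT_ID": "my-project", "BQ_DATASET": "movies"}
print(table_names_from_variables(variables.__getitem__).full_text)
# → my-project.movies.full_text
```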

## 5. Performance Optimizations
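- **Dynamic Worker Allocation**: worker pools are sized from the CPU core count (two workers per core, capped at 32).
- **Batch Processing** with configurable sizes (`CHUNK_SIZE`, `EMBEDDING_BATCH_SIZE`).
- **Parallel Uploads** to both Pinecone and BigQuery.
- **Text Splitting** to keep each input within the embedding model's token limit.
- **Chunked Processing**: embeddings and uploads are produced per partition rather than for the whole dataset in one pass.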
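Batching and thread-pool sizing combine as follows. Items are grouped into fixed-size batches with `itertools.islice`, and each batch is handed to a thread pool. Threads suit the pipeline's API calls because they are I/O-bound, and the GIL is released while a thread waits on the network. This is a minimal sketch with hypothetical names (`batches`, `map_batches`). It reuses the document's sizing rule of two workers per core, capped at 32.

```python
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

# Two workers per core, capped at 32
MAX_WORKERS = min(32, multiprocessing.cpu_count() * 2)


def batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items."""
    iterator = iter(items)
    while batch := list(islice(iterator, batch_size)):
        yield batch


def map_batches(fn: Callable[[List[T]], R], items: Iterable[T], batch_size: int) -> List[R]:
    """Apply fn to every batch on a thread pool; results keep the batch order."""
    work = list(batches(items, batch_size))
    if not work:
        return []  # ThreadPoolExecutor rejects max_workers=0
    with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(work))) as pool:
        return list(pool.map(fn, work))


print(map_batches(sum, range(10), batch_size=4))
# → [6, 22, 17]
```

In the pipeline, `fn` would be the function that sends one batch to the embeddings endpoint or to Pinecone.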

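## 6. DAG Characteristics
The DAG runs daily at midnight (`0 0 * * *`) with catch-up disabled. Airflow retries a failed task twice, five minutes apart, and sends an email alert when a task fails; individual OpenAI and Pinecone calls are also retried inside the task. The design aims to scale to large volumes of movie data while tolerating transient failures.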
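Per-call retries use exponential backoff: each failed attempt waits twice as long as the previous one before the call is tried again. The sketch below generalizes that pattern. `call_with_retries` is a hypothetical name. With the defaults it waits 2 s and then 4 s between three attempts, the same schedule as `time.sleep(2 ** retry_count)`. One difference from the DAG's helpers is deliberate: this version re-raises the last error once retries are exhausted. The DAG's helpers instead return placeholder values, and re-raising lets the caller, or Airflow's task-level retries, decide what to do.

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def call_with_retries(fn: Callable[[], T], max_retries: int = 3, base_delay: float = 1.0) -> T:
    """Call fn, retrying on any exception with waits of base_delay * 2, * 4, ... seconds.

    Re-raises the last exception once max_retries attempts have failed.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_retries:
                raise
            delay = base_delay * 2 ** attempt
            logger.warning("Attempt %d/%d failed: %s; retrying in %.1fs", attempt, max_retries, exc, delay)
            time.sleep(delay)
    raise ValueError("max_retries must be at least 1")
```

In the pipeline, `fn` could be a `lambda` that wraps `openai_client.embeddings.create(...)` or `pinecone_index.upsert(...)` for a single batch.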

--- 

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.models import Variable

import io
import re
import time
import logging
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime, timedelta, timezone
from functools import partial
from itertools import islice
from typing import List, Dict, Any, Optional

import dask
import dask.dataframe as dd
import nltk
import numpy as np
import pandas as pd
import tiktoken
from google.cloud import storage, bigquery
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from openai import OpenAI
from pinecone import Pinecone  # pinecone-client v3+ API

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Constants for batch processing
CHUNK_SIZE = 1000           # rows per Dask partition
EMBEDDING_BATCH_SIZE = 100  # texts per OpenAI embeddings request
MAX_RETRIES = 3
NUM_CORES = multiprocessing.cpu_count()
MAX_WORKERS = min(32, NUM_CORES * 2)  # Limit max workers to avoid overwhelming resources

# Assumed table definitions (the original queries were not shown).
# BigQueryInsertJobOperator templates `configuration`, so the Variables render at run time.
CREATE_FULL_TEXT_TABLE_QUERY = """
CREATE TABLE IF NOT EXISTS `{{ var.value.GCP_PROJECT_ID }}.{{ var.value.BQ_DATASET }}.full_text` (
  id STRING,
  combined_text STRING,
  created_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY id
"""

CREATE_METADATA_TABLE_QUERY = """
CREATE TABLE IF NOT EXISTS `{{ var.value.GCP_PROJECT_ID }}.{{ var.value.BQ_DATASET }}.metadata` (
  id STRING,
  process_date TIMESTAMP,
  filename STRING,
  status STRING,
  created_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY id
"""


class PipelineConfig:
    def __init__(self):
        self.gcs_bucket = Variable.get('GCS_BUCKET')
        self.input_path = Variable.get('INPUT_PATH')
        self.project_id = Variable.get('GCP_PROJECT_ID')
        self.bq_dataset = Variable.get('BQ_DATASET')
        self.full_text_table = f"{self.project_id}.{self.bq_dataset}.full_text"
        self.metadata_table = f"{self.project_id}.{self.bq_dataset}.metadata"
        self.dropped_table = f"{self.project_id}.{self.bq_dataset}.dropped"
        self.pinecone_api_key = Variable.get('PINECONE_API_KEY')
        # Not needed by the v3+ Pinecone client; kept for older pod-based setups
        self.pinecone_env = Variable.get('PINECONE_ENV', default_var=None)
        self.index_name = Variable.get('PINECONE_INDEX_NAME')
        self.openai_api_key = Variable.get('OPENAI_API_KEY')
        self.num_processes = NUM_CORES


def load_data_from_gcs(bucket_name: str, blob_path: str) -> pd.DataFrame:
    """Download a CSV or JSON file from GCS into a DataFrame.

    Assumed implementation (the original helper was not shown). JSON input is
    assumed to be an array of records; pass lines=True for JSON Lines files.
    """
    blob = storage.Client().bucket(bucket_name).blob(blob_path)
    data = blob.download_as_bytes()
    if blob_path.endswith(".csv"):
        return pd.read_csv(io.BytesIO(data))
    return pd.read_json(io.StringIO(data.decode("utf-8")))


def ensure_matching_schema(df_csv: pd.DataFrame, df_json: pd.DataFrame) -> pd.DataFrame:
    """Align both sources on the columns the pipeline uses and stack them.

    Assumed implementation (the original helper was not shown).
    """
    columns = ['id', 'title', 'overview']
    combined = pd.concat(
        [df_csv.reindex(columns=columns), df_json.reindex(columns=columns)],
        ignore_index=True,
    )
    combined['id'] = combined['id'].astype(str)
    return combined


_STOP_WORDS: Optional[set] = None
_LEMMATIZER: Optional[WordNetLemmatizer] = None


def preprocess_text(text: str) -> str:
    """Lowercase, keep alphanumeric tokens, drop English stopwords and lemmatize.

    Assumed implementation (the original helper was not shown). It must stay a
    module-level function so ProcessPoolExecutor can pickle it.
    """
    global _STOP_WORDS, _LEMMATIZER
    if _STOP_WORDS is None:
        _STOP_WORDS = set(stopwords.words("english"))
        _LEMMATIZER = WordNetLemmatizer()
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    return " ".join(_LEMMATIZER.lemmatize(tok) for tok in tokens if tok not in _STOP_WORDS)


def split_text_by_tokens(text: str, encoder, max_tokens: int = 4096, overlap: int = 100) -> List[str]:
    """
    Split text into chunks that don't exceed the token limit, with optional overlap

    Args:
        text: Text to split
        encoder: tiktoken encoding used to tokenize the text
        max_tokens: Maximum tokens per chunk
        overlap: Number of tokens to overlap between chunks

    Returns:
        List of text chunks that each fit within the token limit
    """
    tokens = encoder.encode(text)
    if len(tokens) <= max_tokens:
        return [text]

    # Token ids that end a sentence or line, computed once instead of on every iteration
    boundary_tokens = set()
    for delimiter in (". ", "? ", "! ", "\n"):
        boundary_tokens.update(encoder.encode(delimiter))

    chunks = []
    start = 0
    while start < len(tokens):
        end = start + max_tokens
        if end >= len(tokens):
            chunk_tokens = tokens[start:]
        else:
            # Search backwards (at most 100 tokens) for a sentence boundary;
            # fall back to a hard split at max_tokens if none is found
            split_point = end
            for candidate in range(end - 1, max(start, end - 100), -1):
                if tokens[candidate] in boundary_tokens:
                    split_point = candidate + 1  # keep the punctuation in this chunk
                    break
            chunk_tokens = tokens[start:split_point]

        # Decode chunk back to text
        chunks.append(encoder.decode(chunk_tokens).strip())

        if start + len(chunk_tokens) >= len(tokens):
            break  # last chunk; stepping back by `overlap` here would loop forever
        # Next chunk starts `overlap` tokens before the end of this one (always moving forward)
        start += max(1, len(chunk_tokens) - overlap)

    return chunks


def parallel_text_preprocessing(texts: List[str]) -> List[str]:
    """Parallel text preprocessing using ProcessPoolExecutor"""
    if not texts:
        return []
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        return list(executor.map(preprocess_text, texts, chunksize=64))


def batch_generator(iterable, batch_size):
    """Generate batches from an iterable"""
    iterator = iter(iterable)
    while batch := list(islice(iterator, batch_size)):
        yield batch


def parallel_generate_embeddings(texts: List[str], openai_client: OpenAI) -> List[Optional[List[float]]]:
    """Generate one embedding per text, splitting long texts and averaging their chunk embeddings"""
    encoder = tiktoken.get_encoding("cl100k_base")

    def process_batch(batch_texts):
        retry_count = 0
        while retry_count < MAX_RETRIES:
            try:
                response = openai_client.embeddings.create(
                    input=batch_texts,
                    model="text-embedding-ada-002"
                )
                return [item.embedding for item in response.data]
            except Exception as e:
                retry_count += 1
                logger.error(f"Error in batch embedding: {e}. Retry {retry_count}/{MAX_RETRIES}")
                if retry_count == MAX_RETRIES:
                    return [None] * len(batch_texts)  # mark every chunk in the batch as failed
                time.sleep(2 ** retry_count)

    # Split long texts, remembering which original text each chunk came from
    chunk_texts = []
    chunk_owner = []  # chunk_owner[i] is the index of the original text for chunk i
    for idx, text in enumerate(texts):
        for chunk in split_text_by_tokens(text, encoder):
            if chunk:  # the API rejects empty strings; such texts end up in the dropped table
                chunk_texts.append(chunk)
                chunk_owner.append(idx)

    batches = list(batch_generator(chunk_texts, EMBEDDING_BATCH_SIZE))
    if not batches:
        return [None] * len(texts)

    with ThreadPoolExecutor(max_workers=min(8, len(batches))) as executor:
        batch_results = list(executor.map(process_batch, batches))

    # Group chunk embeddings by original text; a text with any failed chunk counts as failed
    grouped: Dict[int, List[List[float]]] = {}
    failed = set()
    chunk_embeddings = [embedding for batch in batch_results for embedding in batch]
    for owner, embedding in zip(chunk_owner, chunk_embeddings):
        if embedding is None:
            failed.add(owner)
        else:
            grouped.setdefault(owner, []).append(embedding)

    final_embeddings: List[Optional[List[float]]] = [None] * len(texts)
    for owner, embeddings in grouped.items():
        if owner not in failed:
            # Mean-pool the chunk embeddings into one vector per text
            final_embeddings[owner] = np.mean(embeddings, axis=0).tolist()
    return final_embeddings


def parallel_upload_to_pinecone(vectors: List[tuple], pinecone_index, batch_size: int = 100) -> bool:
    """Upload vectors to Pinecone in parallel batches; returns True if every batch succeeded"""
    def upload_batch(batch):
        retry_count = 0
        while retry_count < MAX_RETRIES:
            try:
                pinecone_index.upsert(vectors=batch)
                return True
            except Exception as e:
                retry_count += 1
                logger.error(f"Error uploading to Pinecone: {e}. Retry {retry_count}/{MAX_RETRIES}")
                if retry_count == MAX_RETRIES:
                    return False
                time.sleep(2 ** retry_count)

    batches = list(batch_generator(vectors, batch_size))
    if not batches:
        return True
    with ThreadPoolExecutor(max_workers=min(8, len(batches))) as executor:
        results = list(executor.map(upload_batch, batches))

    return all(results)


def process_chunk(chunk: pd.DataFrame, config: PipelineConfig, openai_client: OpenAI, pinecone_index) -> Dict[str, Any]:
    """Process a single chunk of data with parallel processing and text splitting"""
    logger.info(f"Processing chunk with {len(chunk)} records.")
    chunk = chunk.copy()  # don't mutate the partition handed over by Dask

    # Parallel text preprocessing
    texts = chunk['title'].fillna('') + ". " + chunk['overview'].fillna('')
    chunk['combined_text'] = parallel_text_preprocessing(texts.tolist())

    # Parallel embedding generation with text splitting
    embeddings = parallel_generate_embeddings(chunk['combined_text'].tolist(), openai_client)
    # dtype=object keeps each embedding (or None) in a single cell
    chunk['embedding'] = pd.Series(embeddings, index=chunk.index, dtype=object)

    # Prepare (id, values) pairs for Pinecone
    vectors = [
        (str(record_id), embedding)
        for record_id, embedding in zip(chunk['id'], chunk['embedding'])
        if embedding is not None
    ]

    # Parallel upload to Pinecone
    if not parallel_upload_to_pinecone(vectors, pinecone_index):
        logger.warning("Some Pinecone batches failed after all retries; see the errors above.")

    # Prepare results
    now = datetime.now(timezone.utc)
    processed_records = chunk[chunk['embedding'].notna()]
    dropped_records = chunk[chunk['embedding'].isna()]

    return {
        'full_text': processed_records[['id', 'combined_text']].assign(created_at=now),
        'metadata': processed_records[['id']].assign(
            process_date=now,
            filename=f"batch_{now.strftime('%Y%m%d%H%M%S')}",
            status='processed',
            created_at=now
        ),
        'dropped': dropped_records[['id', 'title', 'overview']].assign(created_at=now),
    }


def parallel_upload_to_bigquery(dfs: List[pd.DataFrame], table_id: str):
    """Append multiple DataFrames to a BigQuery table using parallel load jobs"""
    dfs = [df for df in dfs if not df.empty]
    if not dfs:
        return

    def upload_single_df(df: pd.DataFrame):
        # One client per thread. load_table_from_dataframe serializes with pyarrow, so no
        # temporary CSV (and no CSV header row) is involved; an existing table's schema,
        # partitioning and clustering are reused
        client = bigquery.Client()
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )
        load_job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        return load_job.result()  # waits for the job and raises on load errors

    with ThreadPoolExecutor(max_workers=min(8, len(dfs))) as executor:
        futures = [executor.submit(upload_single_df, df) for df in dfs]
        for future in futures:
            future.result()


def process_movie_data(**context):
    """Main processing function with enhanced parallelization"""
    config = PipelineConfig()

    # Initialize clients (pinecone.init() was removed in pinecone-client v3)
    openai_client = OpenAI(api_key=config.openai_api_key)
    pinecone_index = Pinecone(api_key=config.pinecone_api_key).Index(config.index_name)

    # Download NLTK corpora once, before any worker processes are forked
    for corpus in ("stopwords", "wordnet", "omw-1.4"):
        nltk.download(corpus, quiet=True)

    # Load and combine data
    logger.info("Loading data from GCS...")
    df_csv = load_data_from_gcs(config.gcs_bucket, f"{config.input_path}/output.csv")
    df_json = load_data_from_gcs(config.gcs_bucket, f"{config.input_path}/output.json")
    df = ensure_matching_schema(df_csv, df_json)

    # Convert the pandas DataFrame into Dask partitions of CHUNK_SIZE rows
    ddf = dd.from_pandas(df, chunksize=CHUNK_SIZE)

    process_chunk_partial = partial(process_chunk, config=config,
                                    openai_client=openai_client,
                                    pinecone_index=pinecone_index)

    # process_chunk returns a dict, not a DataFrame, so it runs per partition via
    # dask.delayed rather than map_partitions. The threaded scheduler lets partitions
    # share the API clients; num_workers caps how many partitions run at once.
    tasks = [dask.delayed(process_chunk_partial)(part) for part in ddf.to_delayed()]
    results = dask.compute(*tasks, scheduler="threads", num_workers=config.num_processes)

    # Combine results
    full_text_dfs = [result['full_text'] for result in results]
    metadata_dfs = [result['metadata'] for result in results]
    dropped_dfs = [result['dropped'] for result in results]

    # Parallel upload to BigQuery (empty DataFrames are skipped)
    logger.info("Uploading processed data to BigQuery...")
    parallel_upload_to_bigquery(full_text_dfs, config.full_text_table)
    parallel_upload_to_bigquery(metadata_dfs, config.metadata_table)
    parallel_upload_to_bigquery(dropped_dfs, config.dropped_table)

    processed = sum(len(part) for part in full_text_dfs)
    logger.info(f"Processing complete. {processed} of {len(df)} records embedded and stored.")


# Create DAG
with DAG(
    'movie_vector_processing',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'start_date': datetime(2024, 10, 27),
        'pool': 'movie_processing_pool',  # the pool must exist before tasks can be scheduled
    },
    description='Process movie data and generate vector embeddings',
    schedule='0 0 * * *',  # daily at midnight; use schedule_interval on Airflow < 2.4
    catchup=False,
    tags=['movies', 'vectors', 'embeddings'],
    max_active_tasks=4,  # replaces the deprecated `concurrency` argument
) as dag:

    create_full_text_table = BigQueryInsertJobOperator(
        task_id='create_full_text_table',
        configuration={
            'query': {
                'query': CREATE_FULL_TEXT_TABLE_QUERY,
                'useLegacySql': False,
            }
        },
    )

    create_metadata_table = BigQueryInsertJobOperator(
        task_id='create_metadata_table',
        configuration={
            'query': {
                'query': CREATE_METADATA_TABLE_QUERY,
                'useLegacySql': False,
            }
        },
    )

    # Airflow 2 passes the context automatically; provide_context is no longer needed
    process_data = PythonOperator(
        task_id='process_movie_data',
        python_callable=process_movie_data,
    )

    [create_full_text_table, create_metadata_table] >> process_data
```

# Google Cloud Composer Deployment Script Summary

This script automates the deployment of an Apache Airflow DAG in Google Cloud Composer for movie data processing. It performs the following steps:

1. **Set Environment Variables**: Configures project, region, Composer environment, and file paths.

2. **Create Temporary Directory**: Holds the generated `requirements.txt` until it is uploaded.

3. **Generate `requirements.txt`**: Lists Python dependencies required for the DAG.

4. **Configure Google Cloud Project**: Sets the correct Google Cloud project context.

5. **Get DAG Folder Location**: Reads the environment's DAG folder (`config.dagGcsPrefix`) and strips it down to the bucket name.

6. **Upload DAG and Requirements**: Uploads the DAG and dependencies to the Composer bucket.

7. **Install Dependencies**: Updates Composer to install Python packages from `requirements.txt`.

8. **Set Airflow Variables**: Configures key variables for input paths, API keys, and project settings.

9. **Create BigQuery Dataset**: Creates the BigQuery dataset if it does not already exist.

10. **Clean Up**: Deletes the temporary deployment directory.

11. **Deployment Completion**: Prints a completion message and prompts you to verify the DAG in the Cloud Composer UI.
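Step 5 reduces the `config.dagGcsPrefix` value, a URI of the form `gs://<bucket>/dags`, to the bare bucket name. The script does this with `sed`. The Python sketch below does the same parsing, which could be useful if deployment helpers are written in Python. `bucket_from_dag_prefix` is a hypothetical name, and the example bucket name is made up.

```python
from urllib.parse import urlparse


def bucket_from_dag_prefix(dag_gcs_prefix: str) -> str:
    """Return the bucket name from a Composer DAG prefix such as gs://bucket/dags."""
    parsed = urlparse(dag_gcs_prefix)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a gs:// URI: {dag_gcs_prefix!r}")
    return parsed.netloc  # the part between "gs://" and the first "/"


print(bucket_from_dag_prefix("gs://us-central1-example-env-bucket/dags"))
# → us-central1-example-env-bucket
```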

---


```bash 
#!/bin/bash

# Set your Google Cloud project and Composer environment variables
PROJECT_ID="your-project-id"
REGION="your-region"  # e.g., us-central1
ENVIRONMENT_NAME="your-composer-environment"
DAG_NAME="movie_vector_processing.py"
REQUIREMENTS_FILE="requirements.txt"

# Create a temporary directory for deployment files
TEMP_DIR=$(mktemp -d)
echo "Created temporary directory: $TEMP_DIR"

# Create requirements.txt for additional dependencies
cat > "$TEMP_DIR/$REQUIREMENTS_FILE" << EOL
dask[complete]>=2024.1.0
openai>=1.3.0
pinecone-client>=3.0.0,<6.0.0
tiktoken>=0.5.0
nltk>=3.8.1
google-cloud-storage>=2.13.0
google-cloud-bigquery[pandas]>=3.13.0
EOL

# Set the Google Cloud project
gcloud config set project $PROJECT_ID

# Get the Composer environment's DAG folder location
BUCKET_NAME=$(gcloud composer environments describe $ENVIRONMENT_NAME \
    --location $REGION \
    --format="get(config.dagGcsPrefix)")

# Remove the 'gs://' prefix and '/dags' suffix from the bucket path
BUCKET_NAME=$(echo $BUCKET_NAME | sed 's|gs://||' | sed 's|/dags||')

# Upload the DAG file to the Composer environment
echo "Uploading DAG file to Cloud Composer..."
gsutil cp $DAG_NAME gs://$BUCKET_NAME/dags/

# Upload requirements file to the plugins directory
echo "Uploading requirements file..."
gsutil cp $TEMP_DIR/$REQUIREMENTS_FILE gs://$BUCKET_NAME/plugins/requirements.txt

# Install additional dependencies in the Composer environment
gcloud composer environments update $ENVIRONMENT_NAME \
    --location $REGION \
    --update-pypi-packages-from-file gs://$BUCKET_NAME/plugins/requirements.txt

# Set Airflow variables
echo "Setting Airflow variables..."
gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set GCS_BUCKET "your-gcs-bucket"

gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set INPUT_PATH "your/input/path"

gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set GCP_PROJECT_ID "$PROJECT_ID"

gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set BQ_DATASET "your_dataset"

gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set PINECONE_API_KEY "your-pinecone-api-key"

gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set PINECONE_ENV "your-pinecone-environment"

gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set PINECONE_INDEX_NAME "your-index-name"

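gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION variables -- \
    set OPENAI_API_KEY "your-openai-api-key"

# Create the Airflow pool referenced in the DAG's default_args (4 slots is an example value)
gcloud composer environments run $ENVIRONMENT_NAME \
    --location $REGION pools -- \
    set movie_processing_pool 4 "Movie vector processing"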

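# Create BigQuery dataset if it doesn't exist (bq mk fails when the dataset already exists)
echo "Creating BigQuery dataset if it doesn't exist..."
bq show "${PROJECT_ID}:your_dataset" > /dev/null 2>&1 || \
    bq mk --dataset \
    --description "Dataset for movie vector processing" \
    "${PROJECT_ID}:your_dataset"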

# Clean up temporary directory
rm -rf $TEMP_DIR

echo "Deployment complete! Please check the Cloud Composer UI to verify the DAG is running correctly."
```