# AI Engineering: Data ETL Pipeline with LLMs

### 1. Project Objective (Business Context)

The objective of this project is to simulate a request from a pharmaceutical company: dynamically extracting variables of interest from a corpus of unstructured documents (PDFs of pharmaceutical *papers*).

The final deliverable is a single `.csv` table in which each row represents a document and each column a variable of interest extracted from it. If a document does not contain a given variable, the cell must be filled with a null value (`null`).
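A minimal sketch of how that deliverable can be assembled, assuming each document's extraction arrives as a Python dict (the rows and values below are hypothetical). pandas aligns the dicts on the union of their keys, fills missing cells with NaN, and `na_rep="null"` writes those cells as the literal `null`.

```python
import io

import pandas as pd

# Hypothetical per-document extraction results: one dict per document (row).
# A variable the document lacks simply does not appear in its dict.
rows = [
    {"document": "paper_a.txt", "drug_name": "drug_x", "placebo": "observed effect"},
    {"document": "paper_b.txt", "placebo": "no effect"},
]

# Columns are the union of all keys (in order of first appearance); gaps become NaN.
df = pd.DataFrame(rows)

# Serialize NaN cells as the string "null" in the CSV deliverable.
buffer = io.StringIO()
df.to_csv(buffer, index=False, na_rep="null")
print(buffer.getvalue())
```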

---

### 2. Technical Challenge and Engineering Solution

Performing this task in a resource-constrained environment (such as a Google Colab T4 GPU) raises three major AI engineering challenges:

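1. **Memory (OOM):** In fp16, a 7B-parameter LLM needs roughly 14.5 GB of VRAM for its weights alone, which leaves almost no room for activations and the KV cache on a 16 GB GPU such as the T4.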

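2. **Efficiency (Latency):** Processing N documents sequentially, with one generation call per document, is extremely slow because each call decodes a single sequence and leaves most of the GPU's parallel capacity unused.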

3. **Reliability (Parsing):** LLMs are "chatty": they often wrap their JSON in explanatory text or return slightly malformed JSON, which causes parsing failures.
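To put challenge 1 in numbers, here is a back-of-the-envelope estimate of weight memory only. It assumes roughly 7.25 billion parameters for Mistral-7B and ignores activations, the KV cache and CUDA context overhead, so real usage is higher. The ~5 GB NF4 figure in the solution list also covers modules that typically stay in 16-bit (such as the output head) and the quantization constants.

```python
# Weight-memory estimate for a ~7.25B-parameter model (assumed parameter count).
# Activations, KV cache and CUDA context overhead are NOT included.
N_PARAMS = 7.25e9


def weight_gib(n_params: float, bits_per_param: float) -> float:
    """Return the memory needed to store the weights, in GiB."""
    return n_params * bits_per_param / 8 / 2**30


fp16 = weight_gib(N_PARAMS, 16)  # half precision: 2 bytes per parameter
nf4 = weight_gib(N_PARAMS, 4)    # 4-bit NF4: 0.5 bytes per parameter (no quant constants)

print(f"fp16 weights: {fp16:.1f} GiB")
print(f"NF4 weights:  {nf4:.1f} GiB")
```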

This notebook implements an **optimized AI pipeline** to solve these problems:

* **Model:** `mistralai/Mistral-7B-Instruct-v0.3` (a high-performance 7B model).

* **Memory Optimization:** **4-bit (NF4)** quantization is used with `bitsandbytes` to load the 7B model on ~5GB of VRAM, avoiding OOM errors.

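* **Batching:** The code was refactored from N sequential calls to just **2 batched pipeline calls** (one to discover variables and another to extract data). Within each call the prompts are processed in mini-batches (`batch_size=4` for discovery, `batch_size=2` for extraction), which reduces total generation time compared with one call per document.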

* **Robust Parsing:** A `robust_json_parser` locates the JSON block within the LLM response and ignores the surrounding text, to avoid parsing failures.

* **Data Cleaning:** `difflib` is used to find and merge the near-duplicate column names that the model generates dynamically (e.g., "placebo effect" and "placebo_effect").
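The `robust_json_parser` in the extraction cell slices from the first `{` or `[` to the *last* matching closing character, so a trailing remark that itself contains a brace makes the slice invalid. A minimal alternative sketch, swapping in the standard library's `json.JSONDecoder.raw_decode`, tries to decode exactly one JSON value at each candidate position and ignores whatever follows it. The helper name `extract_first_json` is hypothetical.

```python
import json


def extract_first_json(text: str):
    """Return the first JSON object or array that decodes cleanly from `text`, else None.

    At each '{' or '[' position, raw_decode parses one complete JSON value and
    reports where it ended, so any chatter after the value is simply ignored.
    """
    decoder = json.JSONDecoder()
    for i, ch in enumerate(text):
        if ch in "{[":
            try:
                value, _end = decoder.raw_decode(text, i)
                return value
            except json.JSONDecodeError:
                continue  # not valid JSON from this position; keep scanning
    return None


reply = 'Sure! Here is the JSON: {"variables": ["placebo", "depression"]} Note: {see text}'
print(extract_first_json(reply))
```

Because the scan goes left to right and stops at the first successful decode, an outer object wins over any object nested inside it.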

In [2]:
!uv pip uninstall torch torchvision torchaudio transformers bitsandbytes accelerate pdfplumber
!uv pip install torch==2.4.1 torchvision==0.19.1 torchaudio==2.4.1 --index-url https://download.pytorch.org/whl/cu121
!uv pip install transformers bitsandbytes accelerate pdfplumber

[2mUsing Python 3.12.12 environment at: /usr[0m
[2mUninstalled [1m5 packages[0m [2min 957ms[0m[0m
 [31m-[39m [1maccelerate[0m[2m==1.11.0[0m
 [31m-[39m [1mtorch[0m[2m==2.8.0+cu126[0m
 [31m-[39m [1mtorchaudio[0m[2m==2.8.0+cu126[0m
 [31m-[39m [1mtorchvision[0m[2m==0.23.0+cu126[0m
 [31m-[39m [1mtransformers[0m[2m==4.57.1[0m
[2mUsing Python 3.12.12 environment at: /usr[0m
[2K[2mResolved [1m27 packages[0m [2min 1.31s[0m[0m
[2K[2mPrepared [1m15 packages[0m [2min 49.08s[0m[0m
[2mUninstalled [1m12 packages[0m [2min 45ms[0m[0m
[2K[2mInstalled [1m15 packages[0m [2min 209ms[0m[0m
 [31m-[39m [1mnvidia-cublas-cu12[0m[2m==12.6.4.1[0m
 [32m+[39m [1mnvidia-cublas-cu12[0m[2m==12.1.3.1[0m
 [31m-[39m [1mnvidia-cuda-cupti-cu12[0m[2m==12.6.80[0m
 [32m+[39m [1mnvidia-cuda-cupti-cu12[0m[2m==12.1.105[0m
 [31m-[39m [1mnvidia-cuda-nvrtc-cu12[0m[2m==12.6.77[0m
 [32m+[39m [1mnvidia-cuda-nvrtc-cu12[0m[2m==12.1.105

In [1]:
import torch
print(torch.__version__)
print(torch.cuda.is_available())
!nvcc --version  # Check the system CUDA toolkit (the cu121 PyTorch wheel ships its own CUDA runtime, so this can differ)

2.4.1+cu121
True
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2024 NVIDIA Corporation
Built on Thu_Jun__6_02:18:23_PDT_2024
Cuda compilation tools, release 12.5, V12.5.82
Build cuda_12.5.r12.5/compiler.34385749_0


In [None]:
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
import pdfplumber, os
import warnings

# Suppress pdfminer warnings
warnings.filterwarnings("ignore", category=UserWarning, module='pdfminer')

def pdf_to_text_worker(pdf_path: str, processed_dir: str) -> str:
    """
    Worker function to extract text from a single PDF.
    Saves the .txt file to the specified 'processed_dir' and returns its path.
    """
    # Build the .txt path; splitext also handles upper-case '.PDF' extensions
    file_name = os.path.basename(pdf_path)
    txt_name = os.path.splitext(file_name)[0] + '.txt'
    text_path = os.path.join(processed_dir, txt_name)
    parts = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                t = page.extract_text()
                if t:  # image-only pages may yield None or an empty string
                    parts.append(t)
    except Exception as e:
        print(f"Warning: failed to process {pdf_path}: {e}")

    # Save extracted text (an empty file if nothing could be extracted)
    with open(text_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(parts))
    return text_path

# Input (raw PDFs) and output (extracted .txt) directories
data_raw_path = "../data/raw/"
data_processed_path = "../data/processed/"

# Ensure processed directory exists
os.makedirs(data_processed_path, exist_ok=True)

# check if directory exists
if not os.path.isdir(data_raw_path):
    print(f"Error: Directory not found at {data_raw_path}")
    print("Please create 'data/raw' and add your PDFs.")
    pdf_paths = []
else:
    pdf_paths = [os.path.join(data_raw_path, f) for f in os.listdir(data_raw_path) if f.lower().endswith('.pdf')]

# Set workers based on CPU count
max_workers = max(1, min(4, multiprocessing.cpu_count()-1))

txt_paths = []
if pdf_paths:
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        # Submit tasks
        futures = {ex.submit(pdf_to_text_worker, p, data_processed_path): p for p in pdf_paths}
        for fut in as_completed(futures):
            try:
                txt_paths.append(fut.result())
            except Exception as e:
                print("pdf->txt failed:", e)

    print(f"Generated {len(txt_paths)} text files from {len(pdf_paths)} PDFs.")
    for p in txt_paths:
        print(f" - Generated: {p}")
else:
    print("No PDFs found in 'data/raw/' to process.")



Generated txt files: 6
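pdfplumber does not perform OCR, so scanned or image-only PDFs typically produce little or no text, and the worker above still writes a (possibly empty) `.txt` file for them. A minimal sanity check before spending GPU time could flag such files; the helper `find_empty_texts` and the 200-character threshold are hypothetical choices, not part of the original pipeline.

```python
import os
import tempfile


def find_empty_texts(txt_paths: list[str], min_chars: int = 200) -> list[str]:
    """Return the text files whose content, ignoring surrounding whitespace,
    is shorter than `min_chars` (an assumed threshold)."""
    flagged = []
    for p in txt_paths:
        with open(p, encoding="utf-8") as f:
            if len(f.read().strip()) < min_chars:
                flagged.append(p)
    return flagged


# Usage example with two throwaway files: one with real text, one empty.
with tempfile.TemporaryDirectory() as d:
    good = os.path.join(d, "good.txt")
    empty = os.path.join(d, "empty.txt")
    with open(good, "w", encoding="utf-8") as f:
        f.write("Placebo response in antidepressant trials. " * 10)
    with open(empty, "w", encoding="utf-8") as f:
        f.write("\n")
    flagged = find_empty_texts([good, empty])
    print([os.path.basename(p) for p in flagged])  # ['empty.txt']
```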


In [None]:
import os
import pdfplumber
import warnings
import torch
import gc, re, json, difflib
import numpy as np
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig
import time

start_time = time.time()

# --- 1. Model Configuration & Loading ---
model_name = "mistralai/Mistral-7B-Instruct-v0.3"
device_has_cuda = torch.cuda.is_available()
print(f"CUDA available: {device_has_cuda}")

quantization_config = None
if device_has_cuda:
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
    )
    model_dtype = torch.float16
else:
    model_dtype = torch.float32
    print("Running on CPU: Expect significantly slower inference.")

tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# Decoder-only models must be left-padded for batched generation
tokenizer.padding_side = "left"

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map="auto" if device_has_cuda else None,
    torch_dtype=model_dtype,
    low_cpu_mem_usage=True,
    trust_remote_code=True,
)

gc.collect()
if device_has_cuda:
    torch.cuda.empty_cache()

device_id = 0 if device_has_cuda else -1
generator = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    device=device_id
)

# --- 2. Efficient Batch Generation ---

def generate_batch(prompts: list[str], max_new_tokens: int = 300, temperature: float = 0.05, batch_size: int = 4):
    """
    Processes a list of prompts in parallel batches for maximum GPU efficiency.
    """
    print(f"  -> Generating batch of {len(prompts)} prompts (batch_size={batch_size})...")
    messages_list = [[{"role": "user", "content": p}] for p in prompts]
    formatted_prompts = [tokenizer.apply_chat_template(
        m, tokenize=False, add_generation_prompt=True
    ) for m in messages_list]

    with torch.no_grad():
        outputs = generator(
            formatted_prompts,
            max_new_tokens=max_new_tokens,
            do_sample=(temperature > 0),
            temperature=temperature,
            pad_token_id=tokenizer.eos_token_id,
            return_full_text=False,
            batch_size=batch_size
        )

    gc.collect()
    if device_has_cuda:
        torch.cuda.empty_cache()

    if not outputs or not isinstance(outputs, list) or not isinstance(outputs[0], list):
        print(f"Warning: Pipeline output format unexpected. Output: {str(outputs)[:200]}...")
        if isinstance(outputs, list) and len(outputs) > 0 and 'generated_text' in outputs[0]:
            return [out['generated_text'].strip() for out in outputs]
        return ["ERROR_PARSING_BATCH"] * len(prompts)

    # For a list of prompts the pipeline returns one list of candidates per prompt
    results = [out[0]['generated_text'].strip() for out in outputs]
    print("  -> Batch generation complete.")
    return results

# --- 3. Robust JSON Parsing ---

def robust_json_parser(text: str) -> dict | list | None:
    """
    Extracts the first valid JSON block from an LLM's potentially "chatty" output.
    """
    import json, re
    try:
        m = re.search(r'```json\s*(\{.*?\}|\[.*?\])\s*```', text, re.DOTALL)
        if m:
            return json.loads(m.group(1))
        
        start_obj = text.find('{')
        start_arr = text.find('[')
        starts = [(start_obj, '{'), (start_arr, '[')]
        starts = [s for s in starts if s[0] != -1]
        if not starts:
            return None
        starts.sort()
        start, ch = starts[0]
        if ch == '{':
            end = text.rfind('}')
        else:
            end = text.rfind(']')
        if start != -1 and end != -1 and end > start:
            candidate = text[start:end+1]
            return json.loads(candidate)
            
    except Exception as e:
        print(f"  -> Robust parser failed: {e}. Text: {text[:100]}...")
        return None
    return None

def parse_variables_json(text: str) -> list[str]:
    """
    Takes raw LLM text, parses the JSON, and extracts a clean list of variable names.
    """
    parsed_data = robust_json_parser(text)

    if isinstance(parsed_data, dict) and "variables" in parsed_data:
        var_list = parsed_data["variables"]
        normalized_vars = []
        for item in var_list:
            if isinstance(item, str):
                normalized_vars.append(item)
            elif isinstance(item, dict):
                if 'name' in item:
                    normalized_vars.append(item['name'])
                elif 'variable_name' in item:
                    normalized_vars.append(item['variable_name'])
                else:
                    normalized_vars.append(list(item.keys())[0] if item else "unknown_var")
            else:
                normalized_vars.append(str(item))
        if all(v == 'name' for v in normalized_vars):
            return ['drugs', 'methods', 'results']
        return normalized_vars

    print("Parsing variables failed; using default.")
    return ["drug_name", "experiment_type", "observed_results"]

def parse_doc_json(text: str, doc_name: str, variables: list[str]) -> dict:
    """
    Takes raw LLM text for data extraction and builds a robust dictionary.
    (Includes fix to always use the correct doc_name).
    """
    # ALWAYS start with the *correct* document name.
    result = {"document": os.path.basename(doc_name)}
    
    parsed = robust_json_parser(text)

    if isinstance(parsed, dict):
        for k, v in parsed.items():
            key = str(k).strip()
            # Ignore the 'document' key from the LLM; we already set the correct one.
            if key.lower() == "document":
                continue
            result[key] = v

    elif isinstance(parsed, list):
        # Some responses are a list of small objects: merge them into one dict
        merged = {}
        for item in parsed:
            if isinstance(item, dict):
                merged.update(item)
        for k, v in merged.items():
            key = str(k).strip()
            if key.lower() != "document":  # same 'document' guard as above
                result[key] = v

    # Schema enforcement
    for var in variables:
        if var not in result:
            result[var] = None
    return result

json_example_str = '{"variables": ["drug_name", "experiment_type", "observed_results"]}'

# --- 4. Batch Extraction Logic ---

def extract_variables_batch(txt_paths: list[str]) -> list[str]:
    """
    Builds and executes a batch job to discover all unique variables from all docs.
    """
    if not txt_paths:
        return ["drug_name", "experiment_type", "observed_results"]

    prompts = []
    print(f"Preparing variable extraction batch for {len(txt_paths)} documents...")
    for p in txt_paths:
        try:
            with open(p, 'r', encoding='utf-8') as f:
                t = f.read()
            truncated_text = t[:8000]
            prompt = f"""
            Analyze the following text from a pharmaceutical paper. Identify 3-5 key variables (like drugs, placebos, methods, or results).
            Respond ONLY with a valid JSON object in the format: {json_example_str}.
            Do not add explanations or chat.

            Text:
            {truncated_text}
            """
            prompts.append(prompt)
        except FileNotFoundError:
            print(f"Warning: File not found at {p}. Skipping.")
        except Exception as e:
            print(f"Warning: Failed to read {p}: {e}. Skipping.")


    if not prompts:
        print("No valid text files found to process.")
        return ["drug_name", "experiment_type", "observed_results"]

    raw_outputs = generate_batch(prompts, max_new_tokens=200, temperature=0.05, batch_size=4)

    variables_per_doc = []
    for i, raw_text in enumerate(raw_outputs):
        print(f"Vars raw for doc {i}: {raw_text[:100]}...")
        vars_per_doc_list = parse_variables_json(raw_text)
        print(f"Parsed vars for doc {i}: {vars_per_doc_list}")
        variables_per_doc.append(set(vars_per_doc_list))

    all_vars = list(set().union(*variables_per_doc))
    all_vars = [v.lower().strip() for v in all_vars]
    all_vars = list(set(all_vars))
    all_vars = [v.capitalize() for v in all_vars]
    
    print(f"Extracted unique variables: {all_vars}")
    return all_vars or ["drug_name", "experiment_type", "observed_results"]

def extract_data_batch(txt_paths: list[str], variables: list[str]) -> list[dict]:
    """
    Builds and executes a batch job to extract the structured data for all docs.
    """
    prompts = []
    doc_names = []
    print(f"Preparing data extraction batch for {len(txt_paths)} documents...")
    
    for p in txt_paths:
        try:
            with open(p, 'r', encoding='utf-8') as f:
                text = f.read()
            doc_names.append(os.path.basename(p)) # Add doc_name only if read is successful
            truncated_text = text[:8000]

            example_json_structure = {"document": os.path.basename(p)}
            for var in variables:
                example_json_structure[var] = "null"
            json_format_instruction = json.dumps(example_json_structure, ensure_ascii=False, indent=2)

            few_shot = '{"document": "example.txt", "placebo": "observed effect", "depression": "symptoms reduction"}'

            prompt = f"""
            You are an expert pharma data extractor. Read the text and extract all relevant info.
            Respond ONLY with a valid JSON object. Do not add explanations.
            Use "null" if data is absent. Do not duplicate keys.

            Example format: {json_format_instruction}
            Example data: {few_shot}

            Text to analyze:
            {truncated_text}
            """
            prompts.append(prompt)
        except FileNotFoundError:
            print(f"Warning: File not found at {p}. Skipping data extraction.")
        except Exception as e:
            print(f"Warning: Failed to read {p}: {e}. Skipping data extraction.")

    if not prompts:
        print("No valid text files found for data extraction.")
        return []

    raw_outputs = generate_batch(prompts, max_new_tokens=500, temperature=0.05, batch_size=2)

    results = []
    for doc_name, raw_text in zip(doc_names, raw_outputs):
        print(f"Raw output for {doc_name}: {raw_text[:100]}...")
        result = parse_doc_json(raw_text, doc_name, variables)
        results.append(result)
    
    return results

# --- 5. Post-Processing & DataFrame Creation ---

def normalize_key(k: str) -> str:
    """A simple normalization function to clean column names."""
    if k is None:
        return "unknown"
    k = str(k).strip()
    k = k.replace("’", "'").replace("–", "-")
    k = re.sub(r"[^\w\s]", " ", k)
    k = re.sub(r"\s+", " ", k).strip().lower()
    return k

def normalize_value(v):
    """Cleans up values, converting "null" strings to None and serializing JSON."""
    if v is None:
        return None
    if isinstance(v, str):
        vv = v.strip()
        if vv.lower() in ("null", "none", "", "n/a"):
            return None
        return vv
    try:
        return json.dumps(v, ensure_ascii=False)
    except Exception:
        return str(v)

# --- 6. Execution ---
gc.collect()
if device_has_cuda:
    torch.cuda.empty_cache()

if 'txt_paths' not in globals() or not txt_paths:
    print("Error: `txt_paths` is not defined. Run the PDF-to-text cell (Cell 3) first.")
else:
    print("--- Starting Efficient Batch Extraction ---")
    print("Processed text files (n):", len(txt_paths))
    for p in txt_paths:
        print(" -", p) # Show txt paths

    # Step 1: Discover all unique columns from all docs in one batch
    variables = extract_variables_batch(txt_paths)
    
    # Step 2: "Fill in the table" for all docs in a second batch
    results = extract_data_batch(txt_paths, variables)

    # Step 3: Sanitize, normalize, and merge results
    print("Sanitizing and merging results...")
    sanitized_results = []
    for r in results:
        nr = {}
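        for k, v in r.items():
            nk = normalize_key(k)
            nv = normalize_value(v)
            # Several raw keys can normalize to the same name (e.g. 'Placebo' added by schema
            # enforcement and 'placebo' returned by the model); keep the first non-null value.
            if nr.get(nk) is None:
                nr[nk] = nv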
        sanitized_results.append(nr)

    all_keys = sorted({k for r in sanitized_results for k in r.keys()})
    canonical = {}
    used = set()
    for k in all_keys:
        if k in used:
            continue
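        # n=len(all_keys) lifts get_close_matches' default cap of 3 matches
        matches = difflib.get_close_matches(k, all_keys, n=len(all_keys), cutoff=0.85)
        # Only absorb keys not already assigned to an earlier group, so none is reassigned
        group = {m for m in matches if m not in used} | {k}
        for m in group:
            canonical[m] = k
            used.add(m)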

    merged_results = []
    for r in sanitized_results:
        mr = {}
        for k, v in r.items():
            ck = canonical.get(k, k)
            if ck not in mr or mr[ck] is None:
                mr[ck] = v
        if 'document' not in mr and 'document' in r:
            mr['document'] = r.get('document')
        merged_results.append(mr)

    for r in merged_results:
        for kk in list(r.keys()):
            if r[kk] is None:
                r[kk] = np.nan

    df = pd.DataFrame(merged_results)

    cols = list(df.columns)
    if 'document' in cols:
        cols = ['document'] + [c for c in cols if c != 'document']
        df = df[cols]


    output_path = "../data/output/dynamic_table_final.csv"
    
    # Ensure output directory exists
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    
    df.to_csv(output_path, index=False, na_rep='null')
    
    print("\n--- Process Complete ---")
    print(f"DataFrame saved to '{output_path}'  |  shape: {df.shape}")
    print(f"Total Time: {time.time()-start_time:.2f} s")
    display(df)

CUDA available: True


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Device set to use cuda:0


--- Starting Efficient Batch Extraction ---
Processed text files (n): 6
 - fpsyg-12-639236.txt
 - fpsyt-14-1301143.txt
 - fpubh-10-1045777.txt
 - fpsyg-11-01354.txt
 - fpsyg-08-00308.txt
 - fpsyg-09-01240.txt
Preparing variable extraction batch for 6 documents...
  -> Generating batch of 6 prompts (batch_size=4)...
  -> Batch generation complete.
Vars raw for doc 0: {"variables": ["Placebo cream", "Experiment type (within-subjects and mixed design)", "Placebo effec...
Parsed vars for doc 0: ['Placebo cream', 'Experiment type (within-subjects and mixed design)', 'Placebo effect on pain and stress, moderated by participant and experimenter sex']
Vars raw for doc 1: {"variables": ["Placebo", "Neuroplasticity Placebo Theory", "Depression", "Fronto-limbic areas", "Pl...
Parsed vars for doc 1: ['Placebo', 'Neuroplasticity Placebo Theory', 'Depression', 'Fronto-limbic areas', 'Placebo response in antidepressant trials']
Vars raw for doc 2: {"variables": ["depression", "cognitive distortion de

| Unnamed: 0 | document | placebo | pain | stress | participant sex | experimenter sex | depression | depression tests | neuroplasticity placebo theory | nocebo hyperalgesia | ... | barlow 2015 | layard 2013 | black et al 1999 | black and craig 2002 | barlow et al 2013 | lesaffre et al 2007 | begg 1987 | hui and zhou 1998 | zhou 1998 | van stralen et al 2009 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | fpsyg-12-639236.txt | observed effect | no placebo effect | placebo effect |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 1 | fpsyt-14-1301143.txt | designed to be inert in randomized controlled ... |  |  |  |  | disorder that has the highest placebo response... |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 2 | fpubh-10-1045777.txt |  |  |  |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 3 | fpsyg-11-01354.txt | observed effect |  |  |  |  | symptoms reduction |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 4 | ORIGINALRESEARCH | hypoalgesia |  |  |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 5 | fpsyg-09-01240.txt |  |  |  |  |  |  |  |  |  | ... | cited | cited | cited | cited | cited | cited | cited | cited | cited | cited |
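The column-merging step of the post-processing can be isolated and tested on its own. This is a minimal standalone sketch (the function name `merge_similar_keys` is hypothetical) that guards against two behaviours of `difflib.get_close_matches`: it returns at most `n=3` matches by default, and it can return keys that already belong to an earlier group. Keys are visited in sorted order, so the alphabetically first spelling in each group becomes the canonical one.

```python
import difflib


def merge_similar_keys(keys: list[str], cutoff: float = 0.85) -> dict[str, str]:
    """Map each key to a canonical spelling, grouping keys whose difflib
    similarity ratio is >= cutoff. A key is never reassigned once grouped."""
    all_keys = sorted(set(keys))
    canonical: dict[str, str] = {}
    for k in all_keys:
        if k in canonical:
            continue  # already absorbed into an earlier group
        # n=len(all_keys) lifts the default cap of 3 returned matches
        matches = difflib.get_close_matches(k, all_keys, n=len(all_keys), cutoff=cutoff)
        for m in matches:  # includes k itself (ratio 1.0)
            if m not in canonical:
                canonical[m] = k
        canonical.setdefault(k, k)
    return canonical


cols = ["placebo_effect", "placebo effect", "depression", "depression tests", "document"]
print(merge_similar_keys(cols))
```

With `cutoff=0.85`, "placebo effect" and "placebo_effect" (ratio about 0.93) are merged, while "depression" and "depression tests" (ratio about 0.77) stay separate columns.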
