In [0]:
from pyspark.sql import functions as F, types as T
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window
from datetime import datetime
import pandas as pd
import numpy as np
import io, re, random, string, time, os, traceback, tempfile, shutil, subprocess
import signal
from contextlib import contextmanager
from pyspark.sql.functions import broadcast
from concurrent.futures import ProcessPoolExecutor, TimeoutError as _FutTimeout  
import multiprocessing  
import threading  

# Optional libs
try:
    import magic
except Exception:
    magic = None

import chardet
from charset_normalizer import detect as cn_detect
from ocflzw_decompress.lzw import LzwDecompress
from striprtf.striprtf import rtf_to_text
from bs4 import BeautifulSoup
import docx2txt
from openpyxl import load_workbook
import xlrd
import pdfplumber
from pdfminer.high_level import extract_text as pdfminer_extract_text

# OLE/MSG support
try:
    import olefile
    HAVE_OLE = True
except Exception:
    HAVE_OLE = False

try:
    import extract_msg
    HAVE_EXTRACT_MSG = True
except Exception:
    HAVE_EXTRACT_MSG = False


try:  
    import fitz  # PyMuPDF  
    HAVE_FITZ = True  
except Exception:  
    HAVE_FITZ = False  
  
try:  
    import pypdf  
    HAVE_PYPDF = True  
except Exception:  
    HAVE_PYPDF = False  
  
try:  
    import ocrmypdf  
    HAVE_OCRMYPDF = True  
except Exception:  
    HAVE_OCRMYPDF = False  
  
try:  
    import pytesseract  
    from PIL import Image  
    HAVE_TESS = True  
except Exception:  
    HAVE_TESS = False  

# --------------------------
# Config
# --------------------------
# Source/target tables and the tunables used by the helpers below.
SOURCE_TABLE = "4_prod.raw.mill_ce_blob"
TARGET_TABLE = "4_prod.bronze.mill_blob_text"
MAX_BLOB_SIZE = 16 * 1024 * 1024  # 16 MB limit per individual blob
LZW_TIMEOUT_SECONDS = 30  # default budget for one LZW decompression attempt
EVENT_LIMIT = None  # Set to None for all events
BATCH_SIZE_BYTES = 1 * 1024 * 1024 * 1024  # 1 GiB per batch (based on compressed size)
MIN_BATCH_SIZE = 10  # Minimum events per batch to avoid too many tiny batches
MAX_BATCH_SIZE = 5000  # Maximum events per batch to avoid memory issues
EVENT_TIMEOUT_SECONDS = 240   # Adjust per your environment
EVENT_TIMEOUT_MODE = "signal"  # "process" causes pickle errors.
CANDIDATE_MULTIPLIER = 1
MAX_PARALLEL_BATCHES = 25

# Byte sequence removed from blob payloads by the remove_ocf_wrapper_* helpers.
OCF_MARKER = b'ocf_blob\0'

class TimeoutException(Exception):
    """Raised when a guarded operation exceeds its time budget."""
    pass

def _supports_posix_alarm():  
    try:  
        import signal, os  
        return hasattr(signal, "SIGALRM") and os.name == "posix"  
    except Exception:  
        return False  
  
def run_with_signal_timeout(seconds, fn, *args, **kwargs):
    """Run ``fn(*args, **kwargs)`` under a SIGALRM interval timer.

    Fast and low-overhead, but only usable where SIGALRM exists and only from
    the main thread (``signal.signal`` raises ValueError elsewhere).

    Args:
        seconds: Time budget; may be fractional. ``0`` disarms the timer, so
            the call then runs without a limit.
        fn: Callable to execute.
        *args, **kwargs: Forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns.

    Raises:
        TimeoutException: if the timer fires before ``fn`` returns. This is the
            module-level type, the same one run_with_process_timeout raises, so
            callers can catch timeouts by name.
        Exception: anything raised by ``fn`` itself propagates unchanged.
    """
    import signal

    def _handler(signum, frame):
        raise TimeoutException(f"Per-event timeout after {seconds}s")

    old = signal.signal(signal.SIGALRM, _handler)
    try:
        # Arm inside the try so the previous handler is restored even when
        # arming itself fails (e.g. an invalid ``seconds`` value).
        signal.setitimer(signal.ITIMER_REAL, seconds)
        return fn(*args, **kwargs)
    finally:
        try:
            signal.setitimer(signal.ITIMER_REAL, 0)
        except Exception:
            pass
        signal.signal(signal.SIGALRM, old)
  
def run_with_process_timeout(seconds, fn, *args, **kwargs):
    """Run ``fn(*args, **kwargs)`` in a single spawned child with a hard timeout.

    The work is isolated in a one-process pool created from the "spawn"
    multiprocessing context; the pool is terminated on timeout or on any
    other error. ``fn`` and its arguments are handed to the child process
    via the pool.

    Raises:
        TimeoutException: if no result arrives within ``seconds``.
        Exception: any other failure is re-raised after the pool is terminated.
    """
    spawn_ctx = multiprocessing.get_context("spawn")
    call_kwargs = kwargs if kwargs else {}
    with spawn_ctx.Pool(1) as worker_pool:
        pending = worker_pool.apply_async(fn, args, call_kwargs)
        try:
            outcome = pending.get(timeout=seconds)
        except multiprocessing.TimeoutError:
            worker_pool.terminate()
            raise TimeoutException(f"Per-event timeout after {seconds}s")
        except Exception:
            # Make sure the child is gone before the error propagates.
            worker_pool.terminate()
            raise
        return outcome
        
def run_with_event_timeout(seconds, fn, *args, **kwargs):
    """Run ``fn(*args, **kwargs)``, applying a timeout when one is usable.

    Strategy, in order:
      1. Off the main thread: call ``fn`` directly with no timeout.
      2. EVENT_TIMEOUT_MODE == "signal" and SIGALRM is supported: use
         run_with_signal_timeout; if that fails with the "signal only works in
         main thread" error, call ``fn`` directly instead.
      3. EVENT_TIMEOUT_MODE == "process": use run_with_process_timeout; if the
         failure mentions pickling, print a warning and call ``fn`` directly.
      4. Otherwise: call ``fn`` directly with no timeout.

    Returns:
        Whatever ``fn`` returns.
    """
    import threading

    def _run_unbounded():
        return fn(*args, **kwargs)

    on_main_thread = threading.current_thread() is threading.main_thread()
    if not on_main_thread:
        # No signal or process timeout is attempted away from the main thread.
        return _run_unbounded()

    if EVENT_TIMEOUT_MODE == "signal" and _supports_posix_alarm():
        try:
            return run_with_signal_timeout(seconds, fn, *args, **kwargs)
        except Exception as e:
            if "signal only works in main thread" not in str(e):
                raise
            return _run_unbounded()

    if EVENT_TIMEOUT_MODE == "process":
        try:
            return run_with_process_timeout(seconds, fn, *args, **kwargs)
        except Exception as e:
            if "pickle" not in repr(e).lower():
                raise
            print(f"WARN: Timeout disabled due to pickle error: {e}")
            return _run_unbounded()

    return _run_unbounded()

# --------------------------
# Timing utilities
# --------------------------
def log_time(message, start_time=None):
    """Print ``message`` prefixed with an HH:MM:SS.mmm timestamp.

    Args:
        message: Text to print.
        start_time: Optional ``time.time()`` value; when truthy, the elapsed
            seconds since then are appended as ``(took N.NNs)``.

    Returns:
        float: the current ``time.time()``, usable as the next start time.
    """
    stamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    if not start_time:
        print(f"[{stamp}] {message}")
    else:
        elapsed = time.time() - start_time
        print(f"[{stamp}] {message} (took {elapsed:.2f}s)")
    return time.time()

def force_materialize(df, operation_name):
    """Trigger evaluation of ``df`` via ``count()`` and log how long it took.

    Args:
        df: Object exposing ``count()`` (a Spark DataFrame in this notebook).
        operation_name: Label used in the log line.

    Returns:
        tuple: ``(row_count, df)`` with ``df`` passed through untouched.
    """
    started_at = time.time()
    row_total = df.count()
    log_time(f"{operation_name}: {row_total} rows", started_at)
    return row_total, df

# --------------------------
# Output schema (explicit to avoid inference issues)
# --------------------------
# Explicit output schema: (column name, Spark type class) pairs, all nullable.
result_schema = T.StructType([
    T.StructField(column_name, spark_type(), True)
    for column_name, spark_type in (
        ("EVENT_ID", T.LongType),
        ("VALID_UNTIL_DT_TM", T.TimestampType),
        ("VALID_FROM_DT_TM", T.TimestampType),
        ("UPDT_DT_TM", T.TimestampType),
        ("UPDT_ID", T.LongType),
        ("UPDT_TASK", T.LongType),
        ("UPDT_CNT", T.LongType),
        ("UPDT_APPLCTX", T.LongType),
        ("LAST_UTC_TS", T.TimestampType),
        ("ADC_UPDT", T.TimestampType),
        ("BLOB_BINARY", T.BinaryType),
        ("CONTENT_TYPE", T.StringType),
        ("ENCODING", T.StringType),
        ("BLOB_TEXT", T.StringType),
        ("BINARY_SIZE", T.LongType),
        ("TEXT_LENGTH", T.LongType),
        ("STATUS", T.StringType),
        ("anon_text", T.StringType),
    )
])

# --------------------------
# Helpers
# --------------------------
# NOTE(review): OCF_MARKER and TimeoutException are already defined in the
# Config section near the top of this file. These repeats rebind both names
# (the class becomes a new, distinct object) and look like leftovers.
OCF_MARKER = b'ocf_blob\0'

class TimeoutException(Exception):
    pass

@contextmanager
def time_limit(seconds):
    """Context manager that aborts the enclosed block after ``seconds``.

    Installs a SIGALRM handler and arms the real-time interval timer, so it
    only works where SIGALRM exists and only from the main thread
    (``signal.signal`` raises ValueError elsewhere).

    Args:
        seconds: Time budget; whole or fractional seconds are accepted.
            ``0`` disarms the timer, i.e. no limit is applied.

    Raises:
        TimeoutException: inside the ``with`` body when the timer fires.
    """
    def signal_handler(signum, frame):
        raise TimeoutException(f"Operation timed out after {seconds} seconds")

    old_handler = signal.signal(signal.SIGALRM, signal_handler)
    try:
        # setitimer (unlike signal.alarm) accepts fractional seconds. Arming
        # inside the try guarantees the previous handler is restored even if
        # arming fails.
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, old_handler)

def get_max_adc_updt(table_name, default_dt=datetime(1980, 1, 1)):
    """Return the latest ``ADC_UPDT`` in ``table_name``, or ``default_dt``.

    ``default_dt`` is returned when the query fails for any reason, when the
    table has no value, or when the stored maximum lies in the future
    relative to ``datetime.now()``.
    """
    try:
        result_row = spark.sql(f"SELECT MAX(ADC_UPDT) AS max_dt FROM {table_name}").first()
        latest = result_row["max_dt"]
    except Exception:
        return default_dt

    if latest is None:
        return default_dt
    if latest > datetime.now():
        # A future timestamp is treated as unusable.
        return default_dt
    return latest

def format_size(size_bytes):
    """Render a byte count as a human-readable string with two decimals.

    Units step by 1024 from ``bytes`` up to ``TB``; anything larger is
    reported in ``PB``. ``None`` and other falsy inputs are treated as 0.
    """
    units = ('bytes', 'KB', 'MB', 'GB', 'TB')
    value = float(size_bytes or 0)
    position = 0
    while position < len(units):
        if value < 1024.0:
            return f"{value:.2f} {units[position]}"
        value /= 1024.0
        position += 1
    return f"{value:.2f} PB"

def combine_blob_chunks(chunks):
    """Concatenate blob chunks, in order, into one ``bytes`` value.

    ``None`` entries are skipped; a ``None`` or empty ``chunks`` yields ``b''``.
    """
    buffer = bytearray()
    if chunks:
        for piece in chunks:
            if piece is None:
                continue
            buffer.extend(piece)
    return bytes(buffer)

def remove_ocf_wrapper_aggressive(data: bytes):
    """Strip every occurrence of OCF_MARKER from ``data``.

    A trailing marker is dropped first, then any markers still embedded in the
    payload are removed. Falsy input is returned as-is, and any exception
    results in the (possibly partially stripped) data being returned.
    """
    try:
        if not data:
            return data
        if data.endswith(OCF_MARKER):
            data = data[:len(data) - len(OCF_MARKER)]
        if OCF_MARKER not in data:
            return data
        fragments = data.split(OCF_MARKER)
        data = b''.join(fragments)
        return data
    except Exception:
        return data

def remove_ocf_wrapper_conservative(data: bytes):
    """Drop a single trailing OCF_MARKER from ``data``, if present.

    Embedded markers are left alone. Falsy input, or input on which the check
    raises, is returned unchanged.
    """
    try:
        if data and data.endswith(OCF_MARKER):
            return data[:len(data) - len(OCF_MARKER)]
    except Exception:
        pass
    return data

def decompress_lzw_with_timeout(data: bytes, timeout_seconds=LZW_TIMEOUT_SECONDS):
    """LZW-decompress ``data``, bounding large inputs with a time limit.

    Inputs under 100 KB, or calls with ``timeout_seconds <= 0``, are
    decompressed directly. Larger inputs run inside ``time_limit``. If the
    timer cannot be installed at all (``time_limit`` fails before the body
    starts), decompression runs without a limit.

    Args:
        data: Compressed payload.
        timeout_seconds: Budget for one decompression attempt.

    Returns:
        bytes: the decompressed payload.

    Raises:
        TimeoutException: when the time limit fires. Previously this was
            swallowed by a bare ``except`` and the decompression was re-run
            with no limit, so the timeout never took effect.
        Exception: any decompression error, propagated unchanged.
    """
    if timeout_seconds <= 0 or len(data) < 100000:  # < 100KB
        return bytes(LzwDecompress().decompress(data))

    armed = False
    try:
        with time_limit(timeout_seconds):
            armed = True
            return bytes(LzwDecompress().decompress(data))
    except Exception:
        if armed:
            # The timer was running: this is a real timeout or a real
            # decompression failure. Retrying the same input cannot help.
            raise
    # The timer could not be set up in this execution context.
    return bytes(LzwDecompress().decompress(data))

def _lzw_large_multichunk(raw, chunk_count):
    """LZW strategy for very large multi-chunk blobs: aggressive cleanup first."""
    aggressive = remove_ocf_wrapper_aggressive(raw)
    try:
        return decompress_lzw_with_timeout(aggressive), None
    except Exception as e:
        error_msg = str(e)

    # These two messages are treated as corruption, so no fallback is attempted.
    if "list assignment index" in error_msg:
        return None, f"LZW decompression failed (likely corrupted data after {chunk_count} chunks): {error_msg}"
    if "bytes must be in range" in error_msg:
        return None, f"LZW decompression failed (invalid byte values after OCF cleanup): {error_msg}"

    # Fallbacks: conservative cleanup, then the untouched payload. A candidate
    # identical to one that already failed is skipped.
    failed = [aggressive]
    for candidate in (remove_ocf_wrapper_conservative(raw), raw):
        if any(candidate == earlier for earlier in failed):
            continue
        try:
            return decompress_lzw_with_timeout(candidate), None
        except Exception:
            failed.append(candidate)
    return None, f"LZW failed all attempts for {chunk_count}-chunk file: {error_msg}"


def _lzw_best_of_variants(raw):
    """LZW strategy for smaller blobs: try all cleanups, keep the largest output."""
    variants = (
        ("aggressive", remove_ocf_wrapper_aggressive),
        ("conservative", remove_ocf_wrapper_conservative),
        ("raw", lambda payload: payload),
    )
    results = []  # (output or None, method label, output size or error text)
    attempted = []  # (payload, output or None, output size or error text)
    for label, prepare in variants:
        payload = prepare(raw)
        # When no marker is present all three payloads are identical, so each
        # distinct payload is decompressed once and the outcome is reused.
        outcome = next((entry for entry in attempted if entry[0] == payload), None)
        if outcome is None:
            try:
                out = decompress_lzw_with_timeout(payload)
                outcome = (payload, out, len(out))
            except Exception as e:
                outcome = (payload, None, str(e))
            attempted.append(outcome)
        results.append((outcome[1], label, outcome[2]))

    successful = [entry for entry in results if entry[0] is not None]
    if successful:
        # max() keeps the earliest entry on ties, so the preference order is
        # aggressive, then conservative, then raw.
        return max(successful, key=lambda entry: entry[2])[0], None

    errors = [f"{m}: {s}" for r, m, s in results if r is None]
    return None, f"LZW failed all attempts - {'; '.join(errors)}"


def decompress_blob_improved(raw: bytes, compression_cd, chunk_count=1, has_duplicates=False):
    """Decompress a combined blob according to its compression code.

    Args:
        raw: Combined blob bytes.
        compression_cd: 728 selects LZW, 727 means uncompressed; values are
            coerced with ``int()`` and anything else is reported as unknown.
        chunk_count: Number of chunks combined into ``raw``. Together with the
            OCF marker count it selects the LZW strategy (more than 10 of
            either picks the large-file path).
        has_duplicates: Accepted for interface compatibility; not used here.

    Returns:
        tuple: ``(data, None)`` on success or ``(None, error_message)`` on
        failure. Errors are reported through the tuple rather than raised.
    """
    if not raw:
        return None, "Empty content"

    # Ensure int comparison
    try:
        cd = int(compression_cd) if compression_cd is not None else None
    except Exception:
        cd = None

    try:
        if cd == 728:  # LZW
            ocf_count = raw.count(OCF_MARKER)
            if chunk_count > 10 or ocf_count > 10:
                return _lzw_large_multichunk(raw, chunk_count)
            return _lzw_best_of_variants(raw)

        if cd == 727:  # No compression
            # Markers embedded in uncompressed data corrupt the content.
            if raw.count(OCF_MARKER) > 0:
                return remove_ocf_wrapper_aggressive(raw), None
            return raw, None

        return None, f"Unknown compression type: {compression_cd}"
    except Exception as e:
        return None, f"Decompression error: {str(e)}"

def calculate_printable_ratio(text, sample_size=1000):
    """Estimate the fraction of characters in ``text`` that are printable.

    "Printable" means membership in ``string.printable`` (ASCII only). Texts
    longer than ``sample_size`` are scored on a random sample drawn with
    replacement.

    The sample comes from a private, fixed-seed generator, so the same text
    always yields the same ratio and the global ``random`` state is left
    untouched. Encoding detection built on this score is therefore
    reproducible between runs.

    Args:
        text: String to score; falsy input scores 0.0.
        sample_size: Maximum number of characters inspected.

    Returns:
        float: ratio in [0.0, 1.0].
    """
    if not text:
        return 0.0
    if len(text) <= sample_size:
        sample = text
    else:
        sample = random.Random(0).choices(text, k=sample_size)
    printable_chars = frozenset(string.printable)  # O(1) membership per char
    printable = sum(1 for c in sample if c in printable_chars)
    return printable / len(sample) if sample else 0.0

def guess_text(content: bytes):
    """Decode ``content`` with the candidate encoding that looks most printable.

    Candidates are tried in order: the chardet guess, the charset_normalizer
    guess, then utf-8, windows-1252, latin-1 and ascii. Each decode ignores
    undecodable bytes and is scored with calculate_printable_ratio; the search
    stops as soon as the best score exceeds 0.95.

    Returns:
        tuple: ``(decoded_text, encoding, ratio)``, or ``(None, None, 0.0)``
        when the content is empty or no candidate scored above zero.
    """
    if not content:
        return None, None, 0.0

    chardet_guess = chardet.detect(content) or {}
    normalizer_guess = cn_detect(content) or {}
    encodings = [
        chardet_guess.get('encoding'),
        normalizer_guess.get('encoding'),
        'utf-8', 'windows-1252', 'latin-1', 'ascii',
    ]

    winner = (None, None, 0.0)
    for name in filter(None, encodings):
        try:
            text = content.decode(name, errors='ignore')
            score = calculate_printable_ratio(text)
        except Exception:
            continue
        if score > winner[2]:
            winner = (text, name, score)
        if winner[2] > 0.95:
            break

    return winner

def detect_mime(content: bytes):
    """Guess a MIME type from leading magic bytes, then from libmagic.

    Recognised signatures: PDF, ZIP (refined to docx/xlsx/pptx when the
    matching directory name appears in the first 4096 bytes), OLE compound
    storage and RTF. If none matches and ``magic`` is available its verdict is
    used. ``application/octet-stream`` is the fallback.
    """
    fallback = 'application/octet-stream'
    if not content:
        return fallback

    if content.startswith(b'PK\x03\x04'):
        header = content[:4096]
        ooxml_markers = (
            (b'word/', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'),
            (b'xl/', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'),
            (b'ppt/', 'application/vnd.openxmlformats-officedocument.presentationml.presentation'),
        )
        for marker, mime in ooxml_markers:
            if marker in header:
                return mime
        return 'application/zip'

    signatures = (
        (b'%PDF-', 'application/pdf'),
        (b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1', 'application/x-ole-storage'),
        (b'{\\', 'text/rtf'),
    )
    for prefix, mime in signatures:
        if content.startswith(prefix):
            return mime

    if magic:
        try:
            sniffed = magic.Magic(mime=True).from_buffer(content)
        except Exception:
            sniffed = None
        if sniffed:
            return sniffed

    return fallback

# OLE helpers and extractors
def classify_ole(data: bytes):
    """Identify the concrete document type inside an OLE compound file.

    Checks for the Word, Excel and PowerPoint streams, then for the property
    and recipient/attachment storages of an Outlook message. Returns the
    generic ``application/x-ole-storage`` when olefile is unavailable, the
    data is empty, nothing matches, or the file cannot be parsed.
    """
    generic = 'application/x-ole-storage'
    if not HAVE_OLE or not data:
        return generic

    office_streams = (
        (('WordDocument',), 'application/msword'),
        (('Workbook', 'Book'), 'application/vnd.ms-excel'),
        (('PowerPoint Document',), 'application/vnd.ms-powerpoint'),
    )
    try:
        with olefile.OleFileIO(io.BytesIO(data)) as ole:
            for stream_names, mime in office_streams:
                if any(ole.exists(stream) for stream in stream_names):
                    return mime
            if ole.exists('__properties_version1.0') and (
                ole.exists('__recip_version1.0') or ole.exists('__attach_version1.0')
            ):
                return 'application/vnd.ms-outlook'
    except Exception:
        pass
    return generic

def refine_mime_with_ole(content_type, data: bytes):
    """Refine the generic OLE MIME type via classify_ole; pass others through."""
    is_generic_ole = content_type == 'application/x-ole-storage'
    return classify_ole(data) if is_generic_ole else content_type

def extract_text_from_ole_doc(data: bytes):
    """Extract text from a legacy .doc payload using external converters.

    Tries ``antiword -w 0`` and then ``catdoc -w``, each only if it is found
    on PATH. The payload is written to a temporary ``.doc`` file that is
    removed afterwards. The first converter that exits 0 with non-blank
    output wins.

    Returns:
        str | None: extracted text, or None if no converter produced any.
    """
    converters = (
        ('antiword', ['-w', '0']),
        ('catdoc', ['-w']),
    )
    for tool, flags in converters:
        exe = shutil.which(tool)
        if not exe:
            continue
        with tempfile.NamedTemporaryFile(suffix='.doc', delete=False) as tmp:
            tmp.write(data)
            path = tmp.name
        try:
            res = subprocess.run([exe, *flags, path], capture_output=True, timeout=120)
            if res.returncode == 0:
                out = res.stdout.decode('utf-8', errors='ignore')
                if out.strip():
                    return out
        finally:
            try:
                os.unlink(path)
            except Exception:
                pass

    return None

def extract_text_from_ole_ppt(data: bytes):
    """Extract text from a legacy .ppt payload with the ``catppt`` tool.

    Returns None when ``catppt`` is not on PATH or exits non-zero. Otherwise
    returns its stdout decoded as UTF-8, with undecodable bytes ignored. The
    temporary input file is always removed.
    """
    catppt = shutil.which('catppt')
    if catppt is None or not catppt:
        return None

    with tempfile.NamedTemporaryFile(suffix='.ppt', delete=False) as handle:
        handle.write(data)
        ppt_path = handle.name

    try:
        proc = subprocess.run([catppt, ppt_path], capture_output=True, timeout=120)
        if proc.returncode != 0:
            return None
        return proc.stdout.decode('utf-8', errors='ignore')
    finally:
        try:
            os.unlink(ppt_path)
        except Exception:
            pass

def extract_text_from_msg(data: bytes):
    """Extract subject and body text from an Outlook .msg payload.

    The payload is written to a temporary ``.msg`` file, parsed with
    extract_msg, and the file is removed afterwards. The parsed message is
    closed before the file is deleted so the underlying handle is released
    and not left for the garbage collector.

    Returns:
        str | None: ``"<subject>\\n\\n<body>"`` stripped, or None when
        extract_msg is unavailable, the text is empty, or anything fails.
    """
    if not HAVE_EXTRACT_MSG:
        return None

    try:
        tmp = tempfile.NamedTemporaryFile(suffix='.msg', delete=False)
        path = tmp.name
        try:
            # The path is captured before writing so a failed write still
            # gets cleaned up.
            with tmp:
                tmp.write(data)

            m = extract_msg.Message(path)
            try:
                subj = (m.subject or '').strip()
                body = (m.body or '').strip()
                text = (subj + '\n\n' + body).strip()
                return text or None
            finally:
                closer = getattr(m, 'close', None)
                if callable(closer):
                    try:
                        closer()
                    except Exception:
                        pass
        finally:
            try:
                os.unlink(path)
            except Exception:
                pass
    except Exception:
        return None

def extract_text_from_docx(content):
    """Return the text of a .docx payload via docx2txt, or None on any failure."""
    try:
        stream = io.BytesIO(content)
        extracted = docx2txt.process(stream)
    except Exception:
        return None
    return extracted

def extract_text_from_excel(content):
    """Flatten a spreadsheet payload into newline-separated rows of text.

    openpyxl is tried first (read-only, cached values): cells in a row are
    joined with spaces, ``None`` cells are skipped and blank rows dropped. If
    that fails for any reason, xlrd is tried, where every row is emitted with
    all cell values joined by spaces. Returns None if both readers fail.
    """
    try:
        book = load_workbook(io.BytesIO(content), read_only=True, data_only=True)
        lines = []
        for sheet_name in book.sheetnames:
            for cells in book[sheet_name].iter_rows(values_only=True):
                joined = ' '.join(str(value) for value in cells if value is not None)
                if joined.strip():
                    lines.append(joined)
        return '\n'.join(lines)
    except Exception:
        pass

    try:
        legacy_book = xlrd.open_workbook(file_contents=content)
        return '\n'.join(
            ' '.join(str(cell.value) for cell in sheet.row(row_idx))
            for sheet in legacy_book.sheets()
            for row_idx in range(sheet.nrows)
        )
    except Exception:
        return None

def extract_pdf_with_pdftotext(content: bytes, layout=True):
    """Extract text from a PDF payload with Poppler's ``pdftotext``.

    Args:
        content: PDF bytes, written to a temporary file for the tool.
        layout: When true, ``-layout`` is passed to the tool.

    Returns:
        str | None: UTF-8 decoded stdout, or None when the tool is missing,
        exits non-zero, or prints only whitespace.
    """
    exe = shutil.which('pdftotext')
    if not exe:
        return None
    with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as pdf_file:
        pdf_file.write(content)
        in_path = pdf_file.name
    try:
        layout_flag = ["-layout"] if layout else []
        # A trailing "-" makes pdftotext write to stdout.
        command = [exe, "-enc", "UTF-8"] + layout_flag + [in_path, "-"]
        proc = subprocess.run(command, capture_output=True, timeout=180)
        if proc.returncode != 0:
            return None
        text = proc.stdout.decode('utf-8', errors='ignore')
        if not text.strip():
            return None
        return text
    finally:
        try:
            os.unlink(in_path)
        except BaseException:
            pass
  
def extract_pdf_with_pypdf(content: bytes):
    """Extract text from a PDF payload with pypdf.

    Encrypted files get one decryption attempt with an empty password.
    Non-blank page texts are joined with newlines.

    Returns:
        str | None: the text, or None when pypdf is unavailable, no page has
        text, or any error occurs.
    """
    if not HAVE_PYPDF:
        return None
    try:
        pdf = pypdf.PdfReader(io.BytesIO(content), strict=False)
        if getattr(pdf, "is_encrypted", False):
            try:
                pdf.decrypt("")  # empty password often works
            except Exception:
                pass
        page_texts = [page.extract_text() or "" for page in pdf.pages]
        kept = [text for text in page_texts if text.strip()]
        if not kept:
            return None
        return "\n".join(kept)
    except Exception:
        return None
  
def extract_pdf_with_pymupdf(content: bytes):
    """Extract text from a PDF payload with PyMuPDF (fitz).

    Password-protected files get one authentication attempt with an empty
    password. Non-blank page texts are joined with newlines. The document is
    always closed so its native resources are released promptly.

    Returns:
        str | None: the text, or None when PyMuPDF is unavailable, no page
        has text, or any error occurs.
    """
    if not HAVE_FITZ:
        return None
    doc = None
    try:
        doc = fitz.open(stream=content, filetype="pdf")
        if doc.needs_pass:
            try:
                doc.authenticate("")  # empty password
            except Exception:
                pass
        parts = []
        for page in doc:
            t = page.get_text("text") or ""
            if t.strip():
                parts.append(t)
        return "\n".join(parts) if parts else None
    except Exception:
        return None
    finally:
        if doc is not None:
            try:
                doc.close()
            except Exception:
                pass
  
def repair_pdf_with_qpdf(content: bytes):
    """Rewrite a PDF payload with ``qpdf`` to decrypt/repair it.

    qpdf is run with an empty password, ``--decrypt``, object streams disabled
    and stream data uncompressed. Exit status 0 (clean) and 3 (warnings, but
    the output file was written) both count as success. Status 3 is what qpdf
    typically reports for the damaged inputs this helper exists to fix.

    Returns:
        bytes | None: the rewritten PDF, or None when qpdf is missing, fails,
        or produces an empty file.
    """
    exe = shutil.which('qpdf')
    if not exe:
        return None
    with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as fi:
        fi.write(content)
        in_path = fi.name
    out_path = None
    try:
        out_fd, out_path = tempfile.mkstemp(suffix='.pdf')
        os.close(out_fd)
        # --decrypt with empty password, disable object streams, uncompress streams
        args = [exe, "--password=", "--decrypt",
                "--object-streams=disable", "--stream-data=uncompress",
                in_path, out_path]
        res = subprocess.run(args, capture_output=True, timeout=180)
        # mkstemp pre-creates the output file, so existence alone proves
        # nothing; require that qpdf actually wrote data.
        produced = os.path.exists(out_path) and os.path.getsize(out_path) > 0
        if res.returncode in (0, 3) and produced:
            with open(out_path, "rb") as f:
                return f.read()
        return None
    finally:
        for p in (in_path, out_path):
            if not p:
                continue
            try:
                os.unlink(p)
            except Exception:
                pass
  
def ocr_pdf_best_effort(content: bytes, max_pages=5, lang="eng"):
    """OCR a PDF payload, returning recognised text or None; never raises.

    Strategy:
      1. If ocrmypdf is importable, run the ``ocrmypdf`` CLI with a sidecar
         text file and return that text. A completed run with an empty
         sidecar returns None, as does a timeout.
      2. If the CLI could not be started or exited non-zero, or ocrmypdf is
         not available at all, render up to ``max_pages`` pages with PyMuPDF
         at 300 DPI and OCR them with pytesseract.

    Args:
        content: PDF bytes.
        max_pages: Page cap for the PyMuPDF/pytesseract fallback only.
        lang: Tesseract language code.
    """
    # Prefer ocrmypdf sidecar text if available (fast and good quality)
    if HAVE_OCRMYPDF:
        temp_paths = []
        ocr_finished = False
        try:
            for suffix in (".pdf", ".pdf", ".txt"):
                fd, temp_path = tempfile.mkstemp(suffix=suffix)
                os.close(fd)
                temp_paths.append(temp_path)
            in_path, out_path, sidecar_path = temp_paths
            with open(in_path, "wb") as fh:
                fh.write(content)
            args = ["ocrmypdf", "-l", lang, "--sidecar", sidecar_path,
                    "--optimize", "0", "--tesseract-timeout", "120",
                    in_path, out_path]
            res = subprocess.run(args, capture_output=True, timeout=1200)
            if res.returncode == 0 and os.path.exists(sidecar_path):
                ocr_finished = True
                with open(sidecar_path, "r", encoding="utf-8", errors="ignore") as fh:
                    txt = fh.read()
                if txt.strip():
                    return txt
        except subprocess.TimeoutExpired:
            # The full OCR budget is already spent; do not start a second pass.
            return None
        except (OSError, subprocess.SubprocessError):
            # CLI missing or not runnable: use the fallback below.
            pass
        finally:
            for p in temp_paths:
                try:
                    os.unlink(p)
                except Exception:
                    pass
        if ocr_finished:
            return None

    # Fallback: render pages with PyMuPDF and OCR via pytesseract
    if not (HAVE_FITZ and HAVE_TESS):
        return None
    doc = None
    try:
        doc = fitz.open(stream=content, filetype="pdf")
        if doc.needs_pass:
            try:
                doc.authenticate("")
            except Exception:
                pass
        parts = []
        n = min(max_pages, len(doc))
        for i in range(n):
            page = doc[i]
            # 300 DPI render is a reasonable trade-off
            pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72))
            img = Image.open(io.BytesIO(pix.tobytes("png")))
            t = pytesseract.image_to_string(img, lang=lang) or ""
            if t.strip():
                parts.append(t)
        return "\n".join(parts) if parts else None
    except Exception:
        return None
    finally:
        if doc is not None:
            try:
                doc.close()
            except Exception:
                pass

# OCR for raster images (including multi-frame TIFFs)
def ocr_image_best_effort(content: bytes, lang="eng", max_frames=10):
    """OCR an image payload, walking up to ``max_frames`` frames.

    Each frame is converted to 8-bit greyscale before being handed to
    pytesseract. Non-blank results are joined with newlines.

    Returns:
        str | None: recognised text, or None when pytesseract/PIL are
        unavailable, nothing was recognised, or any error occurs.
    """
    if not HAVE_TESS:
        return None
    try:
        image = Image.open(io.BytesIO(content))
        frame_total = min(getattr(image, "n_frames", 1), max_frames)
        recognised = []
        index = 0
        while index < frame_total:
            try:
                image.seek(index)
            except Exception:
                break
            grey = image.convert("L")
            frame_text = pytesseract.image_to_string(grey, lang=lang) or ""
            if frame_text.strip():
                recognised.append(frame_text)
            index += 1
        if not recognised:
            return None
        return "\n".join(recognised)
    except Exception:
        return None

def pdf_page_stats(content: bytes):
    """Collect quick per-page statistics used to classify a PDF payload.

    PyMuPDF is preferred (it also counts images on pages without text); pypdf
    is the fallback and only detects text presence. Each backend fills its own
    fresh dict, so a backend that fails midway cannot leak partial counts into
    the fallback or into the returned defaults.

    Returns:
        dict with keys ``pages`` (int or None when no backend succeeded),
        ``text_pages``, ``image_only_pages``, ``images_total``,
        ``has_any_text`` and ``encrypted``.
    """
    def _empty_stats():
        return {
            'pages': None,
            'text_pages': 0,
            'image_only_pages': 0,
            'images_total': 0,
            'has_any_text': False,
            'encrypted': False,
        }

    # Prefer PyMuPDF
    if HAVE_FITZ:
        doc = None
        try:
            doc = fitz.open(stream=content, filetype="pdf")
            stats = _empty_stats()
            stats['encrypted'] = doc.needs_pass
            stats['pages'] = len(doc)
            for page in doc:
                t = page.get_text("text") or ""
                if t.strip():
                    stats['text_pages'] += 1
                else:
                    imgs = page.get_images(full=True)
                    if imgs:
                        stats['image_only_pages'] += 1
                        stats['images_total'] += len(imgs)
            stats['has_any_text'] = stats['text_pages'] > 0
            return stats
        except Exception:
            pass
        finally:
            if doc is not None:
                try:
                    doc.close()
                except Exception:
                    pass

    # Fallback to pypdf (no image counting, just text presence)
    if HAVE_PYPDF:
        try:
            reader = pypdf.PdfReader(io.BytesIO(content), strict=False)
            if getattr(reader, "is_encrypted", False):
                try:
                    reader.decrypt("")
                except Exception:
                    pass
            stats = _empty_stats()
            stats['pages'] = len(reader.pages)
            for pg in reader.pages:
                t = pg.extract_text() or ""
                if t.strip():
                    stats['text_pages'] += 1
            stats['has_any_text'] = stats['text_pages'] > 0
            return stats
        except Exception:
            pass

    return _empty_stats()

def parse_pdf(content: bytes):
    """Extract text from a PDF payload through a chain of fallbacks.

    The PDF is first classified with pdf_page_stats:

    * Text detected: try pdfplumber, pdfminer, pdftotext (with and without
      ``-layout``), pypdf and PyMuPDF, then the same core extractors on a
      qpdf-repaired copy, and finally OCR.
    * Pages counted but no text: go straight to OCR.
    * Classification failed (page count unknown): try the extractors, the
      repaired copy, then OCR.

    Returns:
        str: the extracted text (prefixed with ``"[OCR]\\n"`` when it came
        from OCR), or a bracketed placeholder message when nothing could be
        extracted.
    """
    # Quick classification: is there any real text?
    stats = pdf_page_stats(content)

    # If there appears to be text, try text extractors first
    if stats.get('has_any_text', False):
        # 1) pdfplumber
        try:
            if pdfplumber:
                with io.BytesIO(content) as f:
                    with pdfplumber.open(f) as pdf:
                        texts = []
                        for page in pdf.pages:
                            try:
                                # Prefer words -> lines
                                words = page.extract_words(
                                    x_tolerance=3, y_tolerance=3,
                                    keep_blank_chars=False, use_text_flow=False
                                )
                                if words:
                                    # Group consecutive words into a line while their
                                    # 'top' stays within 3 units of the line's first word.
                                    lines = []
                                    current_y = None
                                    line_words = []
                                    for w in words:
                                        y = float(w.get('top', 0))
                                        if current_y is None or abs(y - current_y) > 3:
                                            if line_words:
                                                line_words.sort(key=lambda ww: float(ww.get('x0', 0)))
                                                lines.append(' '.join(ww['text'] for ww in line_words))
                                            line_words = [w]
                                            current_y = y
                                        else:
                                            line_words.append(w)
                                    # Flush the final line
                                    if line_words:
                                        line_words.sort(key=lambda ww: float(ww.get('x0', 0)))
                                        lines.append(' '.join(ww['text'] for ww in line_words))
                                    if lines:
                                        texts.extend(lines)
                                else:
                                    t = page.extract_text() or ''
                                    if t.strip():
                                        texts.append(t)
                            except Exception:
                                # A page that fails is skipped; the rest are still processed.
                                continue
                        txt = '\n'.join(t for t in texts if t is not None)
                        if txt and txt.strip():
                            return txt
        except Exception:
            pass

        # 2) pdfminer (standalone)
        try:
            if pdfminer_extract_text:
                txt = pdfminer_extract_text(io.BytesIO(content)) or ''
                if txt.strip():
                    return txt
        except Exception:
            pass

        # 3) Poppler pdftotext
        txt = extract_pdf_with_pdftotext(content, layout=True)
        if txt:
            return txt

        # Second attempt without -layout (helps some PDFs)
        txt = extract_pdf_with_pdftotext(content, layout=False)
        if txt:
            return txt

        # 4) pypdf
        txt = extract_pdf_with_pypdf(content)
        if txt:
            return txt

        # 5) PyMuPDF text
        txt = extract_pdf_with_pymupdf(content)
        if txt:
            return txt

        # 6) Repair/decrypt then retry core extractors
        repaired = repair_pdf_with_qpdf(content)
        if repaired:
            txt = extract_pdf_with_pdftotext(repaired, layout=True) or extract_pdf_with_pdftotext(repaired, layout=False)
            if txt:
                return txt
            txt = extract_pdf_with_pypdf(repaired) or extract_pdf_with_pymupdf(repaired)
            if txt:
                return txt

        # If we thought it had text but none of the above worked, fall through to OCR as a last resort
        ocr = ocr_pdf_best_effort(content, max_pages=min(10, stats.get('pages') or 10), lang="eng")
        if ocr and ocr.strip():
            return "[OCR]\n" + ocr
        return "[PDF Content - Error extracting text]"

    # If it looks like image-only (no real text detected), OCR immediately
    if stats.get('pages') is not None and stats['text_pages'] == 0:
        ocr = ocr_pdf_best_effort(content, max_pages=min(20, stats['pages']), lang="eng")
        if ocr and ocr.strip():
            return "[OCR]\n" + ocr
        # Provide a clearer note when OCR is unavailable
        if HAVE_OCRMYPDF or HAVE_TESS:
            return "[PDF appears image-only; OCR attempted but no text was produced]"
        else:
            return "[PDF appears image-only; OCR not available on this cluster]"

    # If we couldn't classify, try the extractors anyway
    txt = extract_pdf_with_pdftotext(content, layout=True) or extract_pdf_with_pypdf(content) or extract_pdf_with_pymupdf(content)
    if txt and txt.strip():
        return txt

    repaired = repair_pdf_with_qpdf(content)
    if repaired:
        txt = extract_pdf_with_pdftotext(repaired, layout=True) or extract_pdf_with_pypdf(repaired) or extract_pdf_with_pymupdf(repaired)
        if txt and txt.strip():
            return txt

    # Final OCR attempt
    ocr = ocr_pdf_best_effort(content, max_pages=10, lang="eng")
    if ocr and ocr.strip():
        return "[OCR]\n" + ocr

    return "[PDF Content - Error extracting text]"


def clean_text(text):
    """Normalise extracted text before it is stored.

    Non-string input is handed back untouched. For strings, ``<% ... %>``
    tags are removed (even when they span several lines), every ``|`` becomes
    a line break, runs of newlines are collapsed, and surrounding whitespace
    is stripped.
    """
    if isinstance(text, str):
        result = re.sub(r'<%.*?%>', '', text, flags=re.DOTALL).replace('|', '\n')
        # Applied in this order: long runs are shortened first, then every
        # remaining run of newlines is reduced to a single newline.
        for pattern, replacement in ((r'\n{3,}', '\n\n'), (r'\n+', '\n')):
            result = re.sub(pattern, replacement, result)
        return result.strip()
    return text

def extract_text_from_binary(content: bytes, content_type: str):
    """Extract plain text from a binary payload using a MIME-specific extractor.

    Args:
        content: Raw payload bytes.
        content_type: MIME type used to choose the extractor.

    Returns:
        The extracted text; ``None`` when the payload is empty, the MIME type
        is missing or has no extractor, or no text could be obtained; or a
        ``"[Binary Content - Extraction Error: ...]"`` marker string when an
        extractor raised.
    """
    # A missing MIME type must yield None (so the caller can fall back to a
    # plain decode) rather than tripping over ``None.startswith`` below and
    # being reported as an extraction error.
    if not content or not content_type:
        return None

    try:
        if content_type == 'application/pdf':
            return parse_pdf(content)
        elif content_type in ('application/vnd.openxmlformats-officedocument.wordprocessingml.document',):
            return extract_text_from_docx(content)
        elif content_type in ('application/vnd.ms-excel', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'):
            return extract_text_from_excel(content)
        elif content_type == 'text/rtf':
            # RTF failures are swallowed on purpose: returning None lets the
            # caller try its own decode-based fallback.
            try:
                return rtf_to_text(content.decode('latin-1', errors='ignore'))
            except Exception:
                return None
        elif content_type in ('text/html', 'text/xml', 'application/xhtml+xml'):
            # Must be tested before the generic ``text/`` branch below.
            txt, enc, ratio = guess_text(content)
            if txt:
                soup = BeautifulSoup(txt, 'html.parser')
                return soup.get_text(separator='\n', strip=True)
            return None
        elif content_type.startswith('text/'):
            txt, enc, ratio = guess_text(content)
            return clean_text(txt) if txt else None
        elif content_type in ('application/msword',):
            return extract_text_from_ole_doc(content)
        elif content_type in ('application/vnd.ms-powerpoint',):
            return extract_text_from_ole_ppt(content)
        elif content_type in ('application/vnd.ms-outlook',):
            return extract_text_from_msg(content)
        elif content_type in ('application/x-ole-storage',):
            # Recurse only when refinement produced a different type, which
            # rules out infinite recursion on an unrefinable OLE container.
            refined = refine_mime_with_ole(content_type, content)
            if refined != content_type:
                return extract_text_from_binary(content, refined)
            return None
        else:
            return None
    except Exception as e:
        return f"[Binary Content - Extraction Error: {str(e)}]"

# Fix C: Updated parse_blob_content with OCR support
def parse_blob_content(content: bytes, provided_type=None):
    """Convert a blob payload into a ``(text, content_type, encoding)`` triple.

    The MIME type is ``provided_type`` when given, otherwise it is detected
    from the bytes; either way it is then passed through the OLE refinement.
    Images go to OCR, zip archives get a placeholder, everything else is tried
    with the MIME-specific extractor first and a plain text decode second.
    Empty input yields ``(None, None, None)``.
    """
    if not content:
        return None, None, None

    content_type = refine_mime_with_ole(provided_type or detect_mime(content), content)

    # Images: OCR is the only way to get text out of them.
    if content_type and content_type.startswith('image/'):
        ocr_text = ocr_image_best_effort(content, lang="eng")
        if ocr_text and ocr_text.strip():
            return clean_text(ocr_text), content_type, "utf-8"
        ocr_outcome = "attempted but produced no text" if HAVE_TESS else "not available"
        return "[Image appears to require OCR; OCR {}]".format(ocr_outcome), content_type, None

    # Archives are recorded with a placeholder only.
    if content_type == 'application/zip':
        return f"[{content_type} Content]", content_type, None

    # Preferred path: the extractor dedicated to this MIME type.
    structured = extract_text_from_binary(content, content_type)
    if isinstance(structured, str) and structured:
        return clean_text(structured), content_type, 'utf-8'

    # Fallback path: treat the bytes as text in whatever encoding fits best.
    decoded, best_enc, ratio = guess_text(content)
    if not decoded:
        return f"[Binary data, unable to decode. Best printable ratio: {ratio:.2f}]", content_type, None

    if content_type == "text/rtf":
        fallback_text = rtf_to_text(decoded)
    elif content_type in ["text/html", "text/xml", "application/xhtml+xml"]:
        fallback_text = BeautifulSoup(decoded, 'html.parser').get_text(separator='\n', strip=True)
    else:
        fallback_text = clean_text(decoded)
    return fallback_text, content_type, best_enc

def safe_numeric(value, default=None):
    """Coerce ``value`` to an ``int`` without ever raising.

    Args:
        value: Anything; typically an int, float, numeric string or a value
            whose ``str()`` is numeric.
        default: Returned when ``value`` is ``None``, an empty string, or
            cannot be converted.

    Returns:
        ``int(value)`` for ints and floats (floats are truncated). Otherwise
        the textual form is parsed, exactly when it is an integer literal and
        via ``float`` (truncating) when it is not.
    """
    try:
        if value is None or value == '':
            return default
        if isinstance(value, (int, float)):
            return int(value)
        text = str(value).strip()
        try:
            # Parse integer text directly: routing it through float() would
            # silently round identifiers larger than 2**53.
            return int(text)
        except ValueError:
            # Not an integer literal (e.g. "12.7", "1e3"): truncate the float.
            return int(float(text))
    except Exception:
        # Covers NaN/inf, non-numeric text and values whose comparison or
        # str() misbehaves; the helper's contract is to never raise.
        return default

def create_result_dict(row, decompressed=None, content_type=None, encoding=None, blob_text=None, status="Error"):
    """Assemble one output record from a source row and its processing results.

    ``row`` is indexed by column name. The five numeric audit columns are
    passed through ``safe_numeric``; the timestamp columns are copied as-is.
    ``BLOB_BINARY``/``BINARY_SIZE`` are populated only when ``decompressed``
    is bytes-like, ``TEXT_LENGTH`` only when ``blob_text`` is a ``str``.
    ``anon_text`` is always ``None``.
    """
    numeric = {
        column: safe_numeric(row[column])
        for column in ('EVENT_ID', 'UPDT_ID', 'UPDT_TASK', 'UPDT_CNT', 'UPDT_APPLCTX')
    }

    is_binary = isinstance(decompressed, (bytes, bytearray))
    is_text = isinstance(blob_text, str)

    # Text that cannot be represented in UTF-8 is dropped; anything that is
    # not a string is stringified, with a fixed marker if even that fails.
    if blob_text is None:
        stored_text = None
    elif is_text:
        stored_text = blob_text.encode('utf-8', errors='ignore').decode('utf-8')
    else:
        try:
            stored_text = str(blob_text)
        except Exception:
            stored_text = "[Non-string content]"

    return {
        "EVENT_ID": numeric['EVENT_ID'],
        "VALID_UNTIL_DT_TM": row['VALID_UNTIL_DT_TM'],
        "VALID_FROM_DT_TM": row['VALID_FROM_DT_TM'],
        "UPDT_DT_TM": row['UPDT_DT_TM'],
        "UPDT_ID": numeric['UPDT_ID'],
        "UPDT_TASK": numeric['UPDT_TASK'],
        "UPDT_CNT": numeric['UPDT_CNT'],
        "UPDT_APPLCTX": numeric['UPDT_APPLCTX'],
        "LAST_UTC_TS": row['LAST_UTC_TS'],
        "ADC_UPDT": row['ADC_UPDT'],
        "BLOB_BINARY": bytes(decompressed) if is_binary else None,
        "CONTENT_TYPE": content_type,
        "ENCODING": encoding,
        "BLOB_TEXT": stored_text,
        "BINARY_SIZE": len(decompressed) if is_binary else None,
        "TEXT_LENGTH": len(blob_text) if is_text else None,
        "STATUS": str(status),
        "anon_text": None
    }



# Schema of the rows returned by the grouped-map UDF. Every field is nullable;
# the names and order match the keys produced by create_result_dict.
udf_output_schema = T.StructType([
    T.StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("EVENT_ID", T.LongType),
        ("VALID_UNTIL_DT_TM", T.TimestampType),
        ("VALID_FROM_DT_TM", T.TimestampType),
        ("UPDT_DT_TM", T.TimestampType),
        ("UPDT_ID", T.LongType),
        ("UPDT_TASK", T.LongType),
        ("UPDT_CNT", T.LongType),
        ("UPDT_APPLCTX", T.LongType),
        ("LAST_UTC_TS", T.TimestampType),
        ("ADC_UPDT", T.TimestampType),
        ("BLOB_BINARY", T.BinaryType),
        ("CONTENT_TYPE", T.StringType),
        ("ENCODING", T.StringType),
        ("BLOB_TEXT", T.StringType),
        ("BINARY_SIZE", T.LongType),
        ("TEXT_LENGTH", T.LongType),
        ("STATUS", T.StringType),
        ("anon_text", T.StringType),
    )
])

def _status_for_blob_text(blob_text):
    """Derive the STATUS value for an event from its parsed text."""
    if not blob_text:
        return 'Failed to decode'
    if not isinstance(blob_text, str):
        return 'Decoded'

    # Successful OCR output is prefixed with "[OCR]". It has to be recognised
    # before the generic bracket heuristic at the bottom, otherwise any OCR'd
    # document that merely contains the word "appears" is flagged as an error.
    if blob_text.startswith("[OCR]"):
        return 'Decoded'

    # Known placeholder / error markers, most specific first.
    known_markers = (
        ("[Binary Content - ", 'Binary extraction error'),
        ("[PDF Content - Error", 'PDF extraction error'),
        ("[Binary data, unable to decode", 'Unable to decode binary'),
        ("[LZW failed all attempts - aggressive", 'LZW Failed all attempts'),
        ("[LZW decompression failed", 'LZW decompression failed'),
        ("[PDF appears image-only", 'PDF appears image-only'),
        ("[Image appears to require OCR", 'Image appears to require OCR'),
        ("Error: 'charmap'", 'Charmapping Error'),
    )
    for prefix, label in known_markers:
        if blob_text.startswith(prefix):
            return label

    if blob_text.startswith("Error:"):
        return 'Error:' + blob_text[:200]
    # Catch-all for other bracketed "... appears ..." placeholder messages.
    if blob_text.startswith("[") and "appears" in blob_text and "]" in blob_text[:200]:
        return 'Error: ' + blob_text[:200]
    return 'Decoded'


def _build_result_for_event(first_row: dict, chunks: list):
    """
    Pure function that builds a single output row (dict) for an EVENT_ID.

    Args:
        first_row: plain dict of the group's first row (no pandas objects).
        chunks: list of bytes (blob chunks) for this event, already in the
            order in which they must be concatenated.

    Returns:
        The dict produced by ``create_result_dict``. Events whose compressed
        or decompressed size exceeds ``MAX_BLOB_SIZE``, or whose decompression
        returns ``None``, get a metadata-only row with an explanatory STATUS.
    """
    # Aggregate chunk info
    total_blob_length = sum(len(c) for c in chunks if c is not None)
    chunk_count = len(chunks)
    compression_cd = first_row.get('COMPRESSION_CD')

    # Enforce compressed size
    if total_blob_length > MAX_BLOB_SIZE:
        return create_result_dict(first_row, status=f"Compressed Too Large: {total_blob_length} bytes")

    # Combine and decompress
    blob_contents = combine_blob_chunks(chunks)
    decompressed, dec_err = decompress_blob_improved(
        blob_contents,
        compression_cd,
        chunk_count,
        False  # has_duplicates
    )
    if decompressed is None:
        return create_result_dict(first_row, status=dec_err or "Decompression returned None")

    # Enforce decompressed size
    if isinstance(decompressed, (bytes, bytearray)) and len(decompressed) > MAX_BLOB_SIZE:
        return create_result_dict(first_row, status=f"Decompressed too large: {len(decompressed)} bytes")

    # MIME + parse
    content_type = detect_mime(decompressed)
    content_type = refine_mime_with_ole(content_type, decompressed)
    blob_text, detected_type, encoding = parse_blob_content(decompressed, content_type)
    if detected_type:
        content_type = detected_type

    status = _status_for_blob_text(blob_text)

    # Drop anything that cannot be represented in UTF-8 before it is stored.
    if isinstance(blob_text, str):
        blob_text = blob_text.encode('utf-8', errors='ignore').decode('utf-8')

    return create_result_dict(
        first_row,
        decompressed=decompressed,
        content_type=content_type,
        encoding=encoding,
        blob_text=blob_text,
        status=str(status)
    )

# Fix B: Updated process_blob_batch to use process-based timeout
def _failure_result(first_row, status):
    """Build a metadata-only output row for an event that timed out or raised.

    ``first_row`` only needs a ``.get`` method. Numeric columns, including
    EVENT_ID, go through ``safe_numeric`` so that failure rows carry the same
    value types as the rows built by ``create_result_dict``.
    """
    return {
        "EVENT_ID": safe_numeric(first_row.get('EVENT_ID')),
        "VALID_UNTIL_DT_TM": first_row.get('VALID_UNTIL_DT_TM'),
        "VALID_FROM_DT_TM": first_row.get('VALID_FROM_DT_TM'),
        "UPDT_DT_TM": first_row.get('UPDT_DT_TM'),
        "UPDT_ID": safe_numeric(first_row.get('UPDT_ID')),
        "UPDT_TASK": safe_numeric(first_row.get('UPDT_TASK')),
        "UPDT_CNT": safe_numeric(first_row.get('UPDT_CNT')),
        "UPDT_APPLCTX": safe_numeric(first_row.get('UPDT_APPLCTX')),
        "LAST_UTC_TS": first_row.get('LAST_UTC_TS'),
        "ADC_UPDT": first_row.get('ADC_UPDT'),
        "BLOB_BINARY": None,
        "CONTENT_TYPE": None,
        "ENCODING": None,
        "BLOB_TEXT": None,
        "BINARY_SIZE": None,
        "TEXT_LENGTH": None,
        "STATUS": status,
        "anon_text": None
    }


@pandas_udf(returnType=udf_output_schema, functionType=PandasUDFType.GROUPED_MAP)
def process_blob_batch(pdf):
    """
    Pandas UDF to process a batch of blob records.

    Each EVENT_ID group in ``pdf`` is reduced to exactly one output row. The
    per-event work is delegated to ``_build_result_for_event``; when the
    global ``EVENT_TIMEOUT_SECONDS`` (default 240) is positive the call goes
    through ``run_with_event_timeout`` and a ``TimeoutException`` becomes a
    "Timeout after Ns" row. Any other exception becomes an "Error: ..." row,
    so one bad event does not fail the whole task.

    Returns:
        A DataFrame whose columns are the fields of ``udf_output_schema``.
    """
    results = []

    for _, group_data in pdf.groupby('EVENT_ID'):
        first_row = group_data.iloc[0]

        try:
            # Row order inside a group is not guaranteed once the data has
            # been shuffled, and the chunks must be concatenated in sequence
            # order, so sort explicitly. mergesort keeps ties stable.
            if 'BLOB_SEQ_NUM' in group_data.columns:
                group_data = group_data.sort_values('BLOB_SEQ_NUM', na_position='last', kind='mergesort')
                first_row = group_data.iloc[0]

            blob_column = group_data['BLOB_CONTENTS'] if 'BLOB_CONTENTS' in group_data.columns else ()
            chunks = [blob for blob in blob_column if pd.notna(blob)]

            event_timeout_seconds = int(globals().get('EVENT_TIMEOUT_SECONDS', 240) or 0)

            # Convert first_row to a plain dict so it's picklable for subprocess
            first_row_dict = {k: (v.item() if hasattr(v, "item") else v) for k, v in first_row.to_dict().items()}

            if event_timeout_seconds > 0:
                try:
                    result = run_with_event_timeout(event_timeout_seconds, _build_result_for_event, first_row_dict, chunks)
                except TimeoutException:
                    result = _failure_result(first_row, f"Timeout after {event_timeout_seconds}s")
            else:
                result = _build_result_for_event(first_row_dict, chunks)

            results.append(result)

        except Exception as e:
            results.append(_failure_result(first_row, f"Error: {str(e)}"))

    # Pin the columns to the declared schema so that an empty result list
    # still yields a frame with the expected columns.
    return pd.DataFrame(results, columns=udf_output_schema.fieldNames())


def run_parallel_processing(batch_limit=100000, cutoff_date=None):
    """Main function to run parallel blob processing with incremental ADC_UPDT processing.

    One call handles one iteration: it selects candidate events from
    SOURCE_TABLE with ``ADC_UPDT > cutoff_date``, sizes them from metadata
    only, skips events whose compressed size exceeds MAX_BLOB_SIZE, groups the
    rest into byte/count-bounded batches, runs ``process_blob_batch`` over the
    first MAX_PARALLEL_BATCHES batches and appends the result to TARGET_TABLE.

    Args:
        batch_limit: Scales how many candidate EVENT_IDs are inspected
            (``batch_limit * CANDIDATE_MULTIPLIER``, capped at the number of
            new events).
        cutoff_date: Lower bound (exclusive) on ADC_UPDT. When ``None`` it is
            obtained from ``get_max_adc_updt(TARGET_TABLE, ...)``.

    Returns:
        ``False`` when there are no new events or the write failed; ``True``
        otherwise, including iterations in which nothing was eligible.
    """
    script_start = time.time()

    def _ts():
        return datetime.now().strftime('%H:%M:%S')

    # Every DataFrame persisted below is registered here so that the finally
    # block releases it on every exit path, including early returns and errors.
    cached_frames = []

    def _cache(df):
        cached = df.cache()
        cached_frames.append(cached)
        return cached

    try:
        print("="*80)
        print("PARALLEL BLOB PROCESSOR - INCREMENTAL MODE")
        print(f"Processing batch of up to {batch_limit:,} new events")
        print("="*80)

        # Step 1: Get the cutoff date if not provided
        if cutoff_date is None:
            cutoff_date = get_max_adc_updt(TARGET_TABLE, default_dt=datetime(1980, 1, 1))
        print(f"[{_ts()}] Processing events with ADC_UPDT > {cutoff_date}")

        # Get new EVENT_IDs from source table
        print(f"[{_ts()}] Finding new events...")
        new_events = (spark.table(SOURCE_TABLE)
                      .filter(F.col("ADC_UPDT") > cutoff_date)
                      .select("EVENT_ID")
                      .distinct())

        new_count = new_events.count()
        if new_count == 0:
            print("No new events to process")
            return False

        print(f"Found {new_count:,} new events total")

        # Inspect more candidates than we will actually process to build a bytes-based batch
        candidate_limit = min(max(batch_limit * CANDIDATE_MULTIPLIER, batch_limit), new_count)
        print(f"Pulling {candidate_limit:,} candidate EVENT_IDs for prefiltering...")

        # Order by ADC_UPDT to process oldest first. An event with several
        # ADC_UPDT values appears once per value up to the limit, so the final
        # distinct() keeps the broadcast join keys unique.
        candidates = _cache(
            spark.table(SOURCE_TABLE)
            .filter(F.col("ADC_UPDT") > cutoff_date)
            .select("EVENT_ID", "ADC_UPDT")
            .distinct()
            .orderBy("ADC_UPDT", "EVENT_ID")
            .limit(candidate_limit)
            .select("EVENT_ID")
            .distinct()
        )
        _ = candidates.count()

        # Step 2: Read only metadata needed for dedup + sizing (NO BLOB_CONTENTS)
        META_COLS = [
            "EVENT_ID", "BLOB_SEQ_NUM",
            "VALID_UNTIL_DT_TM", "VALID_FROM_DT_TM",
            "UPDT_DT_TM", "UPDT_ID", "UPDT_TASK", "UPDT_CNT", "UPDT_APPLCTX",
            "LAST_UTC_TS", "ADC_UPDT", "COMPRESSION_CD", "BLOB_LENGTH"
        ]

        print(f"[{_ts()}] Loading lightweight metadata for candidates...")
        source_columns = set(spark.table(SOURCE_TABLE).columns)  # looked up once, not per column
        raw_meta = (spark.table(SOURCE_TABLE)
                    .join(F.broadcast(candidates), on="EVENT_ID", how="inner")
                    .select(*[c for c in META_COLS if c in source_columns]))

        # Null BLOB_LENGTH counts as zero bytes
        raw_meta = raw_meta.withColumn(
            "chunk_size",
            F.coalesce(F.col("BLOB_LENGTH").cast("long"), F.lit(0))
        )

        # Step 3: Deduplicate on metadata only (most recent version per EVENT_ID, BLOB_SEQ_NUM)
        print(f"[{_ts()}] Deduplicating metadata...")
        w_temporal = Window.partitionBy("EVENT_ID", "BLOB_SEQ_NUM").orderBy(
            F.col("VALID_UNTIL_DT_TM").desc(),
            F.col("UPDT_DT_TM").desc(),
            F.col("LAST_UTC_TS").desc()
        )

        raw_meta_deduped = _cache(
            raw_meta
            .withColumn("version_rank", F.row_number().over(w_temporal))
            .filter(F.col("version_rank") == 1)
            .drop("version_rank")
        )
        _ = raw_meta_deduped.count()

        # Step 4: Per-event sizes and early filtering
        print(f"[{_ts()}] Calculating per-event compressed sizes (from BLOB_LENGTH)...")
        event_sizes = _cache(
            raw_meta_deduped
            .groupBy("EVENT_ID")
            .agg(
                F.sum("chunk_size").alias("total_compressed_size"),
                F.count("*").alias("chunk_count"),
                F.max("ADC_UPDT").alias("event_adc_updt")
            )
        )
        _ = event_sizes.count()

        # Track oversized events (we'll skip them but note them)
        oversized_count = event_sizes.filter(F.col("total_compressed_size") > MAX_BLOB_SIZE).count()
        if oversized_count > 0:
            print(f"[{_ts()}] Skipping {oversized_count} oversized events")

        # Keep only within-size candidates
        within_size = event_sizes.filter(F.col("total_compressed_size") <= MAX_BLOB_SIZE)

        # Step 5: Build byte-aware batches from within-size candidates
        print(f"[{_ts()}] Assigning byte-aware batches...")
        event_order = Window.orderBy("event_adc_updt", "EVENT_ID")
        cumulative_window = event_order.rowsBetween(Window.unboundedPreceding, Window.currentRow)

        # batch_id is the larger of the size-based and count-based bucket, so
        # a batch closes as soon as either limit is reached.
        batched_events = _cache(
            within_size
            .withColumn("cumulative_size", F.sum("total_compressed_size").over(cumulative_window))
            .withColumn("cumulative_count", F.row_number().over(event_order))
            .withColumn("size_batch", F.floor(F.col("cumulative_size") / BATCH_SIZE_BYTES).cast("int"))
            .withColumn("count_batch", F.floor((F.col("cumulative_count") - 1) / MAX_BATCH_SIZE).cast("int"))
            .withColumn("batch_id", F.greatest("size_batch", "count_batch"))
        )
        _ = batched_events.count()

        # We only process the first MAX_PARALLEL_BATCHES batch ids this iteration
        first_batch_id = batched_events.agg(F.min("batch_id").alias("min_batch")).collect()[0]["min_batch"]
        if first_batch_id is None:
            print("No within-size events to process in this iteration.")
            return True

        last_batch_id = first_batch_id + MAX_PARALLEL_BATCHES - 1
        chosen_events = _cache(
            batched_events
            .filter((F.col("batch_id") >= first_batch_id) & (F.col("batch_id") <= last_batch_id))
            .select("EVENT_ID")
        )
        chosen_count = chosen_events.count()
        print(f"Selected {chosen_count:,} events across batch_ids [{first_batch_id}, {last_batch_id}]")

        # Step 6: Filter source by chosen EVENT_IDs, then deduplicate with the same window
        print(f"[{_ts()}] Fetching chosen rows (with BLOB_CONTENTS) and deduplicating...")

        META_AND_BLOB_COLS = [
            "EVENT_ID", "BLOB_SEQ_NUM",
            "VALID_UNTIL_DT_TM", "VALID_FROM_DT_TM",
            "UPDT_DT_TM", "UPDT_ID", "UPDT_TASK", "UPDT_CNT", "UPDT_APPLCTX",
            "LAST_UTC_TS", "ADC_UPDT", "COMPRESSION_CD", "BLOB_CONTENTS"
        ]

        # Get all rows for chosen EVENT_IDs
        source_filtered = (
            spark.table(SOURCE_TABLE)
            .join(F.broadcast(chosen_events), on="EVENT_ID", how="inner")
            .select(*META_AND_BLOB_COLS)
        )

        source_deduped = _cache(
            source_filtered
            .withColumn("version_rank", F.row_number().over(w_temporal))
            .filter(F.col("version_rank") == 1)
            .drop("version_rank")
        )

        deduped_count = source_deduped.count()
        print(f"[{_ts()}] Deduped source rows: {deduped_count:,}")

        if deduped_count == 0:
            print(f"[{_ts()}] WARNING: No rows after deduplication. Skipping this batch.")
            return True  # continue with next batch

        # Step 7: Parallel processing via Pandas UDF
        default_shuffle_partitions = int(spark.conf.get("spark.sql.shuffle.partitions", "200"))
        optimal_partitions = max(200, default_shuffle_partitions * 2, chosen_count // 10 + 1)
        print(f"[{_ts()}] Repartitioning to {optimal_partitions} partitions...")

        # A global orderBy does not survive the hash repartition that follows
        # it, so sort inside each partition after the shuffle instead.
        final_data = (
            source_deduped
            .repartition(optimal_partitions, "EVENT_ID")
            .sortWithinPartitions("EVENT_ID", F.col("BLOB_SEQ_NUM").asc_nulls_last())
            .persist()
        )
        cached_frames.append(final_data)
        final_count = final_data.count()
        print(f"[{_ts()}] Final dataset ready: {final_count} rows for processing")

        print(f"[{_ts()}] Starting processing...")
        processing_start = time.time()

        final_columns = [
            "EVENT_ID", "VALID_UNTIL_DT_TM", "VALID_FROM_DT_TM", "UPDT_DT_TM",
            "UPDT_ID", "UPDT_TASK", "UPDT_CNT", "UPDT_APPLCTX",
            "LAST_UTC_TS", "ADC_UPDT", "BLOB_BINARY", "CONTENT_TYPE",
            "ENCODING", "BLOB_TEXT", "BINARY_SIZE", "TEXT_LENGTH",
            "STATUS", "anon_text"
        ]
        # insertInto matches columns by position, hence the explicit select.
        processed_df = _cache(
            final_data
            .groupBy("EVENT_ID")
            .apply(process_blob_batch)
            .select(*final_columns)
        )
        processed_count = processed_df.count()
        processing_time = time.time() - processing_start
        print(f"[{_ts()}] Processed {processed_count} events (took {processing_time:.2f}s)")

        # Check if we have any results to write
        if processed_count == 0:
            print(f"[{_ts()}] No processed rows; skipping write.")
            return True  # continue with next batch

        # Step 8: Stats and write
        print(f"[{_ts()}] Gathering statistics...")
        stats_df = processed_df.agg(
            F.count("*").alias("total_processed"),
            F.sum(F.when(F.col("STATUS") == "Decoded", 1).otherwise(0)).alias("successful"),
            F.sum(F.when(F.col("STATUS") != "Decoded", 1).otherwise(0)).alias("failed"),
            F.sum("BINARY_SIZE").alias("total_decompressed_bytes")
        ).collect()[0]

        print(f"[{_ts()}] Writing results to {TARGET_TABLE}...")
        write_start = time.time()
        write_partitions = min(max(50, chosen_count // 100 + 1), 400)
        print(f"[{_ts()}] Using {write_partitions} partitions for write...")

        # A failed write is reported through the return value, not raised.
        write_success = False
        try:
            (processed_df
                .repartition(write_partitions)
                .write
                .mode("append")
                .option("mergeSchema", "false")
                .insertInto(TARGET_TABLE))
            write_success = True
            write_time = time.time() - write_start
            print(f"[{_ts()}] Write completed (took {write_time:.2f}s)")
        except Exception as e:
            print(f"[{_ts()}] Write failed: {e}")

        total_time = time.time() - script_start
        print("\n" + "="*80)
        print("BATCH PROCESSING COMPLETE")
        print("="*80)
        print(f"Total time: {total_time:.2f}s")
        print(f"Processed: {stats_df['total_processed']}, successful: {stats_df['successful']}, failed: {stats_df['failed']}")
        if stats_df['total_decompressed_bytes']:
            print(f"Total decompressed size: {stats_df['total_decompressed_bytes']/(1024**3):.2f} GB")
        print("="*80)

        return write_success
    finally:
        # Release caches newest-first. Cleanup is best-effort: a failure here
        # is reported but must not mask the function's result or exception.
        for frame in reversed(cached_frames):
            try:
                frame.unpersist()
            except Exception as exc:
                print(f"[{_ts()}] Warning: failed to unpersist cached DataFrame: {exc}")


def _pending_event_count(cutoff):
    """Count distinct EVENT_IDs in SOURCE_TABLE whose ADC_UPDT is strictly after `cutoff`."""
    return (spark.table(SOURCE_TABLE)
            .filter(F.col("ADC_UPDT") > cutoff)
            .select("EVENT_ID")
            .distinct()
            .count())


def _target_cutoff():
    """Return get_max_adc_updt() for TARGET_TABLE, passing 1980-01-01 as the default."""
    return get_max_adc_updt(TARGET_TABLE, default_dt=datetime(1980, 1, 1))


def process_all_pending_blobs(batch_size=100000, max_iterations=None):
    """
    Process all new blobs incrementally based on ADC_UPDT timestamp.
    Processes events from source table where ADC_UPDT > max(ADC_UPDT) from target table.

    Each iteration re-reads the cutoff from TARGET_TABLE, counts the distinct
    EVENT_IDs still pending in SOURCE_TABLE, and calls run_parallel_processing()
    for one batch. The loop ends when nothing is pending, when
    run_parallel_processing() returns False, when max_iterations is reached,
    after 5 consecutive failed iterations, or after 5 consecutive iterations in
    which the cutoff did not move.

    Args:
        batch_size: Number of events to process in each batch
        max_iterations: Maximum number of iterations (None or 0 for unlimited)

    Returns:
        None. Progress and a final summary are printed.
    """
    iteration = 0
    total_start = time.time()

    print("="*80)
    print("STARTING INCREMENTAL BLOB PROCESSING")
    print(f"Batch size: {batch_size:,} events per iteration")
    print(f"Max iterations: {max_iterations if max_iterations else 'Unlimited'}")
    print("="*80)

    # Get initial cutoff date
    initial_cutoff = _target_cutoff()
    print(f"\nInitial cutoff date: {initial_cutoff}")

    # Check initial count of new events
    initial_new = _pending_event_count(initial_cutoff)
    print(f"Initial new events to process: {initial_new:,}")

    if initial_new == 0:
        print("No new events to process!")
        return

    successful_iterations = 0
    failed_iterations = 0
    total_events_processed = 0
    consecutive_failures = 0
    max_consecutive_failures = 5  # Stop after 5 consecutive failures
    # Stalls are tracked separately from failures: sharing one counter counted a
    # failed, non-advancing iteration twice, and let every successful iteration
    # erase the stall history so the stall limit could never be reached.
    stalled_iterations = 0
    max_stalled_iterations = 5  # Stop after 5 consecutive iterations with an unchanged cutoff
    last_cutoff = initial_cutoff

    while True:
        # Check the cap before counting a new iteration so the summary reports
        # only iterations that actually started.
        if max_iterations and iteration >= max_iterations:
            print(f"\nReached maximum iterations ({max_iterations}). Stopping.")
            break
        iteration += 1

        print(f"\n{'='*80}")
        print(f"ITERATION {iteration}")
        print(f"{'='*80}")

        # Get current cutoff date
        current_cutoff = _target_cutoff()
        print(f"Current cutoff date: {current_cutoff}")

        # Check remaining count
        remaining_count = _pending_event_count(current_cutoff)
        print(f"Remaining new events: {remaining_count:,}")

        if remaining_count == 0:
            print("All new events have been processed!")
            break

        # Check if we're making progress
        if iteration > 1 and current_cutoff == last_cutoff:
            stalled_iterations += 1
            print("No progress made in last iteration (cutoff date unchanged)")
            if stalled_iterations >= max_stalled_iterations:
                print("Too many iterations without progress. Stopping.")
                break
        else:
            stalled_iterations = 0
            last_cutoff = current_cutoff

        # Run processing for this batch
        try:
            success = run_parallel_processing(batch_limit=batch_size, cutoff_date=current_cutoff)

            # NOTE(review): the function defined just above this one ends with
            # `return write_success`, which is False when the write raised. If that
            # is run_parallel_processing, a failed write is indistinguishable here
            # from "no more new events" and ends the loop uncounted — confirm intent.
            if success is False:  # No more new events
                print("No more new events to process")
                break
            elif success:
                successful_iterations += 1
                consecutive_failures = 0  # Reset consecutive failure counter

                # Get the new cutoff after processing
                new_cutoff = _target_cutoff()
                new_remaining = _pending_event_count(new_cutoff)
                # Approximate: the source table may change between the two counts.
                events_processed = remaining_count - new_remaining
                total_events_processed += events_processed

                print(f"\nIteration {iteration} completed successfully")
                print(f"Events processed in this iteration: ~{events_processed:,}")
                print(f"Cutoff date advanced from {current_cutoff} to {new_cutoff}")
            else:
                # Falsy but not False (e.g. None) is treated as a failed iteration.
                failed_iterations += 1
                consecutive_failures += 1
                print(f"\nIteration {iteration} failed")

                if consecutive_failures >= max_consecutive_failures:
                    print(f"\nToo many consecutive failures ({consecutive_failures}). Stopping.")
                    break

        except Exception as e:
            failed_iterations += 1
            consecutive_failures += 1
            print(f"\nIteration {iteration} encountered an error: {e}")
            # Uses the file-level `traceback` import.
            traceback.print_exc()

            if consecutive_failures >= max_consecutive_failures:
                print(f"\nToo many consecutive failures ({consecutive_failures}). Stopping.")
                break

            # Decide whether to continue or stop on error
            if iteration == 1:
                # If first iteration fails, likely a configuration issue
                print("\nFirst iteration failed. Stopping.")
                break
            else:
                print("\nContinuing despite error...")
                # Add longer delay after error
                time.sleep(10)
                continue

        # Add a small delay between iterations to avoid overwhelming the system
        if remaining_count > batch_size:  # More work to do
            print(f"\nWaiting 5 seconds before next iteration...")
            time.sleep(5)

    # Final summary
    total_time = time.time() - total_start
    print("\n" + "="*80)
    print("INCREMENTAL PROCESSING SUMMARY")
    print("="*80)
    print(f"Total iterations: {iteration}")
    print(f"Successful iterations: {successful_iterations}")
    print(f"Failed iterations: {failed_iterations}")
    print(f"Total events processed: ~{total_events_processed:,}")
    print(f"Total processing time: {total_time:.2f} seconds ({total_time/60:.1f} minutes)")
    print(f"Final cutoff date: {_target_cutoff()}")
    print("="*80)

In [0]:
# Execute the full processing
if __name__ == "__main__":
    # Process pending blobs in batches of 50,000 events, capped at 5 iterations for this run
    process_all_pending_blobs(batch_size=50000, max_iterations=5)