In [None]:
import torch
import asyncio
import concurrent.futures
import pandas as pd
import numpy as np
import logging
import GPUtil
import teradatasql
import inflect
import re
from datetime import datetime
from typing import List, Dict, Any
from itertools import chain
from queue import Queue
from threading import Event, Thread


In [None]:
# Configure logging
# Root logger: INFO level, lines formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Initialize inflect engine
# Module-level engine; NERProcessor.clean_entity calls p.singular_noun() on entity text.
p = inflect.engine()

# Teradata connection parameters
# Unpacked as keyword arguments into teradatasql.connect() by DataLoader and
# passed to TeradataManager in main().
# NOTE(review): the values look like placeholders; real credentials should be
# read from the environment or a secret store rather than stored in the notebook.
TERADATA_CONFIG = {
    'host': 'hostname',
    'user': 'user_id',
    'password': 'password',
    'logmech': 'LDAP'
}

In [None]:

# Table names
# SOURCE_TABLE is interpolated into the paging SELECT in DataLoader.fetch_batch_from_db.
SOURCE_TABLE = "source_table"
# TARGET_TABLE is interpolated into the INSERT built by TeradataManager.get_insert_sql.
TARGET_TABLE = "xyz"
# ERROR_LOG_TABLE is not referenced by any code visible in this notebook
# (TeradataManager.log_error is currently a stub).
ERROR_LOG_TABLE = "error_log"


In [None]:
class GPUMonitor:
    """Static helpers that read GPU memory via GPUtil and size batches from it."""

    # Bounds and utilization thresholds used by calculate_optimal_batch_size.
    MIN_BATCH_SIZE = 32
    MAX_BATCH_SIZE = 256
    HIGH_UTILIZATION = 0.85
    LOW_UTILIZATION = 0.50

    @staticmethod
    def get_gpu_memory_usage(gpu_id: int) -> tuple:
        """Return ``(memoryUsed, memoryTotal)`` as reported by GPUtil for one GPU.

        Args:
            gpu_id: Index into the list returned by ``GPUtil.getGPUs()``.

        Raises:
            IndexError: If GPUtil lists no GPU at ``gpu_id``.
        """
        gpus = GPUtil.getGPUs()
        try:
            gpu = gpus[gpu_id]
        except IndexError:
            # Same exception type as before, but with a message that names the cause.
            raise IndexError(
                f"GPU {gpu_id} not found; GPUtil reports {len(gpus)} GPU(s)"
            ) from None
        return gpu.memoryUsed, gpu.memoryTotal

    @staticmethod
    def calculate_optimal_batch_size(gpu_id: int, current_batch_size: int) -> int:
        """Halve, double or keep the batch size depending on GPU memory utilization.

        Args:
            gpu_id: GPU whose memory utilization drives the decision.
            current_batch_size: Batch size currently in use.

        Returns:
            ``current_batch_size // 2`` (not below ``MIN_BATCH_SIZE``) when
            utilization exceeds ``HIGH_UTILIZATION``; ``current_batch_size * 2``
            (not above ``MAX_BATCH_SIZE``) when it is below ``LOW_UTILIZATION``;
            otherwise ``current_batch_size`` unchanged.
        """
        used, total = GPUMonitor.get_gpu_memory_usage(gpu_id)

        # A GPU that reports no total memory gives no usable signal; keep the
        # current size instead of dividing by zero.
        if not total or total <= 0:
            return current_batch_size

        memory_utilization = used / total

        if memory_utilization > GPUMonitor.HIGH_UTILIZATION:
            return max(GPUMonitor.MIN_BATCH_SIZE, current_batch_size // 2)
        if memory_utilization < GPUMonitor.LOW_UTILIZATION:
            return min(GPUMonitor.MAX_BATCH_SIZE, current_batch_size * 2)
        return current_batch_size


In [None]:
class BatchMetrics:
    """Running success/failure counters for the pipeline, with per-batch logging."""

    def __init__(self):
        self.successful_records = 0  # sum of success_count over all logged batches
        self.failed_records = 0      # sum of fail_count over all logged batches
        self.current_batch = 0       # batch_id of the most recently logged batch
        self.total_batches = 0       # number of batches logged so far

    def log_batch_metrics(self, batch_id: int, success_count: int, fail_count: int):
        """Fold one batch's outcome into the running totals and log it.

        Args:
            batch_id: Identifier of the batch being reported.
            success_count: Records of this batch that were processed.
            fail_count: Records of this batch that were not processed.
        """
        # The counters were previously initialised but never updated.
        self.successful_records += success_count
        self.failed_records += fail_count
        self.current_batch = batch_id
        self.total_batches += 1

        # Reading GPU memory is best-effort: a missing GPU or driver problem
        # must not abort the pipeline just because metrics are being logged.
        try:
            gpu_memory = GPUMonitor.get_gpu_memory_usage(0)
        except Exception as e:
            gpu_memory = f"unavailable ({e})"

        logging.info(f"""
            Batch {batch_id} Metrics:
            - Successful records: {success_count}
            - Failed records: {fail_count}
            - GPU Memory Usage: {gpu_memory}
        """)



In [None]:
class DataLoader:
    """Pages rows out of ``SOURCE_TABLE`` in fixed-size batches."""

    def __init__(self, batch_size: int = 50000):
        """
        Args:
            batch_size: Number of rows requested per call to fetch_batch_from_db.
        """
        self.batch_size = batch_size
        # Not used by any method visible in this notebook; kept so existing
        # callers that touch these attributes keep working.
        self.queue = Queue(maxsize=2)
        self.stop_event = Event()

    @staticmethod
    def _build_batch_query(table: str, batch_size: int, offset: int) -> str:
        """Build the SELECT returning rows ``offset + 1 .. offset + batch_size``.

        The previous query combined ``TOP n`` with ``QUALIFY`` (a combination
        Teradata rejects) and applied the offset twice: once through
        ``WHERE row_number > offset`` and again through
        ``QUALIFY ROW_NUMBER() > offset``. One ``QUALIFY ... BETWEEN`` window
        expresses the page directly.

        NOTE(review): paging is only stable if ``zi_c_company_id`` is unique
        in the source table - confirm.
        """
        # int() keeps anything but plain integers out of the generated SQL.
        first_row = int(offset) + 1
        last_row = int(offset) + int(batch_size)
        return f"""
            SELECT
                zi_c_company_id, zi_es_ecid, zi_c_description, zi_industry_primary,
                industries, sub_industries, top3_industries
            FROM {table}
            QUALIFY ROW_NUMBER() OVER (ORDER BY zi_c_company_id)
                BETWEEN {first_row} AND {last_row}
        """

    async def fetch_batch_from_db(self, offset: int) -> pd.DataFrame:
        """Fetch one page of source rows.

        Args:
            offset: Number of rows (in ``zi_c_company_id`` order) to skip.

        Returns:
            DataFrame with up to ``batch_size`` rows; empty once the table is exhausted.

        Raises:
            Exception: Any connection or query error is logged and re-raised.
        """
        query = self._build_batch_query(SOURCE_TABLE, self.batch_size, offset)
        try:
            with teradatasql.connect(**TERADATA_CONFIG) as conn:
                return pd.read_sql(query, conn)
        except Exception as e:
            logging.error(f"Fatal error in database fetch: {e}")
            raise


In [None]:
class NERProcessor:
    """Runs the configured NER models over a text and flattens the entities found."""

    # Texts shorter than this are skipped without running any model.
    MIN_TEXT_LENGTH = 50

    # Entity labels that become output fields, in output order.
    FEATURE_FIELDS = (
        "year started", "product", "brand", "business_categories",
        "business_sub_categories", "industry", "business_services", "offer"
    )

    def __init__(self, gpu_ids: List[int], labels: List[str] = None):
        """
        Args:
            gpu_ids: CUDA device indices to load the models on.
            labels: Entity labels requested from the models. Defaults to
                ``FEATURE_FIELDS``; previously ``process_text`` read an
                undefined global ``labels``.
        """
        self.gpu_ids = gpu_ids
        self.labels = list(labels) if labels is not None else list(self.FEATURE_FIELDS)
        self.models = {}
        self.batch_size = 128  # Initial batch size
        self.setup_models()

    def setup_models(self):
        """Load every configured model onto every GPU in ``self.gpu_ids``.

        Populates ``self.models`` as ``{gpu_id: [(model, threshold), ...]}``.
        """
        # NOTE(review): GLiNER is not imported anywhere in the visible notebook;
        # presumably `from gliner import GLiNER` is required - confirm.
        model_configs = [
            ("knowledgator/gliner-multitask-large-v0.5", 0.54),
            ("EmergentMethods/gliner_large_news-v2.1", 0.7)
        ]

        for gpu_id in self.gpu_ids:
            self.models[gpu_id] = []
            with torch.cuda.device(gpu_id):
                for model_name, threshold in model_configs:
                    model = GLiNER.from_pretrained(model_name).to(f"cuda:{gpu_id}")
                    model.eval()
                    self.models[gpu_id].append((model, threshold))

    def clean_entity(self, entity: Dict) -> Dict:
        """Normalise ``entity['text']`` in place according to ``entity['label']``.

        * ``'year started'``: keep only the first 4-digit year starting with
          18/19/20, or ``''`` when none is found.
        * ``'brand'``: left untouched.
        * anything else: strip every character that is not an ASCII letter or
          whitespace, then singularise via inflect when it reports a singular.

        Errors are logged and the (possibly partly cleaned) entity is returned.
        """
        try:
            if entity['label'] == 'year started':
                match = re.search(r'\b(18|19|20)\d{2}\b', entity['text'])
                entity['text'] = match.group(0) if match else ''
            elif entity['label'] not in ['brand']:
                entity['text'] = re.sub(r'[^a-zA-Z\s]', '', entity['text'])
                entity['text'] = p.singular_noun(entity['text']) or entity['text']
        except Exception as e:
            logging.error(f"Error in clean_entity: {e}")
        return entity

    def _extract_on_gpu(self, text: str, gpu_id: int) -> Dict:
        """Run every model loaded on ``gpu_id`` over ``text`` and build the feature dict."""
        entities = []
        # no_grad is applied here as a context manager: decorating the async
        # method only wrapped creation of the coroutine, not its execution.
        with torch.no_grad(), torch.cuda.device(gpu_id):
            for model, threshold in self.models[gpu_id]:
                entities.extend(
                    model.predict_entities(text, self.labels, threshold=threshold)
                )

        entities = sorted(
            (self.clean_entity(entity) for entity in entities),
            key=lambda k: (k['label'], -k['score'])
        )
        return self.extract_ner_features(entities)

    async def process_text(self, text: str, gpu_id: int) -> Dict:
        """Extract NER features from ``text``, preferring ``gpu_id``.

        Args:
            text: Text to analyse; non-strings and texts shorter than
                ``MIN_TEXT_LENGTH`` yield ``{}``.
            gpu_id: GPU to try first.

        Returns:
            Feature dict from ``extract_ner_features``, or ``{}`` when the text
            is skipped or every GPU failed.
        """
        if not isinstance(text, str) or len(text) < self.MIN_TEXT_LENGTH:
            return {}

        # Try the requested GPU, then each other configured GPU exactly once.
        # The old recursive retry raised IndexError with a single GPU and
        # bounced between GPUs without bound on a persistent error.
        candidates = [gpu_id] + [g for g in self.gpu_ids if g != gpu_id]
        for candidate in candidates:
            try:
                return self._extract_on_gpu(text, candidate)
            except Exception as e:
                logging.error(f"Error processing text on GPU {candidate}: {e}")

        return {}

    def extract_ner_features(self, entities: List[Dict]) -> Dict:
        """Collapse entities into ``{label: "value | value | ..."}``.

        Values are de-duplicated per label and kept in the order they arrive
        (callers pass entities sorted by label and descending score), so the
        output is deterministic. Empty values are dropped, and a label with no
        remaining values is omitted. Only labels in ``FEATURE_FIELDS`` are
        returned.
        """
        values_by_label = {}

        for entity in entities:
            label = entity["label"]
            text = entity["text"]

            # Strip the label word itself out of the extracted value.
            if label == "brand":
                text = text.replace('brands', '').replace('brand', '').strip()
            elif label == "industry":
                text = text.lower().replace('industry', '').strip()
            elif label == "product":
                text = text.lower().replace('products', '').replace('product', '').strip()

            if not text or not text.strip():
                continue

            # A dict is used as an insertion-ordered set.
            values_by_label.setdefault(label, {})[text] = None

        return {
            field: " | ".join(values_by_label[field])
            for field in self.FEATURE_FIELDS
            if field in values_by_label
        }

class TeradataManager:
    """Uploads processed records to ``TARGET_TABLE`` in chunks."""

    # Column order of the INSERT statement; upload rows are built in this order.
    INSERT_COLUMNS = (
        "zi_c_company_id", "zi_es_ecid", "zi_c_description", "zi_industry_primary",
        "industries", "sub_industries", "top3_industries", "year_started", "product",
        "brand", "industry", "business_categories", "business_sub_categories",
        "business_services", "offer"
    )

    def __init__(self, config: Dict, batch_size: int = 5000):
        """
        Args:
            config: Keyword arguments for ``teradatasql.connect``.
            batch_size: Maximum number of rows per ``executemany`` call.
        """
        self.config = config
        self.batch_size = batch_size

    def _prepare_rows(self, df: pd.DataFrame) -> list:
        """Turn ``df`` into tuples that line up with ``INSERT_COLUMNS``.

        The NER step only emits the fields it found and names the year field
        ``"year started"``. Columns are therefore renamed and reindexed to the
        INSERT column list, and missing values become ``None`` (SQL NULL).
        """
        if "year started" in df.columns and "year_started" not in df.columns:
            df = df.rename(columns={"year started": "year_started"})
        aligned = df.reindex(columns=list(self.INSERT_COLUMNS))

        def to_db_value(value):
            # is_scalar guards pd.isna against list-like cell values.
            if pd.api.types.is_scalar(value) and pd.isna(value):
                return None
            return value

        return [
            tuple(to_db_value(value) for value in values)
            for values in aligned.itertuples(index=False, name=None)
        ]

    async def upload_batch(self, df: pd.DataFrame) -> bool:
        """Insert ``df`` into the target table in chunks of ``batch_size`` rows.

        Returns:
            True when all rows were inserted, or when there was nothing to
            insert. False when an error occurred; the error is logged and
            handed to ``log_error``.
        """
        if df is None or df.empty:
            logging.info("Upload skipped: no records in batch")
            return True

        try:
            data = self._prepare_rows(df)
            insert_sql = self.get_insert_sql()

            with teradatasql.connect(**self.config) as conn:
                # The cursor is closed even when an insert fails.
                with conn.cursor() as cursor:
                    for i in range(0, len(data), self.batch_size):
                        cursor.executemany(insert_sql, data[i:i + self.batch_size])

            return True
        except Exception as e:
            logging.error(f"Upload error: {e}")
            # .get() avoids a second failure when the id column is missing.
            await self.log_error(df.iloc[0].get('zi_c_company_id'), str(e))
            return False

    def get_insert_sql(self) -> str:
        """Return the parameterised INSERT statement for ``TARGET_TABLE``."""
        columns = ", ".join(self.INSERT_COLUMNS)
        placeholders = ", ".join("?" for _ in self.INSERT_COLUMNS)
        return f"""
        INSERT INTO {TARGET_TABLE} (
            {columns}
        ) VALUES ({placeholders})
        """

    async def log_error(self, company_id: str, error_msg: str):
        """Stub: intended to record an upload error; currently does nothing."""
        # Error logging table insert logic (commented out as requested)
        pass

async def main():
    """Run the fetch -> NER -> upload loop until the source is exhausted.

    Each iteration fetches one page of source rows, runs NER on every
    description of at least ``min_text_length`` characters, uploads the
    enriched rows and logs batch metrics. The loop stops on an empty page or
    after ``max_records`` source rows.

    Raises:
        Exception: Any error is logged and re-raised; the total processed so
        far is logged in every case.
    """
    metrics = BatchMetrics()
    gpu_processor = NERProcessor(gpu_ids=[0, 1])
    data_loader = DataLoader()
    db_manager = TeradataManager(TERADATA_CONFIG)

    max_records = 5_000_000  # 5 million records
    min_text_length = 50

    offset = 0
    total_processed = 0
    texts_dispatched = 0  # drives per-record round-robin over the GPUs

    try:
        while total_processed < max_records:
            batch_df = await data_loader.fetch_batch_from_db(offset)
            if batch_df.empty:
                break

            # Process batch
            for gpu_id in gpu_processor.gpu_ids:
                gpu_processor.batch_size = GPUMonitor.calculate_optimal_batch_size(
                    gpu_id, gpu_processor.batch_size
                )

            processed_records = []
            for _, row in batch_df.iterrows():
                description = row['zi_c_description']
                # NULL descriptions arrive as None/NaN and have no len().
                if not isinstance(description, str) or len(description) < min_text_length:
                    continue

                # Alternate between GPUs per record. `total_processed % 2` only
                # changed once per batch, and with an even batch size it
                # always selected GPU 0.
                target_gpu = gpu_processor.gpu_ids[
                    texts_dispatched % len(gpu_processor.gpu_ids)
                ]
                texts_dispatched += 1

                result = await gpu_processor.process_text(description, gpu_id=target_gpu)
                processed_records.append({**row.to_dict(), **result})

            # Prepare for upload; a batch with nothing to upload is not an error.
            uploaded = True
            if processed_records:
                output_df = pd.DataFrame(processed_records)
                uploaded = await db_manager.upload_batch(output_df)
                if not uploaded:
                    logging.error(
                        f"Upload failed for batch {offset // data_loader.batch_size}"
                    )

            # Update metrics; records of a failed upload are not counted as successes.
            success_count = len(processed_records) if uploaded else 0
            metrics.log_batch_metrics(
                offset // data_loader.batch_size,
                success_count,
                len(batch_df) - success_count
            )

            offset += data_loader.batch_size
            total_processed += len(batch_df)

    except Exception as e:
        logging.error(f"Fatal error: {e}")
        raise
    finally:
        logging.info(f"Total records processed: {total_processed}")


In [None]:

# Entry point: run the async pipeline to completion on a new event loop.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in the current thread, which is normally the case inside a Jupyter
# kernel; there `await main()` in a cell is the usual alternative - confirm how
# this cell is meant to be executed.
if __name__ == "__main__":
    asyncio.run(main())

#  Update 3_8

In [None]:
# Import necessary libraries
import os
import re
import gc
import time
import zlib
import pickle
import socket
import logging
from itertools import chain
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType
from pyspark.taskcontext import TaskContext

# Configure logging
# Sends root-logger output to ner_processing.log, truncating the file on each run.
# NOTE(review): logging.basicConfig() does nothing when the root logger already
# has handlers, e.g. when the earlier basicConfig cell of this notebook ran in
# the same kernel; pass force=True (Python 3.8+) if the file handler must apply.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='ner_processing.log',
    filemode='w'
)
# Named logger used by every function below, including those run via mapPartitions.
# NOTE(review): records emitted on Spark executors presumably do not end up in
# this driver-side file - confirm where worker logs are collected.
logger = logging.getLogger("NER_Processing")

# Import inflect for singular noun conversion
# `p` is the module-level engine used by clean_entity(); when inflect is not
# installed a stand-in with the same method is used so clean_entity still runs.
try:
    import inflect
    p = inflect.engine()
except ImportError:
    logger.warning("Inflect package not found. Singular noun conversion will be skipped.")
    class DummyInflect:
        """Stand-in for inflect.engine() that never singularises."""
        def singular_noun(self, text):
            # Returning False makes `p.singular_noun(x) or x` keep the original text.
            return False
    p = DummyInflect()

# Initialize Spark session with optimized configuration
# getOrCreate() returns an existing session when one is already active.
#   - spark.serializer: Kryo serializer class
#   - spark.dynamicAllocation.enabled: turned off
#   - spark.sql.files.maxPartitionBytes: 134217728 bytes = 128 MiB
#   - spark.sql.shuffle.partitions / spark.default.parallelism: 200 each
spark = SparkSession.builder \
    .appName("NER Processing") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "200") \
    .getOrCreate()

# SparkContext handle; used below for broadcasting and reading executor settings.
sc = spark.sparkContext

# Define schemas
def get_ner_schema():
    """Build the Spark type of the ``entities`` column: an array of entity structs.

    Each struct has nullable fields ``text``, ``label``, ``start`` and ``end``
    (strings) and ``score`` (float).
    """
    entity_columns = [
        ("text", StringType()),
        ("label", StringType()),
        ("score", FloatType()),
        ("start", StringType()),
        ("end", StringType()),
    ]
    entity_struct = StructType(
        [StructField(column, spark_type, True) for column, spark_type in entity_columns]
    )
    return ArrayType(entity_struct)

def get_processed_schema(input_schema):
    """Return a schema with ``input_schema``'s fields plus a nullable ``entities`` field.

    The ``entities`` field is appended last, and only when the input schema
    does not already contain a field of that name.
    """
    output_fields = list(input_schema.fields)
    existing_names = [field.name for field in output_fields]
    if "entities" not in existing_names:
        entities_field = StructField("entities", get_ner_schema(), True)
        output_fields.append(entities_field)
    return StructType(output_fields)

def clean_entity(entity):
    """Normalise ``entity['text']`` in place according to ``entity['label']``.

    'year started' keeps only the first 4-digit year beginning with 18, 19 or
    20 ('' when none is found). 'brand' is left untouched. Every other label
    has characters other than ASCII letters and whitespace removed, then is
    singularised when the inflect engine reports a singular form. Errors are
    logged and the entity is returned as it stands.
    """
    try:
        kind = entity['label']
        if kind == 'year started':
            found = re.search(r'\b(18|19|20)\d{2}\b', entity['text'])
            entity['text'] = '' if not found else found.group(0)
        elif kind not in ('brand',):
            letters_only = re.sub(r'[^a-zA-Z\s]', '', entity['text'])
            # Store the stripped text first so it survives a failing singularisation.
            entity['text'] = letters_only
            singular = p.singular_noun(letters_only)
            if singular:
                entity['text'] = singular
    except Exception as e:
        logger.error(f"Error cleaning entity: {e}")
    return entity

# Load your models
logger.info("Loading NER models")
load_start = time.time()

# Replace these with your actual model loading code
# NOTE(review): while these stay placeholders, model1 and model2 are None, so
# the `model1.extract_entities(text)` call in process_batch_with_ner raises for
# every record with text and that record is passed through unprocessed.
def load_model1():
    """Placeholder loader for the first NER model; currently returns None."""
    # Your model loading code here
    pass

def load_model2():
    """Placeholder loader for the second NER model; currently returns None."""
    # Your model loading code here
    pass

model1 = load_model1()
model2 = load_model2()

# Define labels
# Entities whose label is not in this list are dropped by process_batch_with_ner.
# NOTE(review): clean_entity special-cases the labels 'year started' and
# 'brand', neither of which is in this placeholder list - confirm the real labels.
label_list = ["PERSON", "ORGANIZATION", "LOCATION"]  # Replace with your actual labels

logger.info(f"Models loaded in {time.time() - load_start:.2f} seconds")

# Compress models to reduce broadcast size
logger.info("Compressing models for broadcast")
compress_start = time.time()

# Pickle each model exactly once; the same bytes feed both the compression and
# the size statistics (the models were previously serialised twice each).
pickled_model1 = pickle.dumps(model1)
pickled_model2 = pickle.dumps(model2)

compressed_model1 = zlib.compress(pickled_model1)
compressed_model2 = zlib.compress(pickled_model2)

# Log compression stats (sizes in MiB)
original_size1 = len(pickled_model1) / (1024 * 1024)
compressed_size1 = len(compressed_model1) / (1024 * 1024)
original_size2 = len(pickled_model2) / (1024 * 1024)
compressed_size2 = len(compressed_model2) / (1024 * 1024)

logger.info(f"Model1: {original_size1:.2f}MB -> {compressed_size1:.2f}MB "
           f"({compressed_size1/original_size1*100:.1f}%)")
logger.info(f"Model2: {original_size2:.2f}MB -> {compressed_size2:.2f}MB "
           f"({compressed_size2/original_size2*100:.1f}%)")
logger.info(f"Models compressed in {time.time() - compress_start:.2f} seconds")

# Free up driver memory: the models and their uncompressed pickles are no
# longer needed once the compressed bytes exist.
del model1
del model2
del pickled_model1
del pickled_model2
gc.collect()

# Broadcast variables
logger.info("Broadcasting models to executors")
broadcast_start = time.time()

# Executors read these through .value inside extract_ner_entities.
broadcast_model1 = sc.broadcast(compressed_model1)
broadcast_model2 = sc.broadcast(compressed_model2)
labels = sc.broadcast(label_list)

logger.info(f"Models broadcast in {time.time() - broadcast_start:.2f} seconds")

def process_batch_with_ner(batch, model1, model2, labels_list):
    """
    Process a batch of records with NER models.

    Every returned record carries an ``entities`` field so that it matches the
    schema produced by ``get_processed_schema``. Records without text, and
    records whose processing failed, get ``entities = None`` (or keep an
    ``entities`` value they already had). Previously such records were
    returned unchanged, one field short of the output schema.

    Args:
        batch: List of Row objects to process
        model1: First NER model
        model2: Second NER model
        labels_list: List of valid entity labels; falsy disables filtering

    Returns:
        List of processed records with NER entities, one per input row and in
        input order
    """
    # Imported once per call instead of once per record.
    from pyspark.sql import Row

    allowed_labels = set(labels_list) if labels_list else None
    results = []

    for row in batch:
        record = None
        try:
            # Convert Row to dictionary for easier manipulation
            record = row.asDict()

            # Extract text from the record - adjust field names as needed
            text = record.get('text', '')
            if text:
                # Run both models, clean every entity, order by label then
                # descending score.
                entities = sorted(
                    (
                        clean_entity(entity)
                        for entity in chain(
                            model1.extract_entities(text),
                            model2.extract_entities(text),
                        )
                    ),
                    key=lambda k: (k['label'], -k['score'])
                )

                # Filter by labels if needed
                if allowed_labels is not None:
                    entities = [e for e in entities if e['label'] in allowed_labels]

                record['entities'] = entities
            else:
                # No text: nothing to extract, but keep the output shape.
                record.setdefault('entities', None)

            results.append(Row(**record))

        except Exception as e:
            logger.error(f"Error processing record: {str(e)}")
            if record is None:
                # asDict() itself failed; the row can only be passed through.
                results.append(row)
            else:
                # Keep the original values, padded to the output schema.
                record.setdefault('entities', None)
                results.append(Row(**record))

    return results

def extract_ner_entities(iterator):
    """
    Process one partition in batches, yielding processed rows as they are ready.

    Intended for ``rdd.mapPartitions``. Uses the module-level broadcast
    variables ``broadcast_model1``, ``broadcast_model2`` and ``labels``.
    Results are streamed batch by batch instead of collecting the whole
    partition in a list before returning.

    Args:
        iterator: Iterator over the Row objects of one partition.

    Yields:
        Processed rows, in input order.

    Raises:
        Exception: Any error outside per-batch handling is logged with its
        traceback and re-raised so Spark sees the task failure.
    """
    from pyspark.sql import Row

    # Get the partition ID and executor information for logging
    context = TaskContext.get()
    partition_id = context.partitionId()

    # Get a unique identifier for the executor
    hostname = socket.gethostname()
    pid = str(os.getpid())
    executor_id = f"{hostname}_{pid}"

    start_time = time.time()
    logger.info(f"Starting partition {partition_id} on executor {executor_id}")

    def _pad_with_entities(row):
        """Return ``row`` with an ``entities`` field so it fits the output schema."""
        try:
            record = row.asDict()
            record.setdefault('entities', None)
            return Row(**record)
        except Exception:
            return row

    try:
        # Log the decompression of broadcast variables - only once per partition
        logger.info(f"Partition {partition_id}: Starting model decompression")
        decompression_start = time.time()

        # Decompress models - ONLY ONCE PER PARTITION
        model1 = pickle.loads(zlib.decompress(broadcast_model1.value))
        model2 = pickle.loads(zlib.decompress(broadcast_model2.value))
        labels_list = labels.value

        decompression_time = time.time() - decompression_start
        logger.info(f"Partition {partition_id}: Models decompressed in {decompression_time:.2f} seconds")

        def _run_batch(batch, batch_number, is_final):
            """Process one batch; on failure return its rows padded with empty entities."""
            batch_start = time.time()
            try:
                processed_batch = process_batch_with_ner(batch, model1, model2, labels_list)
                batch_time = time.time() - batch_start
                kind = "final batch" if is_final else "batch"
                logger.info(f"Partition {partition_id}: Processed {kind} {batch_number} ({len(batch)} records) in {batch_time:.2f} seconds")
                return processed_batch
            except Exception as e:
                if is_final:
                    logger.error(f"Error processing final batch in partition {partition_id}: {str(e)}")
                else:
                    logger.error(f"Error processing batch {batch_number} in partition {partition_id}: {str(e)}")
                # Keep the original records in case of batch failure
                return [_pad_with_entities(row) for row in batch]

        # Initialize counters
        record_count = 0
        batch_count = 0

        # Process in batches
        batch_size = 100  # Adjust based on your needs
        current_batch = []

        for row in iterator:
            record_count += 1
            current_batch.append(row)

            # Process when batch is full
            if len(current_batch) >= batch_size:
                batch_count += 1
                yield from _run_batch(current_batch, batch_count, False)
                current_batch = []

        # Process any remaining records in the last batch
        if current_batch:
            batch_count += 1
            yield from _run_batch(current_batch, batch_count, True)

        # Log completion
        total_time = time.time() - start_time
        logger.info(f"Finished partition {partition_id} on {executor_id}. "
                   f"Processed {record_count} records in {total_time:.2f} seconds")

    except Exception as e:
        logger.error(f"Major error in partition {partition_id} on {executor_id}: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        # Re-raise to ensure Spark knows about the failure
        raise

# Read input data
input_path = "gs://your-bucket/input_path"  # Replace with your actual input path
logger.info(f"Reading data from {input_path}")
input_df = spark.read.parquet(input_path)

# Log initial DataFrame information
partition_count = input_df.rdd.getNumPartitions()
record_count = input_df.count()
logger.info(f"Input DataFrame has {record_count} records in {partition_count} partitions")

# Calculate optimal partitions based on data size and cluster
# Both settings fall back to 1 when they are not set in the Spark conf.
executor_instances = int(sc.getConf().get("spark.executor.instances", "1"))
executor_cores = int(sc.getConf().get("spark.executor.cores", "1"))
total_cores = executor_instances * executor_cores

# Target 2-3 tasks per core for good parallelism
optimal_partitions = total_cores * 3

# Ensure we don't create too many partitions for small datasets
# (roughly one partition per 100 records)
if record_count < 10000:
    optimal_partitions = min(optimal_partitions, record_count // 100)

# repartition() needs a positive partition count; with fewer than 100 records
# the integer division above yields 0.
optimal_partitions = max(1, optimal_partitions)

logger.info(f"Calculated optimal partitions: {optimal_partitions}")

if partition_count != optimal_partitions:
    logger.info(f"Repartitioning from {partition_count} to {optimal_partitions} partitions")
    input_df = input_df.repartition(optimal_partitions)

# Get the processed schema
input_schema = input_df.schema
processed_schema = get_processed_schema(input_schema)
logger.info(f"Using processed schema: {processed_schema}")

# Process the DataFrame
logger.info("Processing DataFrame with NER extraction")
processed_rdd = input_df.rdd.mapPartitions(extract_ner_entities)

# Convert back to DataFrame with the correct schema
processed_df = spark.createDataFrame(processed_rdd, processed_schema)

# Cache the result so count() and the write below share one NER pass. Without
# it each action re-runs the mapPartitions stage, i.e. the models run twice
# over the whole input.
processed_df = processed_df.cache()

try:
    # Force an action to ensure processing happens
    processed_count = processed_df.count()
    logger.info(f"Processed {processed_count} records")

    # Write output to GCP bucket
    output_path = "gs://your-bucket/output_path"  # Replace with your actual output path
    logger.info(f"Writing results to {output_path}")

    # Check if Industry_primary column exists
    has_industry = "Industry_primary" in processed_df.columns

    if has_industry:
        # Partition by Industry_primary (low cardinality field)
        logger.info("Partitioning output by Industry_primary")
        writer = processed_df.write.partitionBy("Industry_primary")
    else:
        # If Industry_primary doesn't exist, coalesce to 20 files
        logger.info("Industry_primary column not found, coalescing to 20 files")
        writer = processed_df.coalesce(20).write

    # Both layouts share the same file options.
    writer \
        .option("compression", "snappy") \
        .option("maxRecordsPerFile", 50000) \
        .mode("overwrite") \
        .parquet(output_path)

    logger.info(f"Results written to {output_path}")
finally:
    # Release the cached blocks whether or not the write succeeded.
    processed_df.unpersist()