In [None]:
from transformers import pipeline

classifier = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
result = classifier("Thanks for the demo. We’ve decided to explore other companies for now.")
print(result)


Device set to use cpu


[{'label': 'POSITIVE', 'score': 0.9960334897041321}]


In [None]:
from transformers import pipeline

classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

text = "Thanks for the demo. We’ve decided to explore other companies for now"
labels = ["Positive", "Neutral", "Negative"]

result = classifier(text, candidate_labels=labels)
print(result)


config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

Device set to use cpu


{'sequence': 'Thanks for the demo. We’ve decided to explore other companies for now', 'labels': ['Positive', 'Neutral', 'Negative'], 'scores': [0.626172661781311, 0.22731542587280273, 0.14651191234588623]}


In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch

# ✅ Load the model & tokenizer only once
model_name = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.eval()  # Optional: put the model in eval mode

tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/1.95G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.45G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

T5ForConditionalGeneration(
  (shared): Embedding(32128, 2048)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 2048)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=2048, out_features=2048, bias=False)
              (k): Linear(in_features=2048, out_features=2048, bias=False)
              (v): Linear(in_features=2048, out_features=2048, bias=False)
              (o): Linear(in_features=2048, out_features=2048, bias=False)
              (relative_attention_bias): Embedding(32, 32)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseGatedActDense(
              (wi_0): Linear(in_features=2048, out_features=5120, bias=False)
              (wi_1): Linear(in_features=2048, out_features=5120, bias=False)
       

In [None]:


# 🧠 Label prompt for the model
LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:
can i  use
✅ Positive:
- Ready to proceed (signing, onboarding, payment)
- Explicit satisfaction ("Great work!", "This solves our problem")
- Continued business approval ("Renewing our contract")
- Positive comparisons ("Much better than [competitor]")

➖ Neutral:
- Routine updates ("Received the documents")
- Standard inquiries ("What's the pricing?")
- Non-committal statements ("We'll review and get back")
- Process discussions ("Our team is evaluating options")

❌ Negative:
- Complaints ("This isn't working as promised")
- Threats to churn ("Considering other options")
- Persistent issues ("Still facing the same bug")
- Refund requests ("We need our money back")
- Negative comparisons ("[Competitor] does this better")
"""

def extract_latest_lead_message(chat_log: str):
    """
    Extract only the latest message from the lead (line starting with 'received:')
    """
    lead_lines = [line.strip().replace("received:", "").strip()
                  for line in chat_log.strip().split('\n')
                  if line.strip().lower().startswith("received:")]

    if not lead_lines:
        return ""

    return lead_lines[-1]  # Only the latest one


def classify_sentiment(message: str):
    """
    Classify the sentiment using FLAN-T5 with clear instruction prompt
    """
    full_prompt = f"""{LABEL_GUIDANCE}

Message:
\"{message}\"

What is the sentiment? Respond with only one label from ["Positive", "Neutral", "Negative"].
"""
    inputs = tokenizer(full_prompt, return_tensors="pt", truncation=True)

    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)

    sentiment = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    sentiment = sentiment.capitalize()

    # Ensure valid label
    if sentiment not in ["Positive", "Neutral", "Negative"]:
        sentiment = "Neutral"  # fallback

    return sentiment


# 🧪 Test function that you can call with different input
def classify_chat(chat_data: str):
    latest_message = extract_latest_lead_message(chat_data)
    sentiment = classify_sentiment(latest_message)
    print(f"Latest lead message: {latest_message}")
    print(f"Predicted sentiment: {sentiment}")

# Example call
if __name__ == "__main__":
    chat_data = """
    received: We tested the automation yesterday and it saved us hours.
    received: Dashboards look great and the team is happy. We'd like to proceed this week.
    received: Thanks for the demo. We’ve decided to explore other companies for now
    """
    classify_chat(chat_data)

    # You can now call `classify_chat()` again with different chat data without reloading the model

Latest lead message: Thanks for the demo. We’ve decided to explore other companies for now
Predicted sentiment: Negative


In [None]:
#calls

In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch

# Load once
model_name = "google/flan-t5-xl"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(device)
model.eval()

# 📊 Label definitions for FLAN prompt
CALL_LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:

✅ Positive:
- Outcome is deal closure, payment, or onboarding
- Clear success: "Signed", "Onboarded", "Confirmed go-ahead"
- Client shows satisfaction and next steps

➖ Neutral:
- Outcome undecided or in progress
- Needs follow-up or internal review
- Example: "Sent proposal, awaiting response", "Discussion ongoing"

❌ Negative:
- Outcome is a rejection, delay, or negative sentiment
- Example: "Not moving forward", "Postponed indefinitely", "Concerns with product"
"""

def build_call_prompt(call_summary: dict):
    """
    Builds a prompt from structured call summary
    """
    agenda = call_summary.get("Agenda", "").strip()
    result = call_summary.get("Result", "").strip()
    description = call_summary.get("Description", "").strip()

    return f"""{CALL_LABEL_GUIDANCE}

Call Summary:
Agenda: {agenda}
Result: {result}
Description: {description}

What is the sentiment? Respond with one label from ["Positive", "Neutral", "Negative"].
"""


def classify_call_summary(call_summary: dict):
    """
    Classifies sentiment from a structured call summary
    """
    prompt = build_call_prompt(call_summary)
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True).to(device)

    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)

    prediction = tokenizer.decode(outputs[0], skip_special_tokens=True).strip().capitalize()

    if prediction not in ["Positive", "Neutral", "Negative"]:
        prediction = "Neutral"  # fallback

    return prediction


# 🧪 Test it
if __name__ == "__main__":
    call_data = {
        "Agenda": "Shared final agreement",
        "Result": "Signed",
        "Description": "The automation saved us time, but support is painfully slow."
    }

    label = classify_call_summary(call_data)
    print("Predicted sentiment:", label)


In [None]:
!pip install mysql-connector-python


Collecting mysql-connector-python
  Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (7.3 kB)
Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl (33.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m19.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.4.0


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import mysql.connector
import torch

# --- Load SQLCoder-7B model ---
model_name = "defog/sqlcoder-7b-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")
model.eval()

# --- Function to fetch schema from a MySQL database ---
def fetch_schema_from_mysql():
    try:
        connection = mysql.connector.connect(
            host="localhost",
            user="root",
            port=3306,
            password="",
            database="categories_lead"
        )

        cursor = connection.cursor()
        cursor.execute("SHOW TABLES")
        tables = cursor.fetchall()

        schema = ""
        for (table_name,) in tables:
            cursor.execute(f"SHOW CREATE TABLE {table_name}")
            create_stmt = cursor.fetchone()[1]
            schema += create_stmt + ";\n\n"

        cursor.close()
        connection.close()
        return schema

    except Exception as e:
        print("❌ Error connecting to DB:", e)
        return ""

# --- Function to generate SQL query using SQLCoder ---
def generate_sql(question: str, schema: str):
    prompt = f"""### Task:
Generate a SQL query to answer the following question:
"{question}"

### Schema:
{schema}

### SQL:
"""
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048).to(model.device)
    outputs = model.generate(**inputs, max_new_tokens=150, num_beams=4, early_stopping=True)
    sql = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return sql.strip()

# --- Main script ---
if __name__ == "__main__":
    # 🧠 Question from the manager
    question = "How many employees are on leave today?"

    # 🔌 Fetch schema from database
    schema_text = fetch_schema_from_mysql()

    if schema_text:
        # 🪄 Generate SQL
        sql = generate_sql(question, schema_text)
        print("🔍 Generated SQL:\n", sql)
    else:
        print("❌ Failed to fetch schema.")


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]



❌ Error connecting to DB: 2003: Can't connect to MySQL server on 'localhost:3306' (Errno 111: Connection refused)
❌ Failed to fetch schema.


In [None]:
!pip install mysql-connector-python transformers accelerate sentencepiece


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.wh

In [None]:
import mysql.connector
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

# --- Load model ---
model_name = "defog/sqlcoder-7b-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto", torch_dtype=torch.float16)
model.eval()

# --- Connect to your MySQL DB ---
def get_db_connection():
    return mysql.connector.connect(
    host="68.178.155.255",           # or IP address
    user="Anika12",
    password="Anika12",
    database="categories_lead"
    )

# --- Extract schema info (table names + columns) ---
def get_schema_text(conn):
    cursor = conn.cursor()
    cursor.execute("SHOW TABLES")
    tables = cursor.fetchall()

    schema_parts = []
    for (table_name,) in tables:
        cursor.execute(f"DESCRIBE {table_name}")
        columns = cursor.fetchall()
        columns_desc = ", ".join([f"{col[0]} {col[1]}" for col in columns])
        schema_parts.append(f"Table {table_name}: {columns_desc}")
    cursor.close()
    return "\n".join(schema_parts)

# --- Generate SQL ---
def generate_sql(question, schema_text):
    prompt = f"""### Task:
Generate a SQL query for the question below.

Question:
{question}

Schema:
{schema_text}

SQL query:
"""
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=150,
        num_beams=4,
        early_stopping=True,
        no_repeat_ngram_size=2,
    )
    sql = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return sql.strip()

# --- Main flow ---
if __name__ == "__main__":
    # Connect to DB and get schema
    try:
        conn = get_db_connection()
        schema_text = get_schema_text(conn)
        print("Extracted schema:\n", schema_text)

        # Sample question
        question = "how many lead ids are there?"

        sql_query = generate_sql(question, schema_text)
        print("\nGenerated SQL query:\n", sql_query)
    except Exception as e:
        print("Error:", e)
    finally:
        if 'conn' in locals() and conn.is_connected():
            conn.close()


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


Extracted schema:
 Table calls: id int(11), call_id varchar(10), call_for varchar(50), related_to varchar(50), call_type varchar(50), call_status varchar(50), call_date varchar(50), call_time varchar(10), duration_minutes int(11), duration_seconds int(11), subject varchar(255), voice_recording varchar(255), call_purpose varchar(100), call_agenda varchar(255), call_result varchar(100), description text, lead_id varchar(20), assigned_to varchar(100), created_at datetime, updated_at datetime
Table conversations: id int(11), lead_id int(11), conversation_text text, predicted_label varchar(50), confidence_score decimal(5,4), created_at timestamp, updated_at timestamp, prediction_method varchar(500)
Table emails: id int(11), lead_id int(11), user_id int(11), direction enum('sent','received'), subject varchar(255), body text, status enum('sent','bounced','delivered','failed','replied'), opened_at datetime, clicked_at datetime, email_thread_id varchar(50), timestamp datetime, is_archived tinyi

In [None]:
!pip install mysql-connector-python

Collecting mysql-connector-python
  Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (7.3 kB)
Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl (33.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m49.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.4.0


In [None]:
#final-part1

In [None]:
!pip install mysql-connector-python

Collecting mysql-connector-python
  Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (7.3 kB)
Downloading mysql_connector_python-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl (33.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m21.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.4.0


In [7]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import mysql.connector

# ✅ Define database connection
def connect_to_db():
    return mysql.connector.connect(
        host="68.178.155.255",
        user="Anika12",
        password="Anika12",
        database="categories_lead"
    )

# ✅ Load FLAN-T5 Model
model_name = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.eval()

# ✅ Sentiment prompt
LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:

✅ Positive:
- Ready to proceed (signing, onboarding, payment)
- Explicit satisfaction ("Great work!", "This solves our problem")
- Continued business approval ("Renewing our contract")
- Positive comparisons ("Much better than [competitor]")

➖ Neutral:
- Routine updates ("Received the documents")
- Standard inquiries ("What's the pricing?")
- Non-committal statements ("We'll review and get back")
- Process discussions ("Our team is evaluating options")

❌ Negative:
- Complaints ("This isn't working as promised")
- Threats to churn ("Considering other options")
- Persistent issues ("Still facing the same bug")
- Refund requests ("We need our money back")
- Negative comparisons ("[Competitor] does this better")
"""

def classify_sentiment(message: str) -> str:
    """
    Classify sentiment from message using FLAN-T5 model
    """
    full_prompt = f"""{LABEL_GUIDANCE}

Message:
\"{message}\"

What is the sentiment? Respond with only one label from ["Positive", "Neutral", "Negative"].
"""
    inputs = tokenizer(full_prompt, return_tensors="pt", truncation=True)

    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)

    sentiment = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    sentiment = sentiment.capitalize()

    return sentiment if sentiment in ["Positive", "Neutral", "Negative"] else "Neutral"

def insert_sentiment(cursor, lead_id, sentiment):
    """
    Insert or update sentiment into sentiment_module table
    """
    query = """
        INSERT INTO sentiment_module (lead_id, sentiment_emails)
        VALUES (%s, %s)
        ON DUPLICATE KEY UPDATE sentiment_emails = VALUES(sentiment_emails);
    """
    cursor.execute(query, (lead_id, sentiment))

def run_sentiment_pipeline():
    """
    Optimized pipeline: fetches data first, runs model second, writes third
    """
    # Step 1: Connect and fetch all leads + emails
    try:
        conn = connect_to_db()
        cursor = conn.cursor(dictionary=True)
    except Exception as e:
        print("❌ Failed to connect to database:", e)
        return

    lead_data = []

    try:
        cursor.execute("SELECT DISTINCT lead_id FROM leads")
        lead_ids = [row["lead_id"] for row in cursor.fetchall()]
        print(f"⚙️ Processing {len(lead_ids)} leads...")

        for lead_id in lead_ids:
            cursor.execute("""
                SELECT body FROM emails
                WHERE lead_id = %s AND direction = 'received'
                ORDER BY timestamp DESC
                LIMIT 1
            """, (lead_id,))
            result = cursor.fetchone()
            if result and result['body']:
                lead_data.append((lead_id, result['body']))
            else:
                print(f"[Lead ID {lead_id}] No 'received' message found.")

    except Exception as e:
        print("❌ Error while fetching leads or emails:", e)
        cursor.close()
        conn.close()
        return

    cursor.close()
    conn.close()

    # Step 2: Run model on all data
    print("\n🧠 Running FLAN-T5 sentiment classification...\n")
    sentiment_results = []
    for lead_id, message in lead_data:
        sentiment = classify_sentiment(message)
        print(f"[Lead ID {lead_id}] Message: {message}")
        print(f"[Lead ID {lead_id}] Sentiment: {sentiment}")
        sentiment_results.append((lead_id, sentiment))

    # Step 3: Reconnect and insert all results
    try:
        conn = connect_to_db()
        cursor = conn.cursor()
        for lead_id, sentiment in sentiment_results:
            insert_sentiment(cursor, lead_id, sentiment)
        conn.commit()
        print("\n✅ All sentiment classifications saved to DB.")
    except Exception as e:
        print("❌ Error while inserting sentiment data:", e)
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    run_sentiment_pipeline()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

⚙️ Processing 7 leads...

🧠 Running FLAN-T5 sentiment classification...

[Lead ID 101] Message: it is so expensive
[Lead ID 101] Sentiment: Negative
[Lead ID 6] Message: that sounds good — let’s plan a quick call Thursday to go over the details
[Lead ID 6] Sentiment: Positive
[Lead ID 102] Message: i am not satisfied with the product
[Lead ID 102] Sentiment: Negative
[Lead ID 105] Message: That sounds good — let’s plan a quick call Thursday to go over the details
[Lead ID 105] Sentiment: Positive
[Lead ID 103] Message: Apologies for the delay — I’ve been tied up with other priorities and honestly, this project is on the back burner for us right now. I’m out of office quite a bit over the next month and won’t be able to give this the attention it deserves. Let’s pause for now and reconnect later this year if it makes sense — I appreciate your patience and follow-ups
[Lead ID 103] Sentiment: Neutral
[Lead ID 106] Message: Hey, thanks for following up. Honestly, we’re swamped with another

In [None]:
#calls

In [9]:
!pip install mysql-connector-python

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import mysql.connector
from mysql.connector import OperationalError
import time

# ✅ Load FLAN-T5 Model
model_name = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.eval()

# ✅ Sentiment prompt
LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:

✅ Positive:
- Ready to proceed (signing, onboarding, payment)
- Explicit satisfaction ("Great work!", "This solves our problem")
- Continued business approval ("Renewing our contract")
- Positive comparisons ("Much better than [competitor]")

➖ Neutral:
- Routine updates ("Received the documents")
- Standard inquiries ("What's the pricing?")
- Non-committal statements ("We'll review and get back")
- Process discussions ("Our team is evaluating options")

❌ Negative:
- Complaints ("This isn't working as promised")
- Threats to churn ("Considering other options")
- Persistent issues ("Still facing the same bug")
- Refund requests ("We need our money back")
- Negative comparisons ("[Competitor] does this better")
"""

def classify_sentiment(message: str) -> str:
    full_prompt = f"""{LABEL_GUIDANCE}

Message:
\"{message}\"

What is the sentiment? Respond with only one label from ["Positive", "Neutral", "Negative"].
"""
    inputs = tokenizer(full_prompt, return_tensors="pt", truncation=True)

    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)

    sentiment = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    sentiment = sentiment.capitalize()

    return sentiment if sentiment in ["Positive", "Neutral", "Negative"] else "Neutral"

def connect_to_db():
    return mysql.connector.connect(
        host="68.178.155.255",
        user="Anika12",
        password="Anika12",
        database="categories_lead"
    )

def get_all_lead_ids(cursor, limit=50):
    cursor.execute("SELECT lead_id FROM leads LIMIT %s;", (limit,))
    return [row[0] for row in cursor.fetchall()]

def get_latest_call_description(cursor, lead_id):
    """
    Get the latest call description based on updated_at timestamp.
    """
    query = """
        SELECT description FROM calls
        WHERE lead_id = %s
        ORDER BY updated_at DESC
        LIMIT 1;
    """
    cursor.execute(query, (lead_id,))
    result = cursor.fetchone()
    return result[0] if result else None

def insert_or_update_call_sentiment(cursor, lead_id, sentiment):
    """
    Insert or update sentiment_calls in sentiment_module.
    """
    query = """
        INSERT INTO sentiment_module (lead_id, sentiment_emails, sentiment_meetings, sentiment_calls)
        VALUES (%s, '', '', %s)
        ON DUPLICATE KEY UPDATE sentiment_calls = VALUES(sentiment_calls);
    """
    cursor.execute(query, (lead_id, sentiment))

def run_call_sentiment_pipeline():
    try:
        conn = connect_to_db()
        cursor = conn.cursor()
    except Exception as e:
        print("❌ Failed to connect to database:", e)
        return

    try:
        lead_ids = get_all_lead_ids(cursor, limit=50)
        print(f"📞 Processing {len(lead_ids)} leads for call sentiment...")

        for lead_id in lead_ids:
            try:
                description = get_latest_call_description(cursor, lead_id)
                if not description:
                    print(f"[Lead ID {lead_id}] No call description found.")
                    continue

                sentiment = classify_sentiment(description)
                print(f"[Lead ID {lead_id}] Description: {description}")
                print(f"[Lead ID {lead_id}] Sentiment: {sentiment}")

                insert_or_update_call_sentiment(cursor, lead_id, sentiment)
                conn.commit()

            except OperationalError as db_err:
                print(f"🔌 Lost connection. Reconnecting... Error: {db_err}")
                time.sleep(2)
                conn = connect_to_db()
                cursor = conn.cursor()
            except Exception as err:
                print(f"❌ Error processing lead {lead_id}: {err}")
                continue

    finally:
        cursor.close()
        conn.close()
        print("✅ Call sentiment classification completed and stored.")

if __name__ == "__main__":
    run_call_sentiment_pipeline()




Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

📞 Processing 7 leads for call sentiment...
[Lead ID 101] Description: hjk
[Lead ID 101] Sentiment: Positive
🔌 Lost connection. Reconnecting... Error: 2055: Lost connection to MySQL server at '68.178.155.255:3306', system error: Errno 104: Connection reset by peer
[Lead ID 6] No call description found.
[Lead ID 102] Description: lead is not satisfied
[Lead ID 102] Sentiment: Negative
🔌 Lost connection. Reconnecting... Error: 2055: Lost connection to MySQL server at '68.178.155.255:3306', system error: Errno 104: Connection reset by peer
[Lead ID 105] Description: Positive call, lead is price sensitive.
[Lead ID 105] Sentiment: Neutral
🔌 Lost connection. Reconnecting... Error: 2055: Lost connection to MySQL server at '68.178.155.255:3306', system error: Errno 104: Connection reset by peer
[Lead ID 103] Description: Client needs internal approval.
[Lead ID 103] Sentiment: Neutral
🔌 Lost connection. Reconnecting... Error: 2055: Lost connection to MySQL server at '68.178.155.255:3306', syst

In [None]:
#meetings

In [4]:
!pip install mysql-connector-python

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import mysql.connector
from mysql.connector import OperationalError
import time

# ✅ Load FLAN-T5 Model
model_name = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.eval()

# ✅ Sentiment prompt
LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:

✅ Positive:
- Ready to proceed (signing, onboarding, payment)
- Explicit satisfaction ("Great work!", "This solves our problem")
- Continued business approval ("Renewing our contract")
- Positive comparisons ("Much better than [competitor]")

➖ Neutral:
- Routine updates ("Received the documents")
- Standard inquiries ("What's the pricing?")
- Non-committal statements ("We'll review and get back")
- Process discussions ("Our team is evaluating options")

❌ Negative:
- Complaints ("This isn't working as promised")
- Threats to churn ("Considering other options")
- Persistent issues ("Still facing the same bug")
- Refund requests ("We need our money back")
- Negative comparisons ("[Competitor] does this better")
"""

def classify_sentiment(message: str) -> str:
    full_prompt = f"""{LABEL_GUIDANCE}

Message:
\"{message}\"

What is the sentiment? Respond with only one label from ["Positive", "Neutral", "Negative"].
"""
    inputs = tokenizer(full_prompt, return_tensors="pt", truncation=True)

    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)

    sentiment = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    sentiment = sentiment.capitalize()

    return sentiment if sentiment in ["Positive", "Neutral", "Negative"] else "Neutral"

def connect_to_db():
    return mysql.connector.connect(
        host="68.178.155.255",
        user="Anika12",
        password="Anika12",
        database="categories_lead"
    )

def get_all_meeting_leads(cursor, limit=50):
    """
    Fetch distinct lead_ids (related_to) from meetings table.
    """
    query = """
        SELECT DISTINCT related_to FROM meetings
        WHERE related_to IS NOT NULL
        LIMIT %s;
    """
    cursor.execute(query, (limit,))
    return [row[0] for row in cursor.fetchall()]

def get_latest_meeting_description(cursor, lead_id):
    """
    Get the latest meeting description based on updated_at.
    """
    query = """
        SELECT description FROM meetings
        WHERE related_to = %s
        ORDER BY updated_at DESC
        LIMIT 1;
    """
    cursor.execute(query, (lead_id,))
    result = cursor.fetchone()
    return result[0] if result else None

def insert_or_update_meeting_sentiment(cursor, lead_id, sentiment):
    """
    Insert or update sentiment_meetings in sentiment_module.
    """
    query = """
        INSERT INTO sentiment_module (lead_id, sentiment_emails, sentiment_meetings, sentiment_calls)
        VALUES (%s, '', %s, '')
        ON DUPLICATE KEY UPDATE sentiment_meetings = VALUES(sentiment_meetings);
    """
    cursor.execute(query, (lead_id, sentiment))

def run_meeting_sentiment_pipeline():
    try:
        conn = connect_to_db()
        cursor = conn.cursor()
    except Exception as e:
        print("❌ Failed to connect to database:", e)
        return

    try:
        lead_ids = get_all_meeting_leads(cursor, limit=50)
        print(f"📅 Processing {len(lead_ids)} leads for meeting sentiment...")

        for lead_id in lead_ids:
            try:
                description = get_latest_meeting_description(cursor, lead_id)
                if not description:
                    print(f"[Lead ID {lead_id}] No meeting description found.")
                    continue

                sentiment = classify_sentiment(description)
                print(f"[Lead ID {lead_id}] Description: {description}")
                print(f"[Lead ID {lead_id}] Sentiment: {sentiment}")

                insert_or_update_meeting_sentiment(cursor, lead_id, sentiment)
                conn.commit()

            except OperationalError as db_err:
                print(f"🔌 Lost connection. Reconnecting... Error: {db_err}")
                time.sleep(2)
                conn = connect_to_db()
                cursor = conn.cursor()
            except Exception as err:
                print(f"❌ Error processing lead {lead_id}: {err}")
                continue

    finally:
        cursor.close()
        conn.close()
        print("✅ Meeting sentiment classification completed and stored.")

if __name__ == "__main__":
    run_meeting_sentiment_pipeline()




Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

📅 Processing 4 leads for meeting sentiment...
[Lead ID 101] Description: discussed abu the porduct
[Lead ID 101] Sentiment: Positive
[Lead ID 102] Description: the lead is satisfied
[Lead ID 102] Sentiment: Positive
[Lead ID 103] Description: Review shared proposal.
[Lead ID 103] Sentiment: Neutral
[Lead ID 104] Description: fhgfghfghfgh
[Lead ID 104] Sentiment: Neutral
✅ Meeting sentiment classification completed and stored.


To be implemeneted


In [None]:
#final part 2

In [12]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import mysql.connector
from mysql.connector import OperationalError
import time

# ✅ Load FLAN-T5 Model
model_name = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.eval()

# ✅ Define database connection
def connect_to_db():
    return mysql.connector.connect(
        host="68.178.155.255",
        user="Anika12",
        password="Anika12",
        database="categories_lead"
    )

# ✅ Sentiment guidance prompt
LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:

✅ Positive:
- Ready to proceed (signing, onboarding, payment)
- Explicit satisfaction ("Great work!", "This solves our problem")
- Continued business approval ("Renewing our contract")
- Positive comparisons ("Much better than [competitor]")

➖ Neutral:
- Routine updates ("Received the documents")
- Standard inquiries ("What's the pricing?")
- Non-committal statements ("We'll review and get back")
- Process discussions ("Our team is evaluating options")

❌ Negative:
- Complaints ("This isn't working as promised")
- Threats to churn ("Considering other options")
- Persistent issues ("Still facing the same bug")
- Refund requests ("We need our money back")
- Negative comparisons ("[Competitor] does this better")
"""

# ✅ Sentiment classification
def classify_sentiment(message: str) -> str:
    full_prompt = f"""{LABEL_GUIDANCE}

Message:
\"{message}\"

What is the sentiment? Respond with only one label from ["Positive", "Neutral", "Negative"]."""

    inputs = tokenizer(full_prompt, return_tensors="pt", truncation=True)

    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)

    sentiment = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    sentiment = sentiment.capitalize()

    return sentiment if sentiment in ["Positive", "Neutral", "Negative"] else "Neutral"

# ✅ Upsert function
def upsert_email_sentiment(cursor, lead_id: str, sentiment: str):
    lead_id = (str(lead_id) if lead_id is not None else "").strip()
    if not lead_id:
        return  # Skip bad IDs

    sql = """
        INSERT INTO sentiment_module (lead_id, sentiment_emails)
        VALUES (%s, %s)
        ON DUPLICATE KEY UPDATE sentiment_emails = VALUES(sentiment_emails)
    """
    cursor.execute(sql, (lead_id, sentiment))

# ✅ Main pipeline with reconnection logic
def run_sentiment_pipeline():
    # Step 1: Fetch lead_ids + latest received email body
    try:
        conn = connect_to_db()
        cursor = conn.cursor(dictionary=True)
    except Exception as e:
        print("❌ Failed to connect to database:", e)
        return

    lead_data = []

    try:
        cursor.execute("SELECT DISTINCT lead_id FROM leads")
        lead_ids = [row["lead_id"] for row in cursor.fetchall()]
        print(f"⚙️ Processing {len(lead_ids)} leads...")

        for lead_id in lead_ids:
            cursor.execute("""
                SELECT body FROM emails
                WHERE lead_id = %s AND direction = 'received'
                ORDER BY timestamp DESC
                LIMIT 1
            """, (lead_id,))
            row = cursor.fetchone()
            if row and row.get('body'):
                lead_data.append((lead_id, row['body']))
            else:
                # Pre-create empty row for later
                try:
                    cursor2 = conn.cursor()
                    cursor2.execute(
                        "INSERT IGNORE INTO sentiment_module (lead_id) VALUES (%s)",
                        ((str(lead_id).strip()),)
                    )
                    conn.commit()
                    cursor2.close()
                except Exception:
                    pass
                print(f"[Lead ID {lead_id}] No 'received' email found.")
    except Exception as e:
        print("❌ Error while fetching leads or emails:", e)
        cursor.close()
        conn.close()
        return

    cursor.close()
    conn.close()

    # Step 2: Run sentiment + insert with error recovery
    print("\n🧠 Running FLAN-T5 sentiment classification (emails)...\n")

    try:
        conn = connect_to_db()
        cursor = conn.cursor()

        for lead_id, message in lead_data:
            try:
                sentiment = classify_sentiment(message)
                print(f"[Lead ID {lead_id}] Sentiment (email): {sentiment}")
                upsert_email_sentiment(cursor, lead_id, sentiment)
                conn.commit()
            except OperationalError as db_err:
                print(f"🔌 Lost connection during insertion. Reconnecting... Error: {db_err}")
                time.sleep(2)
                conn = connect_to_db()
                cursor = conn.cursor()
                # Retry the same lead after reconnecting
                upsert_email_sentiment(cursor, lead_id, sentiment)
                conn.commit()
            except Exception as err:
                print(f"❌ Error processing lead {lead_id}: {err}")
                continue

    finally:
        cursor.close()
        conn.close()
        print("\n✅ Email sentiment classification completed and stored.")

# ✅ Run
if __name__ == "__main__":
    run_sentiment_pipeline()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

⚙️ Processing 7 leads...

🧠 Running FLAN-T5 sentiment classification (emails)...

[Lead ID 101] Sentiment (email): Negative
❌ Error processing lead 101: Failed processing format-parameters; MySQL Connection not available
[Lead ID 6] Sentiment (email): Positive
❌ Error processing lead 6: Failed processing format-parameters; MySQL Connection not available
[Lead ID 102] Sentiment (email): Negative
❌ Error processing lead 102: Failed processing format-parameters; MySQL Connection not available
[Lead ID 105] Sentiment (email): Positive
❌ Error processing lead 105: Failed processing format-parameters; MySQL Connection not available
[Lead ID 103] Sentiment (email): Neutral
❌ Error processing lead 103: Failed processing format-parameters; MySQL Connection not available
[Lead ID 106] Sentiment (email): Neutral
❌ Error processing lead 106: Failed processing format-parameters; MySQL Connection not available
[Lead ID 104] Sentiment (email): Neutral
❌ Error processing lead 104: Failed processing fo

In [5]:
# Install MySQL connector
!pip install -q mysql-connector-python

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import mysql.connector
from mysql.connector import OperationalError, Error
import time

# =========================
# Model setup
# =========================
model_name = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.eval()

LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:

✅ Positive:
- Ready to proceed (signing, onboarding, payment)
- Explicit satisfaction ("Great work!", "This solves our problem")
- Continued business approval ("Renewing our contract")
- Positive comparisons ("Much better than [competitor]")

➖ Neutral:
- Routine updates ("Received the documents")
- Standard inquiries ("What's the pricing?")
- Non-committal statements ("We'll review and get back")
- Process discussions ("Our team is evaluating options")

❌ Negative:
- Complaints ("This isn't working as promised")
- Threats to churn ("Considering other options")
- Persistent issues ("Still facing the same bug")
- Refund requests ("We need our money back")
- Negative comparisons ("[Competitor] does this better")
"""

def classify_sentiment(message: str) -> str:
    prompt = f"""{LABEL_GUIDANCE}

Message:
\"{message}\"

What is the sentiment? Respond with only one label from ["Positive", "Neutral", "Negative"].
"""
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True)
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)
    sentiment = tokenizer.decode(outputs[0], skip_special_tokens=True).strip().capitalize()
    return sentiment if sentiment in ("Positive", "Neutral", "Negative") else "Neutral"

# =========================
# DB helpers
# =========================
def connect_to_db():
    # Keep connection lean; we’ll ping before use
    return mysql.connector.connect(
        host="68.178.155.255",
        user="Anika12",
        password="Anika12",
        database="categories_lead",
        connection_timeout=10,
        autocommit=False,
    )

def ping(conn):
    """Ensure the connection is alive; reconnect if needed."""
    try:
        if conn is None or not conn.is_connected():
            return connect_to_db()
        conn.ping(reconnect=True, attempts=1, delay=0)
        return conn
    except Exception:
        # If ping fails, reconnect
        try:
            if conn:
                conn.close()
        except Exception:
            pass
        return connect_to_db()

def get_all_lead_ids(cursor, limit=50):
    cursor.execute("SELECT DISTINCT lead_id FROM leads LIMIT %s;", (limit,))
    return [row[0] for row in cursor.fetchall()]

def get_latest_call_description(cursor, lead_id):
    cursor.execute("""
        SELECT description FROM calls
        WHERE lead_id = %s
        ORDER BY COALESCE(updated_at, created_at) DESC
        LIMIT 1;
    """, (lead_id,))
    row = cursor.fetchone()
    return row[0] if row else None

def ensure_row_exists(cursor, lead_id):
    cursor.execute("INSERT IGNORE INTO sentiment_module (lead_id) VALUES (%s)", (lead_id,))

def upsert_call_sentiment(cursor, lead_id, sentiment):
    cursor.execute("""
        INSERT INTO sentiment_module (lead_id, sentiment_calls)
        VALUES (%s, %s)
        ON DUPLICATE KEY UPDATE sentiment_calls = VALUES(sentiment_calls);
    """, (lead_id, sentiment))

# =========================
# Main pipeline (single classify, write with one retry)
# =========================
def run_call_sentiment_pipeline():
    # Single shared connection for the run; we’ll ping before use
    conn = None
    try:
        conn = connect_to_db()
        conn = ping(conn)
        read_cur = conn.cursor()  # simple cursor for lists
        lead_ids = get_all_lead_ids(read_cur, limit=50)
        print(f"📞 Processing {len(lead_ids)} leads for call sentiment...")
        print(f"🆔 Lead IDs: {lead_ids}")

        for raw_lead_id in lead_ids:
            lead_id = (str(raw_lead_id) if raw_lead_id is not None else "").strip()
            if not lead_id:
                print("⚠️ Skipping blank lead_id")
                continue

            # --- Fetch once (no retries) ---
            conn = ping(conn)
            read_cur = conn.cursor()
            description = get_latest_call_description(read_cur, lead_id)
            if not description:
                print(f"[Lead ID {lead_id}] No call description found.")
                continue

            # --- Classify once (no retries) ---
            sentiment = classify_sentiment(description)
            print(f"[Lead ID {lead_id}] Description: {description}")
            print(f"[Lead ID {lead_id}] Sentiment: {sentiment}")

            # --- Write with a single retry on DB step only ---
            try:
                conn = ping(conn)
                write_cur = conn.cursor()
                ensure_row_exists(write_cur, lead_id)
                upsert_call_sentiment(write_cur, lead_id, sentiment)
                conn.commit()
                write_cur.close()
            except OperationalError as db_err:
                # Connection dropped during write: reconnect and try the write ONCE
                print(f"🔌 Write failed (will retry once): {db_err}")
                try:
                    # Reconnect fresh and retry only the WRITE
                    if conn:
                        try:
                            conn.close()
                        except Exception:
                            pass
                    conn = connect_to_db()
                    write_cur = conn.cursor()
                    ensure_row_exists(write_cur, lead_id)
                    upsert_call_sentiment(write_cur, lead_id, sentiment)
                    conn.commit()
                    write_cur.close()
                    print(f"✅ Write succeeded on retry for Lead ID {lead_id}.")
                except Exception as retry_err:
                    print(f"❌ Write retry failed for Lead ID {lead_id}: {retry_err}")
                    # Do NOT re-run fetch/classify; move to next lead
                    continue
            except Error as e:
                print(f"❌ DB error for Lead ID {lead_id}: {e}")
                # Move on—don’t re-run whole loop for this lead
                continue

        try:
            read_cur.close()
        except Exception:
            pass

    finally:
        if conn:
            try:
                conn.close()
            except Exception:
                pass
        print("✅ Call sentiment classification completed and stored.")

# Run
if __name__ == "__main__":
    run_call_sentiment_pipeline()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

📞 Processing 7 leads for call sentiment...
🆔 Lead IDs: [101, 6, 102, 105, 103, 106, 104]
[Lead ID 101] Description: hjk
[Lead ID 101] Sentiment: Positive
[Lead ID 6] No call description found.
[Lead ID 102] Description: lead is not satisfied
[Lead ID 102] Sentiment: Negative
[Lead ID 105] Description: Positive call, lead is price sensitive.
[Lead ID 105] Sentiment: Neutral
[Lead ID 103] Description: Client needs internal approval.
[Lead ID 103] Sentiment: Neutral
[Lead ID 106] Description: Lead negotiating payment terms.
[Lead ID 106] Sentiment: Positive
[Lead ID 104] Description: 4564536
[Lead ID 104] Sentiment: Neutral
✅ Call sentiment classification completed and stored.


In [6]:
# Install MySQL connector (quiet)
!pip install -q mysql-connector-python

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import mysql.connector
from mysql.connector import OperationalError, Error
import time

# =========================
# Model setup
# =========================
model_name = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.eval()

LABEL_GUIDANCE = """
Use exactly one of these labels:
["Positive", "Neutral", "Negative"]

---

🔍 Label Definitions:

✅ Positive:
- Ready to proceed (signing, onboarding, payment)
- Explicit satisfaction ("Great work!", "This solves our problem")
- Continued business approval ("Renewing our contract")
- Positive comparisons ("Much better than [competitor]")

➖ Neutral:
- Routine updates ("Received the documents")
- Standard inquiries ("What's the pricing?")
- Non-committal statements ("We'll review and get back")
- Process discussions ("Our team is evaluating options")

❌ Negative:
- Complaints ("This isn't working as promised")
- Threats to churn ("Considering other options")
- Persistent issues ("Still facing the same bug")
- Refund requests ("We need our money back")
- Negative comparisons ("[Competitor] does this better")
"""

def classify_sentiment(message: str) -> str:
    prompt = f"""{LABEL_GUIDANCE}

Message:
\"{message}\"

What is the sentiment? Respond with only one label from ["Positive", "Neutral", "Negative"].
"""
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True)
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)
    sentiment = tokenizer.decode(outputs[0], skip_special_tokens=True).strip().capitalize()
    return sentiment if sentiment in ("Positive", "Neutral", "Negative") else "Neutral"

# =========================
# DB helpers
# =========================
def connect_to_db():
    # keep it lean; we will ping before critical ops
    return mysql.connector.connect(
        host="68.178.155.255",
        user="Anika12",
        password="Anika12",
        database="categories_lead",
        connection_timeout=10,
        autocommit=False,
    )

def ping(conn):
    """Ensure the connection is alive; reconnect if needed."""
    try:
        if conn is None or not conn.is_connected():
            return connect_to_db()
        conn.ping(reconnect=True, attempts=1, delay=0)
        return conn
    except Exception:
        try:
            if conn:
                conn.close()
        except Exception:
            pass
        return connect_to_db()

def get_all_meeting_leads(cursor, limit=50):
    cursor.execute("""
        SELECT DISTINCT related_to
        FROM meetings
        WHERE related_to IS NOT NULL
        LIMIT %s;
    """, (limit,))
    return [row[0] for row in cursor.fetchall()]

def get_latest_meeting_description(cursor, lead_id):
    cursor.execute("""
        SELECT description
        FROM meetings
        WHERE related_to = %s
        ORDER BY COALESCE(updated_at, created_at) DESC
        LIMIT 1;
    """, (lead_id,))
    row = cursor.fetchone()
    return row[0] if row else None

def ensure_row_exists(cursor, lead_id):
    cursor.execute("INSERT IGNORE INTO sentiment_module (lead_id) VALUES (%s)", (lead_id,))

def upsert_meeting_sentiment(cursor, lead_id, sentiment):
    cursor.execute("""
        INSERT INTO sentiment_module (lead_id, sentiment_meetings)
        VALUES (%s, %s)
        ON DUPLICATE KEY UPDATE sentiment_meetings = VALUES(sentiment_meetings);
    """, (lead_id, sentiment))

# =========================
# Main pipeline (single fetch/classify; retry write once)
# =========================
def run_meeting_sentiment_pipeline():
    conn = None
    try:
        conn = connect_to_db()
        conn = ping(conn)

        # read cursor for listing leads
        read_cur = conn.cursor()
        lead_ids = get_all_meeting_leads(read_cur, limit=50)
        print(f"📅 Processing {len(lead_ids)} leads for meeting sentiment...")
        # print(f"🆔 Lead IDs: {lead_ids}")  # uncomment if needed

        for raw_lead_id in lead_ids:
            lead_id = (str(raw_lead_id) if raw_lead_id is not None else "").strip()
            if not lead_id:
                print("⚠️ Skipping blank lead_id")
                continue

            # --- Fetch once ---
            conn = ping(conn)
            read_cur = conn.cursor()
            description = get_latest_meeting_description(read_cur, lead_id)
            if not description:
                print(f"[Lead ID {lead_id}] No meeting description found.")
                continue

            # --- Classify once ---
            sentiment = classify_sentiment(description)
            print(f"[Lead ID {lead_id}] Sentiment (meeting): {sentiment}")

            # --- Write with a single retry on DB step only ---
            try:
                conn = ping(conn)
                write_cur = conn.cursor()
                ensure_row_exists(write_cur, lead_id)
                upsert_meeting_sentiment(write_cur, lead_id, sentiment)
                conn.commit()
                write_cur.close()
            except OperationalError as db_err:
                print(f"🔌 Write failed (will retry once): {db_err}")
                try:
                    # reconnect and retry only the WRITE
                    if conn:
                        try:
                            conn.close()
                        except Exception:
                            pass
                    conn = connect_to_db()
                    write_cur = conn.cursor()
                    ensure_row_exists(write_cur, lead_id)
                    upsert_meeting_sentiment(write_cur, lead_id, sentiment)
                    conn.commit()
                    write_cur.close()
                    print(f"✅ Write succeeded on retry for Lead ID {lead_id}.")
                except Exception as retry_err:
                    print(f"❌ Write retry failed for Lead ID {lead_id}: {retry_err}")
                    # do not re-fetch/classify; move to next lead
                    continue
            except Error as e:
                print(f"❌ DB error for Lead ID {lead_id}: {e}")
                continue

        try:
            read_cur.close()
        except Exception:
            pass

    finally:
        if conn:
            try:
                conn.close()
            except Exception:
                pass
        print("✅ Meeting sentiment classification completed and stored.")

# Run
if __name__ == "__main__":
    run_meeting_sentiment_pipeline()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

📅 Processing 4 leads for meeting sentiment...
[Lead ID 101] Sentiment (meeting): Positive
[Lead ID 102] Sentiment (meeting): Positive
[Lead ID 103] Sentiment (meeting): Neutral
[Lead ID 104] Sentiment (meeting): Neutral
✅ Meeting sentiment classification completed and stored.
