## Main Code

### 0. Libraries

In [None]:
import vertexai
from vertexai.generative_models import GenerativeModel, GenerationConfig
from google.cloud import bigquery
from google.api_core.exceptions import NotFound
import json
import time
import pandas as pd
from datetime import timedelta

### 1. Configuration & Naming Standards

In [None]:
# ==========================================
# 1. CONFIGURATION & NAMING STANDARDS
# ==========================================
PROJECT_ID = "project-nirvana-405904"  # <--- REPLACE THIS
LOCATION = "us-central1"

# Versions
PROJECT_TAG = "csv"
DATA_VERSION = "006" # Current Synthetic Data TABLE VERSION
SCRIPT_VERSION = "005"
DESTINATION_TABLE_VERSION = "005"

# Resources
DATASET_ID = f"vel_{PROJECT_TAG}_schema"
# The validation table is deprecated; we now read directly from the unified transcript table
TRANSCRIPTION_TABLE = f"vel_{PROJECT_TAG}_synthetic_transcripts_{DATA_VERSION}"
DESTINATION_TABLE = f"vel_{PROJECT_TAG}_derived_signals_{DESTINATION_TABLE_VERSION}"

# Initialize Vertex AI and BigQuery
vertexai.init(project=PROJECT_ID, location=LOCATION)
client = bigquery.Client(project=PROJECT_ID)
model = GenerativeModel("gemini-2.5-flash")

# TAXONOMY
SIGNAL_TAXONOMY_LIST = [
    # Monetization
    "[Monetization > Ad Revenue Optimization] RPM",
    "[Monetization > Ad Revenue Optimization] CPM",
    "[Monetization > Ad Revenue Optimization] Geo Mix",
    "[Monetization > Ad Revenue Optimization] Seasonality",
    "[Monetization > Ad Revenue Optimization] Long Form vs Shorts",
    "[Monetization > Fan Funding Optimization] Channel Memberships",
    "[Monetization > Fan Funding Optimization] Super Chats",
    "[Monetization > Fan Funding Optimization] Super Thanks",
    "[Monetization > Fan Funding Optimization] Recurring Revenue",
    "[Monetization > Fan Funding Optimization] Churn",
    "[Monetization > Commerce Optimization] Shopping",
    "[Monetization > Commerce Optimization] Affiliate",
    "[Monetization > Commerce Optimization] Product Tagging",
    "[Monetization > Commerce Optimization] Conversion",
    "[Monetization > Brand Revenue Strategy] Brand Deals",
    "[Monetization > Brand Revenue Strategy] Brand Connect",
    "[Monetization > Brand Revenue Strategy] Sponsor Integrations",
    "[Monetization > Brand Revenue Strategy] Platform Ads",
    # Content & Formats
    "[Content & Formats > Shorts Strategy] Shorts growth vs revenue",
    "[Content & Formats > Shorts Strategy] Shorts Collab",
    "[Content & Formats > Shorts Strategy] Shorts Experimentation",
    "[Content & Formats > Live and Event Strategy] Live Streaming",
    "[Content & Formats > Live and Event Strategy] Premieres",
    "[Content & Formats > Live and Event Strategy] Redirect Strategy",
    "[Content & Formats > Live and Event Strategy] Real-Time Monetization",
    "[Content & Formats > Live and Event Strategy] Scheduled Launches",
    "[Content & Formats > Content Packaging] Titles, Thumbnails",
    "[Content & Formats > Content Packaging] Hooks",
    "[Content & Formats > Content Packaging] Chapters",
    "[Content & Formats > Content Packaging] Video Structure Strategy",
    # Tools and Policy
    "[Tools and Policy > Copyright and content] Claims vs Strikes",
    "[Tools and Policy > Copyright and content] Disputes",
    "[Tools and Policy > Copyright and content] Copyright Risk Management",
    "[Tools and Policy > Brand Safety and Ads] Yellow Icon",
    "[Tools and Policy > Brand Safety and Ads] Advertiser Suitability",
    "[Tools and Policy > Brand Safety and Ads] Self Certification",
    "[Tools and Policy > Brand Safety and Ads] Ad Restrictions",
    # Creator Health and Ops
    "[Creator Health and Ops > Sustainability and Ops] Burnout",
    "[Creator Health and Ops > Sustainability and Ops] Upload Cadence Stress",
    "[Creator Health and Ops > Sustainability and Ops] Team Scaling",
    "[Creator Health and Ops > Sustainability and Ops] Prod Workflow Strain",
    # Relationship and Strategic Support
    "[Relationship and Strategic Support > Strategic Partnership and Support] Feedback on SPM support",
    "[Relationship and Strategic Support > Strategic Partnership and Support] Need for Escalation",
    "[Relationship and Strategic Support > Strategic Partnership and Support] Milestone Logistics",
    "[Relationship and Strategic Support > Strategic Partnership and Support] Awards/Events",
    # Analytics and Growth
    "[Analytics and Growth > Retention and Discovery] Audience Retention Curves",
    "[Analytics and Growth > Retention and Discovery] Returning Viewers",
    "[Analytics and Growth > Retention and Discovery] Engagement Depth",
    "[Analytics and Growth > Retention and Discovery] Subscriber Conversion",
    "[Analytics and Growth > Traffic and Discovery] Browse vs Search vs Suggested",
    "[Analytics and Growth > Traffic and Discovery] Growth Volatility",
    "[Analytics and Growth > Traffic and Discovery] Traffic Source Dependency",
    "[Analytics and Growth > Topic and Demand Discovery] Research Tab",
    "[Analytics and Growth > Topic and Demand Discovery] Keyword Demand",
    "[Analytics and Growth > Topic and Demand Discovery] Content Ideation off Audience",
    "[Analytics and Growth > Topic and Demand Discovery] Search Trends Buff",
    "[Analytics and Growth > Performance Metric Interpretation] Confusion/Discussion on RPM vs CPM",
    "[Analytics and Growth > Performance Metric Interpretation] Impression",
    "[Analytics and Growth > Performance Metric Interpretation] CTR",
    "[Analytics and Growth > Performance Metric Interpretation] Analytics Insights to Gaps"
]


### 2. The system prompt

In [None]:
# ==========================================
# 2. THE SYSTEM PROMPT
# ==========================================
PROMPT_SIGNAL_EXTRACTOR = f"""
You are a Senior Data and Behavior Analyst at YouTube. Your job is to audit call transcripts between a Strategic Partner Manager (SPM) and a Content Creator.
Do not assume anything about the creator's mood or channel persona; deduce everything purely from the dialogue.

You have three primary objectives:

1. ORGANIC SIGNAL DETECTION (STRICT TAXONOMY):
   - You MUST extract signals based ONLY on the following predefined list. Each item is formatted as "[Category > Sub-Category] Topic".
   {json.dumps(SIGNAL_TAXONOMY_LIST, indent=2)}
   - Ignore any topic discussed that does not map perfectly to one of these Specific Topics.

2. SIGNAL CLASSIFICATION & SENTIMENT:
   - **Actionability:** Determine if the issue is "Actionable" (YouTube/SPM can intervene) or "Non-Actionable".
   - **Sentiment:** Classify the creator's tone regarding the specific topic as "Positive", "Negative", or "Neutral".

3. SPM SCORING:
   - Rate the SPM's effectiveness (1-100) based on Empathy, Clarity, and Resolution.
   - **Agenda Adherence Penalty:** Compare the topics the SPM stated they would discuss against what was ACTUALLY covered. Heavily penalize the score if the SPM claims to have discussed topics that were not covered (e.g., stated 5 topics but only covered 2).

4. CRITICAL JSON FORMATTING RULES:
   - NEVER use double quotes ("") inside any of your text values.
   - If you need to quote the creator or emphasize a word, you MUST use single quotes ('') instead to avoid breaking the JSON structure.
   - Do not include newline characters (\n) inside your text values.

OUTPUT FORMAT (STRICT JSON):
Respond ONLY with a valid JSON object. Extract the Category, Sub-Category, and Topic from the brackets in the list.
{{
  "spm_score": [0-100],
  "spm_reasoning": "Explanation of score including Empathy and Agenda Adherence.",
  "signals": [
    {{
      "signal_category": "Exact Category from the brackets",
      "signal_sub_category": "Exact Sub-Category from the brackets",
      "signal_name": "Exact Topic Name outside the brackets",
      "signal_sentiment": "Positive | Negative | Neutral",
      "signal_actionability": "Actionable | Non-Actionable",
      "signal_description": "Brief description of the issue",
      "signal_evidence": "Verbatim quote from the creator"
    }}
  ],
  "recommended_next_step": "Suggested action..."
}}
"""

### 3. Data ingestion (read from bq)

In [None]:
# ==========================================
# 3. DATA INGESTION (READ FROM BQ)
# ==========================================
def get_transcripts_from_bq(limit=50):
    """
    Fetches transcripts from BigQuery preventing data leakage.
    Only permitted columns are queried. Duplicates are filtered out.
    """
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{DESTINATION_TABLE}"

    filter_already_processed = f"""
        AND conversation_id NOT IN (
            SELECT DISTINCT conversation_id FROM `{table_id}`
        )
    """

    try:
        client.get_table(table_id)
        print(f"üîç Destination table detected. Filtering out already processed records...")
    except NotFound:
        print(f"‚ÑπÔ∏è Destination table does not exist yet. Processing all available records.")
        filter_already_processed = ""

    # No JOIN needed anymore. Reading only allowed columns to prevent Data Leakage.
    query = f"""
        SELECT
            conversation_id,
            channel_name,
            creator_niche,
            creator_region,
            spm_name,
            duration_minutes,
            raw_transcript
        FROM `{PROJECT_ID}.{DATASET_ID}.{TRANSCRIPTION_TABLE}`
        WHERE is_valid = True
        {filter_already_processed}
        LIMIT {limit}
    """

    try:
        print(f"üì• Fetching data from BigQuery...")
        df = client.query(query).to_dataframe()
        return df
    except Exception as e:
        print(f"‚ùå Error reading from BigQuery: {e}")
        return pd.DataFrame()

### 4. Analysis Engine (LLM processing)

In [None]:
# ==========================================
# 4. ANALYSIS ENGINE (LLM PROCESSING)
# ==========================================
def analyze_transcripts(df):
    results = []
    if df.empty:
        return pd.DataFrame()

    print(f"üß† Analyzing {len(df)} rows with Gemini (Sequential)...")

    for index, row in df.iterrows():
        try:
            user_prompt = f"""
            METADATA:
            - CONVERSATION_ID: {row['conversation_id']}
            - CHANNEL NAME: {row['channel_name']}
            - CREATOR NICHE: {row['creator_niche']}
            - CREATOR REGION: {row['creator_region']}
            - SPM NAME: {row['spm_name']}
            - DURATION (MIN): {row['duration_minutes']}

            TRANSCRIPT TEXT:
            {row['raw_transcript']}
            """

            # --- API Call ---
            response = model.generate_content(
                [PROMPT_SIGNAL_EXTRACTOR, user_prompt],
                generation_config=GenerationConfig(
                    temperature=0.0,
                    response_mime_type="application/json"
                )
            )

            # --- Clean up raw text before parsing ---
            raw_text = response.text.strip()

            # Remove Markdown code blocks if the LLM hallucinated them
            if raw_text.startswith("```json"):
                raw_text = raw_text[7:]
            if raw_text.startswith("```"):
                raw_text = raw_text[3:]
            if raw_text.endswith("```"):
                raw_text = raw_text[:-3]

            raw_text = raw_text.strip()

            # Parse the JSON string returned by Gemini
            analysis = json.loads(response.text)
            signals = analysis.get('signals', [])

            # --- NEW LOGIC: Handle cases where no signals were detected ---
            if not signals:
                results.append({
                    "conversation_id": row['conversation_id'],
                    "channel_name": row['channel_name'],
                    "creator_niche": row['creator_niche'],
                    "creator_region": row['creator_region'],
                    "spm_name": row['spm_name'],
                    "duration_minutes": float(row['duration_minutes']) if pd.notnull(row['duration_minutes']) else None,

                    # Keep the SPM evaluation data intact
                    "spm_score": analysis.get('spm_score'),
                    "spm_reasoning": analysis.get('spm_reasoning'),

                    # Explicitly set signal mapping fields to None (NULL in BigQuery)
                    "signal_category": None,
                    "signal_sub_category": None,
                    "signal_name": None,
                    "signal_sentiment": None,
                    "signal_actionability": None,
                    "signal_description": "No organic signals detected matching the strict taxonomy.",
                    "signal_evidence": None,

                    "recommended_action": analysis.get('recommended_next_step')
                })

            # --- ORIGINAL LOGIC: If signals exist, create one row per signal ---
            else:
                for signal in signals:
                    results.append({
                        "conversation_id": row['conversation_id'],
                        "channel_name": row['channel_name'],
                        "creator_niche": row['creator_niche'],
                        "creator_region": row['creator_region'],
                        "spm_name": row['spm_name'],
                        "duration_minutes": float(row['duration_minutes']) if pd.notnull(row['duration_minutes']) else None,

                        "spm_score": analysis.get('spm_score'),
                        "spm_reasoning": analysis.get('spm_reasoning'),

                        # Map the detected signal fields
                        "signal_category": signal.get('signal_category'),
                        "signal_sub_category": signal.get('signal_sub_category'),
                        "signal_name": signal.get('signal_name'),
                        "signal_sentiment": signal.get('signal_sentiment'),
                        "signal_actionability": signal.get('signal_actionability'),
                        "signal_description": signal.get('signal_description'),
                        "signal_evidence": signal.get('signal_evidence'),

                        "recommended_action": analysis.get('recommended_next_step')
                    })

            print(f"  -> Processed {row['conversation_id']} | Signals found: {len(analysis.get('signals', []))}")
            time.sleep(0.5)

        except Exception as e:
            print(f"‚ùå Error analyzing {row['conversation_id']}: {e}")

            # To check the error
            if 'response' in locals():
                print(f"Texto crudo devuelto por Gemini:\n{response.text}\n")

            continue

    return pd.DataFrame(results)

### 5. Upload Results to BigQuery

In [None]:
# ==========================================
# 5. UPLOAD RESULTS TO BIGQUERY
# ==========================================
def initialize_destination_table():
    """
    Creates the destination table using a flat schema optimized for LLM/MCP SQL agents.
    """
    table_ref = f"{PROJECT_ID}.{DATASET_ID}.{DESTINATION_TABLE}"

    schema = [
        # Call Metadata
        bigquery.SchemaField("conversation_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("channel_name", "STRING"),
        bigquery.SchemaField("creator_niche", "STRING"),
        bigquery.SchemaField("creator_region", "STRING"),
        bigquery.SchemaField("spm_name", "STRING"),
        bigquery.SchemaField("duration_minutes", "FLOAT"),

        # SPM Evaluation
        bigquery.SchemaField("spm_score", "INTEGER"),
        bigquery.SchemaField("spm_reasoning", "STRING"),

        # Signal Extraction (Categorical limits for easy SQL GROUP BY)
        bigquery.SchemaField("signal_category", "STRING"),
        bigquery.SchemaField("signal_sub_category", "STRING"),
        bigquery.SchemaField("signal_name", "STRING"),
        bigquery.SchemaField("signal_sentiment", "STRING"),
        bigquery.SchemaField("signal_actionability", "STRING"),
        bigquery.SchemaField("signal_description", "STRING"),
        bigquery.SchemaField("signal_evidence", "STRING"),

        # Next Steps & Audit
        bigquery.SchemaField("recommended_action", "STRING"),
        bigquery.SchemaField("processed_at", "TIMESTAMP")
    ]

    try:
        client.get_table(table_ref)
        print(f"‚úÖ Destination table {DESTINATION_TABLE} exists.")
    except NotFound:
        print(f"‚ö†Ô∏è Destination table not found. Creating {DESTINATION_TABLE}...")
        table = bigquery.Table(table_ref, schema=schema)
        client.create_table(table)
        print("‚úÖ Table created successfully.")

def upload_signals_to_bq(df):
    if df.empty:
        print("‚ö†Ô∏è No signals detected. Nothing to upload.")
        return

    # Add processing timestamp
    df['processed_at'] = pd.Timestamp.now()

    table_ref = f"{PROJECT_ID}.{DATASET_ID}.{DESTINATION_TABLE}"

    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        # Use the exact same schema defined in the init function
        schema=[
            bigquery.SchemaField("conversation_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("channel_name", "STRING"),
            bigquery.SchemaField("creator_niche", "STRING"),
            bigquery.SchemaField("creator_region", "STRING"),
            bigquery.SchemaField("spm_name", "STRING"),
            bigquery.SchemaField("duration_minutes", "FLOAT"),
            bigquery.SchemaField("spm_score", "INTEGER"),
            bigquery.SchemaField("spm_reasoning", "STRING"),
            bigquery.SchemaField("signal_category", "STRING"),
            bigquery.SchemaField("signal_sub_category", "STRING"),
            bigquery.SchemaField("signal_name", "STRING"),
            bigquery.SchemaField("signal_sentiment", "STRING"),
            bigquery.SchemaField("signal_actionability", "STRING"),
            bigquery.SchemaField("signal_description", "STRING"),
            bigquery.SchemaField("signal_evidence", "STRING"),
            bigquery.SchemaField("recommended_action", "STRING"),
            bigquery.SchemaField("processed_at", "TIMESTAMP")
        ]
    )

    print(f"üöÄ Uploading {len(df)} detected signals to {table_ref}...")
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()
    print("‚úÖ Upload Complete!")

## Run Main

In [None]:
if __name__ == "__main__":
    start_time = time.perf_counter()

    # 1. Initialize Destination Table (Crucial Step)
    initialize_destination_table()

    # 2. Read transcriptions (Cleaned to avoid Data Leakage)
    df_transcripts = get_transcripts_from_bq(limit=150)
    end_time_read = time.perf_counter()

    # 3. Analyze with LLM
    df_signals = analyze_transcripts(df_transcripts)
    end_time_llm = time.perf_counter()

    # 4. Save Results
    upload_signals_to_bq(df_signals)
    end_time = time.perf_counter()

    # Execution times
    duration_read = str(timedelta(seconds=end_time_read - start_time))
    duration_llm = str(timedelta(seconds=end_time_llm - end_time_read))
    duration_save = str(timedelta(seconds=end_time - end_time_llm))
    duration_total = str(timedelta(seconds=end_time - start_time))

    print(f"Read BigQuery execution time (HH:MM:SS): {duration_read}")
    print(f"LLM Analysis execution time (HH:MM:SS): {duration_llm}")
    print(f"Save signals to BigQuery execution time (HH:MM:SS): {duration_save}")
    print(f"Total execution time (HH:MM:SS): {duration_total}")

    # Preview
    print("\n--- PREVIEW OF DETECTED SIGNALS ---")
    try:
      print(df_signals[['spm_name', 'signal_name', 'signal_sentiment', 'spm_score']].head().to_markdown(index=False))
    except:
      print(df_signals.head().to_markdown(index=False))

In average it takes:

~2 to 4 seconds to extract the data from bigquery

~43.5 seconds to extract signals from a transcription

~3.5 seconds to upload extracted signals to bigquery