In [24]:
# Step 1 – Environment setup: load libraries, configure switches, grab API keys.
import asyncio
import csv
import json
import os
import time
from pathlib import Path
from threading import Thread
from typing import Dict, List, Optional, Tuple
import pandas as pd
from dotenv import find_dotenv, load_dotenv
from groq import AsyncGroq

# Optional production-only dependencies. Sample mode never touches them, so when they
# are not installed every name is bound to None and later cells check for that.
try:
    from database import GCS_SQL_CONNECTION, connect_with_connector, database  # type: ignore
    from ds.data_loading_utils import convert_to_json, optimize_json, query_user_data  # type: ignore
except ImportError:
    GCS_SQL_CONNECTION = connect_with_connector = database = None
    convert_to_json = optimize_json = query_user_data = None

# Run configuration. Start with the bundled sample CSV; switch DATA_SOURCE to "actual"
# only once the database helpers above import successfully.
DATA_SOURCE = "sample"
SAMPLE_DATA_PATH = "user_features.csv"  # demo dataset bundled with the repo
OUTPUT_CSV_PATH = str(Path("personalized_messages.csv"))  # generated messages are appended here on every run

if DATA_SOURCE != "sample" and DATA_SOURCE != "actual":
    raise ValueError("DATA_SOURCE must be either 'sample' or 'actual'.")

# Pull secrets from a .env file when one is found (no-op if the file does not exist).
_ = load_dotenv(find_dotenv())

GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
GROQ_MODEL_VERSION = "llama-3.3-70b-versatile"
INTENT_TO_CONNECT = "networking purposes"

# Label written to the "version" column of every saved CSV row.
version = 'baseline'

In [25]:
# Step 2 – Sample data helpers (used when DATA_SOURCE == "sample").
def load_sample_profiles(
    path: Optional[str] = None,
    num_targets: int = 5,
) -> Tuple[int, List[int], Dict[int, Dict[str, object]]]:
    """Load demo rows from the CSV and split them into requester + targets.

    The first row is the requesting user and the next ``num_targets`` rows are the
    people to write messages for. Missing values are replaced with empty strings.

    Args:
        path: CSV file containing at least a ``user_id`` column. ``None`` (default)
            means the notebook-level ``SAMPLE_DATA_PATH``, looked up at call time so
            editing the config cell takes effect without re-running this cell.
        num_targets: Number of rows after the first to treat as targets. The default
            of 5 reproduces the original six-row split.

    Returns:
        ``(primary_user_id, target_ids, profile_map)`` where ``profile_map`` maps every
        selected user id (requester and targets) to its CSV row as a dict.

    Raises:
        ValueError: ``num_targets`` < 1, the ``user_id`` column is missing, the file
            has too few rows, or the selected rows repeat a user id (which would make
            one profile silently overwrite another in ``profile_map``).
    """
    if num_targets < 1:
        raise ValueError("num_targets must be at least 1.")
    required_rows = num_targets + 1
    csv_path = path if path is not None else SAMPLE_DATA_PATH

    df = pd.read_csv(csv_path).fillna("")
    if "user_id" not in df.columns:
        raise ValueError(f"Sample dataset {csv_path!r} has no 'user_id' column.")
    if len(df) < required_rows:
        raise ValueError(f"Sample dataset must contain at least {required_rows} rows.")

    # Work on a copy of just the rows we use, so unrelated rows cannot break the cast.
    selected = df.iloc[:required_rows].copy()
    selected["user_id"] = selected["user_id"].astype(int)

    duplicate_mask = selected["user_id"].duplicated(keep=False)
    if duplicate_mask.any():
        dupes = sorted(set(selected.loc[duplicate_mask, "user_id"].tolist()))
        raise ValueError(
            f"Sample dataset has duplicate user_id values among the first {required_rows} rows: {dupes}."
        )

    primary_user_id = int(selected["user_id"].iloc[0])
    target_ids = [int(uid) for uid in selected["user_id"].iloc[1:].tolist()]
    profile_map: Dict[int, Dict[str, object]] = {
        int(row["user_id"]): row.to_dict() for _, row in selected.iterrows()
    }
    return primary_user_id, target_ids, profile_map


def prepare_sample_context() -> Tuple[int, str, List[int], Dict[int, Dict[str, object]]]:
    """Build the ``(ru_id, user_id, targets, profiles)`` tuple used in sample mode.

    Each profile is tagged with a ``USER_ID`` entry equal to its integer key, the same
    field the actual-data path adds, and the requester id is also returned as a string.
    """
    requester_id, target_ids, profile_by_id = load_sample_profiles()
    for uid in profile_by_id:
        profile_by_id[uid]["USER_ID"] = uid
    return requester_id, str(requester_id), target_ids, profile_by_id


In [26]:
# Step 3 – Prompt templates and Groq helpers.
# System prompt used when the requester's own profile is available. The user message
# is a JSON object with "my_profile" and "target_profile" keys (built in
# create_networking_message_groq). `{intent_to_connect}` is filled in via str.format,
# so any literal brace added to this text must be doubled ("{{" / "}}").
# NOTE: the exact wording is sent to the model verbatim; edits change generated output.
PROMPT_NETWORKING = r"""
You are a networking expert who can connect with anyone.
Please compose a concise and compelling introductory message for me to reach out to a person on LinkedIn for {intent_to_connect}.
Highlight our shared educational background, professional experiences, and mutual research interests to increase the likelihood of receiving a positive response.
Only mention commonalities that are explicitly present in both profiles.
If there is no clear shared background in education, work experience, or research interests, do not fabricate or assume any connections.
Keep the message concise and to the point.

User would give you two profiles: my_profile and target_profile.
my_profile is the user's profile. target_profile is the profile for the user they want to connect.

Please write a refined message that effectively communicates our shared background and interests to establish a meaningful connection for potential networking opportunities.
Keep the message concise and simple, utilizing the best networking tips available in the world.
Only mention commonalities that are explicitly present in both profiles.
If there is no clear shared background, do not fabricate or assume any connections.
Also, include a signoff note. Warm up the tone to make a more personal connection with the reader.
Only return the messages and nothing else. Directly starts with message contents.
"""

# System prompt used when no requester profile is available (no_ruid=True): only a
# "target_profile" key is sent, so the model is asked to highlight the target's own
# major background instead of shared commonalities. Same str.format caveat as above.
PROMPT_NETWORKING_NORUID = r"""
You are a networking expert who can connect with anyone.
Please compose a concise and compelling introductory message for me to reach out to a person on LinkedIn for {intent_to_connect}.
Highlight ONLY the MAJOR educational background, professional experiences, OR research interests to increase the likelihood of receiving a positive response.
Only mention things that are explicitly present in the profile.
If there is no clear background in education, work experience, or research interests, do not fabricate or assume anything.
Keep the message concise and to the point.

User would give you one profile: target_profile.
target_profile is the profile for the user they want to connect.

Please write a refined message that effectively communicates major background and interests to establish a meaningful connection for potential networking opportunities.
Keep the message concise and simple, utilizing the best networking tips available in the world.
Only mention things that are explicitly present in the profile.
If there is no clear background in education, work experience, or research interests, do not fabricate or assume anything.
Also, include a signoff note. Warm up the tone to make a more personal connection with the reader.
Only return the messages and nothing else. Directly starts with message contents.
"""


def get_groq_client(api_key: Optional[str] = None) -> AsyncGroq:
    """Create an ``AsyncGroq`` client, failing fast when no API key is available.

    Args:
        api_key: Explicit key to use. When omitted, falls back to ``GROQ_API_KEY``
            captured in the setup cell and then to the *current* ``GROQ_API_KEY``
            environment variable, so exporting the key after the setup cell already
            ran still works without re-running it.

    Raises:
        EnvironmentError: no key was found in any of those places.
    """
    key = api_key or GROQ_API_KEY or os.environ.get("GROQ_API_KEY")
    if not key:
        raise EnvironmentError("GROQ_API_KEY is not set. Please configure your environment.")
    return AsyncGroq(api_key=key)


def build_prompt(no_ruid: bool, intent_to_connect: str, payload: Dict[str, object]) -> List[Dict[str, str]]:
    """Return the two-message chat history: system instructions, then the JSON payload.

    ``no_ruid`` selects the single-profile template; otherwise the two-profile
    template is used. The payload is serialized with ``ensure_ascii=False`` so
    non-ASCII text reaches the model unescaped.
    """
    template = PROMPT_NETWORKING_NORUID if no_ruid else PROMPT_NETWORKING
    system_message = {"role": "system", "content": template.format(intent_to_connect=intent_to_connect)}
    user_message = {"role": "user", "content": json.dumps(payload, ensure_ascii=False)}
    return [system_message, user_message]


async def request_groq_completion(client: AsyncGroq, messages: List[Dict[str, str]]) -> str:
    """Send one chat-completion request and return the reply text, stripped.

    ``temperature=0`` and ``seed=0`` are requested so reruns are as repeatable as the
    service allows. Returns ``""`` when the response has no choices or the first
    choice carries no text (the SDK types ``message.content`` as optional). Previously
    either case crashed on indexing or on ``None.strip()``; callers already treat
    ``""`` as "no message generated".
    """
    response = await client.chat.completions.create(
        messages=messages,
        model=GROQ_MODEL_VERSION,
        temperature=0,
        seed=0,
    )
    if not response.choices:
        return ""
    content = response.choices[0].message.content
    return content.strip() if content else ""


async def create_networking_message_groq(
    client: AsyncGroq,
    user_profile: Optional[Dict[str, object]],
    target_profile: Dict[str, object],
    no_ruid: bool,
    intent_to_connect: str = INTENT_TO_CONNECT,
) -> str:
    """Ask Groq for one personalized networking message aimed at ``target_profile``.

    With ``no_ruid`` set only the target profile is sent; otherwise the requester's
    ``user_profile`` is included first under ``my_profile`` (key order is kept, since
    it determines the serialized JSON the model sees).
    """
    payload: Dict[str, object] = {} if no_ruid else {"my_profile": user_profile}
    payload["target_profile"] = target_profile
    chat = build_prompt(no_ruid=no_ruid, intent_to_connect=intent_to_connect, payload=payload)
    return await request_groq_completion(client, chat)


In [27]:
# Step 4 – Convenience helper to store every generated message inside a CSV file.
def save_messages_to_csv(
    request_id: int,
    ru_id: int,
    user_id: str,
    messages: Dict[int, str],
    intent_to_connect: str,
    output_path: Optional[str] = None,
    version_label: Optional[str] = None,
) -> None:
    """Append one CSV row per generated message so non-technical users can read them.

    Args:
        request_id: Request identifier copied into every row.
        ru_id: Requesting user's id copied into every row.
        user_id: Requesting user's id (string form) copied into every row.
        messages: Mapping of target user id -> generated message ("" when none).
        intent_to_connect: Intent phrase copied into every row.
        output_path: Destination CSV. ``None`` (default) means the notebook-level
            ``OUTPUT_CSV_PATH``, looked up at call time so editing the config cell
            takes effect without re-running this cell.
        version_label: Value for the ``version`` column. ``None`` means the
            notebook-level ``version`` variable.

    The header row is written when the file is missing *or empty* (e.g. created with
    ``touch``), so the file always starts with column names. Appending to an existing
    file whose header differs from these columns is not detected.
    """
    fieldnames = [
        "request_id",
        "ru_id",
        "user_id",
        "target_user_id",
        "intent_to_connect",
        "message",
        "version",
    ]
    destination = Path(output_path if output_path is not None else OUTPUT_CSV_PATH)
    run_version = version_label if version_label is not None else version

    rows = [
        {
            "request_id": request_id,
            "ru_id": ru_id,
            "user_id": user_id,
            "target_user_id": target_id,
            "intent_to_connect": intent_to_connect,
            "message": message,
            "version": run_version,
        }
        for target_id, message in messages.items()
    ]

    needs_header = not destination.exists() or destination.stat().st_size == 0
    # newline="" lets the csv module control line endings (messages may contain "\n").
    with destination.open(mode="a", encoding="utf-8", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(rows)


In [28]:
# Step 5 – The main orchestration function used by both sample and real data flows.
async def _load_actual_user_data(
    user_id: Optional[str],
    target_user_list: Optional[List[int]],
    conn,
    userPositions,
    userEducations,
    userData,
) -> Tuple[Dict, bool, float]:
    """Query, convert and tag profiles via the production helpers (actual mode only).

    Returns ``(user_dict, no_ruid, query_time_seconds)``; the timing covers only the
    ``query_user_data`` call.
    """
    if query_user_data is None or convert_to_json is None or optimize_json is None:
        raise ImportError(
            "Actual data mode requires database and data loading helpers to be installed."
        )
    if not target_user_list:
        raise ValueError("target_user_list must be provided when DATA_SOURCE='actual'.")
    if user_id is None:
        raise ValueError("user_id must be provided when DATA_SOURCE='actual'.")

    query_time_start = time.perf_counter()
    try:
        conn_internal = conn if conn is not None else GCS_SQL_CONNECTION
        all_users = target_user_list + [user_id]
        user_data_result = await query_user_data(
            conn_internal,
            all_users,
            userData=userData,
            userPositions=userPositions,
            userEducations=userEducations,
        )
        user_data, positions_data, education_data, no_ruid = user_data_result
    except Exception as exc:
        raise RuntimeError(f"Failed to query user data: {exc}") from exc
    query_time = time.perf_counter() - query_time_start

    user_dict = optimize_json(convert_to_json(user_data, positions_data, education_data))
    for key in user_dict.keys():
        user_dict[key]["USER_ID"] = key
    return user_dict, no_ruid, query_time


def _resolve_base_profile(user_dict: Dict, ru_id: Optional[int], user_id: Optional[str]):
    """Return the requester's profile by ``ru_id``, else by ``int(user_id)``; ``None`` if absent."""
    base_profile = user_dict.get(ru_id) if ru_id is not None else None
    if base_profile is None and user_id is not None:
        try:
            base_profile = user_dict.get(int(user_id))
        except (TypeError, ValueError):
            base_profile = None
    return base_profile


async def get_personalized_message(
    request_id: int,
    ru_id: Optional[int],
    user_id: Optional[str],
    target_user_list: Optional[List[int]],
    conn=None,
    intent_to_connect: str = INTENT_TO_CONNECT,
    userPositions=None,
    userEducations=None,
    userData=None,
    data_source: str = DATA_SOURCE,
) -> Dict[str, object]:
    """Collect profile data, generate one Groq message per target, and append them to the CSV.

    Args:
        request_id: Identifier written to every CSV row and echoed in the result.
        ru_id: Requesting user's id. Overwritten from the CSV in sample mode.
        user_id: Requesting user's id as a string; also used as a fallback lookup key
            for the requester's profile. Overwritten in sample mode.
        target_user_list: Ids to generate messages for. Overwritten in sample mode;
            required in actual mode.
        conn: Optional DB connection; ``GCS_SQL_CONNECTION`` is used when ``None``.
        intent_to_connect: Phrase substituted into the system prompt.
        userPositions, userEducations, userData: Forwarded unchanged to ``query_user_data``.
        data_source: ``"sample"`` (bundled CSV) or ``"actual"`` (database helpers).

    Returns:
        Dict with ``Request_ID``, ``Messages`` (target id -> text, "" on failure),
        ``Status`` ("SUCCESS" when at least one message is non-empty, else "FAIL"),
        ``Query_Time`` and ``Message_Generation_Time`` (seconds), and
        ``Empty_Message_Count``.

    Raises:
        ValueError: invalid ``data_source`` or missing actual-mode arguments.
        ImportError: actual mode without the optional data helpers installed.
        RuntimeError: the user-data query failed.
        EnvironmentError: no Groq API key is configured.

    Per-target generation errors are printed and recorded as "" rather than raised.
    """
    if data_source not in {"sample", "actual"}:
        raise ValueError("data_source must be either 'sample' or 'actual'.")

    if data_source == "sample":
        ru_id, user_id, target_user_list, user_dict = prepare_sample_context()
        no_ruid = False
        query_time = 0.0
    else:
        user_dict, no_ruid, query_time = await _load_actual_user_data(
            user_id=user_id,
            target_user_list=target_user_list,
            conn=conn,
            userPositions=userPositions,
            userEducations=userEducations,
            userData=userData,
        )

    client = get_groq_client()
    # The requester's profile does not depend on the target, so resolve it once.
    base_profile = _resolve_base_profile(user_dict, ru_id, user_id)
    use_no_ruid = no_ruid or base_profile is None

    messages: Dict[int, str] = {}
    message_gen_time_start = time.perf_counter()
    try:
        for user in target_user_list:
            try:
                target_profile = user_dict.get(user)
                if not target_profile:
                    messages[user] = ""
                    continue
                messages[user] = await create_networking_message_groq(
                    client=client,
                    user_profile=base_profile,
                    target_profile=target_profile,
                    no_ruid=use_no_ruid,
                    intent_to_connect=intent_to_connect,
                )
            except Exception as exc:
                # Best effort: one failed target must not abort the whole batch.
                messages[user] = ""
                print(f"Error generating message for {user}: {exc}")
    finally:
        # A fresh client is created per call; release its HTTP connection pool.
        await client.close()
    message_gen_time = time.perf_counter() - message_gen_time_start

    save_messages_to_csv(
        request_id=request_id,
        ru_id=ru_id if ru_id is not None else -1,
        user_id=user_id if user_id is not None else "sample-user",
        messages=messages,
        intent_to_connect=intent_to_connect,
    )

    empty_message_count = sum(1 for msg in messages.values() if not msg)
    status = "SUCCESS" if empty_message_count < len(target_user_list) else "FAIL"

    return {
        "Request_ID": request_id,
        "Messages": messages,
        "Status": status,
        "Query_Time": query_time,
        "Message_Generation_Time": message_gen_time,
        "Empty_Message_Count": empty_message_count,
    }


In [29]:
# Step 6 – Quick smoke test for learners. Each run calls the Groq API and appends
# new rows to OUTPUT_CSV_PATH, so reruns accumulate duplicate-looking rows.
def _run_coroutine_blocking(make_coroutine):
    """Run a coroutine to completion from synchronous code, also inside Jupyter.

    Jupyter already runs an event loop in the main thread, where ``asyncio.run``
    refuses to start; in that case the coroutine runs on a fresh loop in a worker
    thread. ``make_coroutine`` is a zero-argument callable so the coroutine object is
    only created on the loop that awaits it (no "coroutine was never awaited"
    warning). The running-loop check happens *before* starting any work, so an
    exception raised by the pipeline itself is never mistaken for "loop already
    running" and never triggers a second run. Exceptions from the coroutine are
    re-raised in the caller instead of surfacing as a missing-result KeyError.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop in this thread (plain script / console): run directly.
        return asyncio.run(make_coroutine())

    result_box: Dict[str, object] = {}

    def _worker() -> None:
        try:
            result_box["value"] = asyncio.run(make_coroutine())
        except BaseException as exc:  # handed back to the calling thread below
            result_box["error"] = exc

    worker = Thread(target=_worker, daemon=True)
    worker.start()
    worker.join()
    if "error" in result_box:
        raise result_box["error"]  # type: ignore[misc]
    return result_box["value"]


if DATA_SOURCE == "sample":
    async def _run_sample_preview():
        return await get_personalized_message(
            request_id=123,
            ru_id=None,
            user_id=None,
            target_user_list=None,
            intent_to_connect=INTENT_TO_CONNECT,
            data_source=DATA_SOURCE,
        )

    try:
        sample_result = _run_coroutine_blocking(_run_sample_preview)
        print(json.dumps(sample_result, indent=2, ensure_ascii=False))
        print(f"Messages saved to {OUTPUT_CSV_PATH}.")
    except Exception as exc:
        print(f"Sample run failed: {exc}")
else:
    print(
        "Switch DATA_SOURCE to 'actual' and rerun the notebook once the live data"
        " dependencies are available."
    )


{
  "Request_ID": 123,
  "Messages": {
    "56419714": "Hi, I came across your profile and noticed we share a similar background, having both studied at New York University and currently working in the Computer Software industry in New York City. I'd love to connect and explore potential networking opportunities. \n\nBest regards,",
    "136951372": "Hi, I came across your profile and noticed we share a common background in Computer Science, having both studied at New York University. We also have a similar industry focus, with both of us working in Computer Software. I'd love to connect and explore potential networking opportunities. Looking forward to hearing from you.\nBest regards,",
    "140902986": "Hi, I came across your profile and noticed we share a connection to New York University and are both based in New York City, which is really cool. We also both work in the Computer Software industry, and I think our experiences could lead to some interesting conversations. I'd love to

  sample_result = result_box["value"]
