In [None]:
#install
!pip install google-generativeai==0.7.2
!pip install langchain-community
!pip install crewai
!pip install requests
!pip install beautifulsoup4
!pip install pandas

Collecting langchain-community
  Downloading langchain_community-0.3.27-py3-none-any.whl.metadata (2.9 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.10.1-py3-none-any.whl.metadata (3.4 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.1-py3-none-any.whl.metadata (9.4 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading typing_inspect-0.9.0-py3-none-any.whl.metadata (1.5 kB)
Collecting python-dotenv>=0.21.0 (from pydantic-settings<3.0.0,>=2.4.0->langchain-community)
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 k



## Working Code

In [None]:
import os
import json
import re
import pandas as pd
from langchain_community.utilities import GoogleSerperAPIWrapper
import google.generativeai as genai

import time
import requests

def safe_enrich(func, *args, retries=3, delay=2, **kwargs):
    for attempt in range(retries):
        try:
            return func(*args, **kwargs)
        except (requests.ConnectionError, requests.exceptions.RequestException) as e:
            print(f"Connection error: {e}. Retrying in {delay} seconds...")
            time.sleep(delay)
    print("Failed after retries.")
    return {}

# --- SETUP ---
os.environ["GOOGLE_API_KEY"] = os.getenv("GOOGLE_API_KEY", "YOUR-KEY")
os.environ["SERPER_API_KEY"] = os.getenv("SERPER_API_KEY", "YOUR-KEY")
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
gemini_model = genai.GenerativeModel('gemini-2.5-flash')
serper = GoogleSerperAPIWrapper()
search = GoogleSerperAPIWrapper()

def extract_json(text):
    cleaned = re.sub(r"^```json\s*|```$", "", text.strip(), flags=re.MULTILINE)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Cleaned text:\n{cleaned}")
        return {}

def format_name(name):
    name = re.sub(r'\s+', ' ', str(name).replace('\n', ' ')).strip().strip('"').strip("'")
    corp_terms = ['llc', 'inc', 'corp', 'ltd', 'company']
    if any(term in name.lower() for term in corp_terms):
        return name
    if ',' in name and name.count(',') == 1:
        last, first = name.split(',', 1)
        name = f"{first.strip()} {last.strip()}"
    return name

def clean_dataframe(df):
    df = df.copy()
    df.loc[:, 'Franchisee'] = df['Franchisee'].apply(format_name)
    df.loc[:, 'State'] = df['City'].str.strip() + ', ' + df['State'].str.strip()
    if 'FDD' in df.columns:
        df.loc[:, 'Franchise Name'] = df['FDD']
    return df.dropna(axis=1, how='all')

def search_web(query: str):
    results = search.results(query)

    # if isinstance(results, dict) and results.get('organic'):
    #     first_result = results['organic'][0]
    #     snippet = first_result.get('snippet', '')
    #     url = first_result.get('link', '')
    return results
    # return "", ""

def classify_franchisees(names):
    prompt = f"""Classify the following names as 'Individual' or 'Corporate':
{names}

Rules:
- Individual = Person name (John Doe)
- Corporate = Includes LLC, Inc, Ltd, Corp, Company, etc.

Return only valid JSON, in this format:
[{{"name": "Name", "type": "Individual/Corporate"}}]
No explanation. No markdown. No text before or after. If you cannot classify, return [].
"""
    response = gemini_model.generate_content(prompt)
    text = response.text.strip()
    #print("Gemini classification raw output:", repr(text))  # Debug print
    data = extract_json(text)
    if not data:
        print("Gemini returned empty or invalid output. Falling back to empty DataFrame.")
        return pd.DataFrame(columns=["name", "type"])
    return pd.DataFrame(data)

def enrich_individual(name, franchise, state):
    company_query = f"What company does {name} own that holds a {franchise} franchise in {state}?"
    snippet = search_web(company_query)
    #print(snippet)
    legal_name_prompt = f"From the following text, return the company name owned by {name} that is associated with {franchise}. If not found, return just '{name}'. No explanation.\n\n{snippet}"
    legal_name = gemini_model.generate_content(legal_name_prompt).text.strip()

    details_query = f"Details of {legal_name} in {state}, include corporate address, phone, and email"
    snippet2 = search_web(details_query)

    extract_prompt = f"""From the {snippet2}, extract corporate details along with Source URLs used for enrichment. return JSON:

{{
  "legal_corporate_name": "{legal_name}",
  "corporate_address": "",
  "corporate_phone": "",
  "corporate_email": "",
  "owner_name": "{name}",
  "linkedin_url": "",
  "Source URLs used for enrichment": ""
}}
No backtics(```),No markdown. Return only JSON"""
    response = gemini_model.generate_content(extract_prompt)
    text = response.text.strip()
    result = extract_json(text)

    return result

def enrich_corporate(name, state):
    query = f"Who owns or manages {name} in {state}?"
    snippet = search_web(query)
    extract_prompt = f"""From the following, extract business info along with Source URLs used for enrichment. return JSON:
{snippet}

{{
  "legal_corporate_name": "{name}",
  "corporate_address": "",
  "corporate_phone": "",
  "corporate_email": "",
  "owner_name": "",
  "linkedin_url": "",
  "Source URLs used for enrichment": ""
}}.
No backtics and markdown."""
    response = gemini_model.generate_content(extract_prompt)
    text = response.text.strip()
    print(f"Raw Gemini response:\n{text}")
    result = extract_json(text)
    # Always set the source_urls field to the actual URL

    return result

def enrich_all(df):
    df = clean_dataframe(df)
    names = df['Franchisee'].tolist()
    classifications_df = classify_franchisees(names)
    enriched_rows = []
    for _, row in df.iterrows():
        name = row['Franchisee']
        entity = classifications_df[classifications_df['name'].str.lower() == name.lower()]
        entity_type = entity['type'].values[0] if not entity.empty else "Unknown"
        state = row['State']
        franchise = row.get('Franchise Name', '')

        if entity_type == "Individual":
            enriched = safe_enrich(enrich_individual, name, franchise, state)
        elif entity_type == "Corporate":
            enriched = safe_enrich(enrich_corporate, name, state)
        else:
            enriched = {
                "legal_corporate_name": name,
                "corporate_address": "N/A",
                "corporate_phone": "N/A",
                "corporate_email": "N/A",
                "owner_name": "N/A",
                "linkedin_url": "N/A",
                "Source URLs used for enrichment": "N/A"
            }

        if isinstance(enriched, list):
            if len(enriched) > 0 and isinstance(enriched[0], dict):
                enriched = enriched[0]
            else:
                enriched = {}
        elif not isinstance(enriched, dict):
            enriched = {}

        default_keys = ["legal_corporate_name", "corporate_address", "corporate_phone", "corporate_email", "owner_name", "linkedin_url", "Source URLs used for enrichment"]
        for key in default_keys:
            if key not in enriched:
                enriched[key] = "N/A"

        enriched_rows.append({**row.to_dict(), "Type": entity_type, **enriched})

    return pd.DataFrame(enriched_rows)

def batch_iterable(iterable, batch_size):
    l = len(iterable)
    for ndx in range(0, l, batch_size):
        yield iterable[ndx:min(ndx + batch_size, l)]

if __name__ == "__main__":
    df = pd.read_excel("Golden Chick_DE_Takehome.xlsx")
    print("Original Sample:")
    print(df[['Franchisee', 'City', 'State']].head())
    enriched_rows = []
    for batch in batch_iterable(df.index.tolist(), 10):
        batch_df = df.loc[batch]
        enriched_batch = enrich_all(batch_df)
        enriched_rows.append(enriched_batch)
    enriched_df = pd.concat(enriched_rows, ignore_index=True)
    enriched_df.to_excel("enriched_franchisees_gemini.xlsx", index=False)
    print("\nSaved enriched data to enriched_franchisees_gemini.xlsx")
    print(enriched_df[['Franchisee', 'Type', 'owner_name', 'legal_corporate_name', 'corporate_address', 'Source URLs used for enrichment']].head())


Original Sample:
                      Franchisee      City State
0                    Hagan, John      Waco    TX
1                    Hagan, John     Brady    TX
2        Adam Fried Chicken, LLC     Plano    TX
3               Kallos 153, Inc.  McKinney    TX
4  AIG-A Foods Enterprises,\nLLC     Acton    TX
Raw Gemini response:
{
  "legal_corporate_name": "Adam Fried Chicken, LLC",
  "corporate_address": "1718 14th St, Plano, TX 75074, US",
  "corporate_phone": "",
  "corporate_email": "",
  "owner_name": "",
  "linkedin_url": "",
  "Source URLs used for enrichment": "https://www.mapquest.com/us/texas/adam-fried-chicken-420442742"
}
Raw Gemini response:
{
  "legal_corporate_name": "Kallos 153, Inc.",
  "corporate_address": "",
  "corporate_phone": "",
  "corporate_email": "",
  "owner_name": "",
  "linkedin_url": "",
  "Source URLs used for enrichment": ""
}
Raw Gemini response:
{
  "legal_corporate_name": "AIG FOODS ENTERPRISES LLC",
  "corporate_address": "Glen Rose, TX",
  "corpor

In [11]:
# Fill NAs if NA in  /content/enriched_franchisees_gemini.xlsx
import pandas as pd

# Load the Excel file into a DataFrame
df = pd.read_excel('/content/enriched_franchisees_gemini.xlsx')
#fill NA
df.fillna('N/A', inplace=True)
# Save the DataFrame back to the same Excel file
df.to_excel('/content/enriched_franchisees_gemini.xlsx', index=False)

  df.fillna('N/A', inplace=True)


## NOT WORKING - TRIED WITH DASK PARELLISM


In [None]:
import os
import re
import json
import pandas as pd
import time
from dask import delayed, compute
from langchain_community.utilities import GoogleSerperAPIWrapper
import google.generativeai as genai

# Setup
os.environ["GOOGLE_API_KEY"] = os.getenv("GOOGLE_API_KEY", "AIzaSyB_zRgfdmObMachU5qkrNM-dAoG9Ae4Qgk")
os.environ["SERPER_API_KEY"] = os.getenv("SERPER_API_KEY", "4d17b38723a34498589c3edd102b7b4b175b65f3")
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

search = GoogleSerperAPIWrapper()
gemini_model = genai.GenerativeModel('gemini-1.5-flash')

# Utils
def extract_json(text):
    cleaned = re.sub(r"^```json\s*|```$", "", text.strip(), flags=re.MULTILINE)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return {}

def format_name(name):
    name = re.sub(r'\s+', ' ', str(name).replace('\n', ' ')).strip().strip('"').strip("'")
    corp_terms = ['llc', 'inc', 'corp', 'ltd', 'company']
    if any(term in name.lower() for term in corp_terms):
        return name
    if ',' in name and name.count(',') == 1:
        last, first = name.split(',', 1)
        name = f"{first.strip()} {last.strip()}"
    return name

def clean_dataframe(df):
    df = df.copy()
    df['Franchisee'] = df['Franchisee'].apply(format_name)
    df['State'] = df['City'].str.strip() + ', ' + df['State'].str.strip()
    if 'FDD' in df.columns:
        df['Franchise Name'] = df['FDD']
    return df.dropna(axis=1, how='all')

def classify_franchisees(names):
    prompt = f"""Classify the following names as 'Individual' or 'Corporate':\n{names}\n\nRules:\n- Individual = Person name (John Doe)\n- Corporate = Includes LLC, Inc, Ltd, Corp, Company, etc.\n\nReturn only valid JSON, in this format:\n[{{"name": "Name", "type": "Individual/Corporate"}}]\nNo explanation. No markdown. If unknown, return []."""
    response = gemini_model.generate_content(prompt)
    return pd.DataFrame(extract_json(response.text.strip()))

def search_web(query: str):
    return search.results(query)

def safe_enrich(func, *args, retries=5, delay=5, **kwargs):
    for attempt in range(retries):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if "429" in str(e) or "TooManyRequests" in str(e):
                print(f"[429] Rate limit hit. Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2
            else:
                print(f"[ERROR] {e}. Retrying in {delay} seconds...")
                time.sleep(delay)
    return {}

def enrich_individual(name, franchise, state):
    query1 = f"What company does {name} own that holds a {franchise} franchise in {state}?"
    snippet = search_web(query1)
    prompt1 = f"From this text, return company name owned by {name} for {franchise}. If not found, return '{name}'.\n{snippet}"
    legal_name = gemini_model.generate_content(prompt1).text.strip()

    query2 = f"Details of {legal_name} in {state}, include address, phone, email."
    snippet2 = search_web(query2)
    extract_prompt = f"""From this data, extract:
{{
  "legal_corporate_name": "{legal_name}",
  "corporate_address": "",
  "corporate_phone": "",
  "corporate_email": "",
  "owner_name": "{name}",
  "linkedin_url": "",
  "Source URLs used for enrichment": ""
}}
Return JSON only. No markdown or text.
{snippet2}"""
    result = gemini_model.generate_content(extract_prompt).text.strip()
    return extract_json(result)

def enrich_corporate(name, state):
    query = f"Who owns or manages {name} in {state}?"
    snippet = search_web(query)
    extract_prompt = f"""From this data, extract:
{{
  "legal_corporate_name": "{name}",
  "corporate_address": "",
  "corporate_phone": "",
  "corporate_email": "",
  "owner_name": "",
  "linkedin_url": "",
  "Source URLs used for enrichment": ""
}}
Return JSON only. No markdown or text.
{snippet}"""
    result = gemini_model.generate_content(extract_prompt).text.strip()
    return extract_json(result)

@delayed
def enrich_row(row_dict):
    name = row_dict["Franchisee"]
    state = row_dict["State"]
    franchise = row_dict.get("Franchise Name", "")

    try:
        entity_type_df = classify_franchisees([name])
        entity_type = entity_type_df['type'].values[0] if not entity_type_df.empty else "Unknown"

        if entity_type == "Individual":
            enriched = safe_enrich(enrich_individual, name, franchise, state)
        elif entity_type == "Corporate":
            enriched = safe_enrich(enrich_corporate, name, state)
        else:
            enriched = {}
    except Exception:
        enriched = {}

    defaults = {
        "legal_corporate_name": name,
        "corporate_address": "N/A",
        "corporate_phone": "N/A",
        "corporate_email": "N/A",
        "owner_name": name,
        "linkedin_url": "N/A",
        "Source URLs used for enrichment": "N/A",
        "Type": entity_type if 'entity_type' in locals() else "Unknown"
    }
    enriched = {**defaults, **enriched}
    return {**row_dict, **enriched}

def run_dask_enrichment(df):
    df = clean_dataframe(df)
    tasks = [enrich_row(row._asdict()) for row in df.itertuples(index=False)]
    results = compute(*tasks, scheduler='threads', num_workers=3)
    return pd.DataFrame(results)

if __name__ == "__main__":
    input_df = pd.read_excel("/content/Gra_1_50.xlsx")
    enriched_df = run_dask_enrichment(input_df)
    enriched_df.to_excel("enriched_franchisees_dask.xlsx", index=False)
    print("Saved to enriched_franchisees_dask.xlsx")

Saved to enriched_franchisees_dask.xlsx
