*This notebook is optional. It tests the creation of a variable that indicates whether a given cluster is residential. The approach turned out to be too expensive and to add very little value.*

# Filter dataset (top 5 clusters by total pings for each device and quarter)

In [None]:
import pandas as pd
# Load the per-cluster quarterly metrics.
cluster_quarterly_metrics = pd.read_csv("../datasets/cluster_quarterly_metrics.csv")

# Order clusters inside each (caid, quarter) by descending ping count, then
# keep the first five rows of every group.
ranked_metrics = cluster_quarterly_metrics.sort_values(
    by=['caid', 'quarter', 'total_pings'],
    ascending=[True, True, False],
)
filtered_data = ranked_metrics.groupby(['caid', 'quarter']).head(5).copy()

# Cell output: (rows, columns) of the reduced frame.
filtered_data.shape



# Get addresses from Google's Geocoding API

In [6]:
"""
For each cluster, we have the latitude and longitude. However, we don't have the address name (e.g. 454 Ruthven Avenue). We get this info thanks to Google's geocoding API.
"""

import requests
from tqdm import tqdm
import os
import time
import random
from dotenv import load_dotenv

# Load API key
# load_dotenv() runs first so that os.getenv can also see the variables it
# loads (presumably from a local .env file -- confirm for this project setup).
# os.getenv returns None when GOOGLE_MAPS_API_KEY is not defined.
load_dotenv()
api_key = os.getenv('GOOGLE_MAPS_API_KEY')

# Initialize session for faster requests
# A single requests.Session object, used by get_address_from_coords for every
# geocoding call.
session = requests.Session()

# Function to retrieve address from coordinates
def get_address_from_coords(lat, lng, api_key, max_retries=5):
    """Reverse-geocode one coordinate pair with the Google Geocoding API.

    Args:
        lat: Latitude of the point.
        lng: Longitude of the point.
        api_key: Google Maps API key, sent as the ``key`` query parameter.
        max_retries: Maximum number of attempts made while the API keeps
            answering ``OVER_QUERY_LIMIT``.

    Returns:
        The ``formatted_address`` of the first result when the API status is
        ``OK``. Failures never raise; a descriptive string is returned instead:
        ``"Error: <api status or exception text>"``,
        ``"Failed to connect to API"`` (HTTP status other than 200) or
        ``"Failed after max retries"``.
    """
    url = f"https://maps.googleapis.com/maps/api/geocode/json?latlng={lat},{lng}&key={api_key}"
    retries = 0
    while retries < max_retries:
        try:
            response = session.get(url, timeout=10)
            if response.status_code == 200:
                result = response.json()
                if result['status'] == 'OK':
                    return result['results'][0]['formatted_address']
                elif result['status'] == 'OVER_QUERY_LIMIT':
                    delay = 5 + random.uniform(1, 3)  # Controlled delay for retry
                    print(f"⚠️ Rate limit hit. Retrying in {delay:.2f} seconds...")
                    time.sleep(delay)
                    retries += 1
                else:
                    return f"Error: {result['status']}"
            else:
                return "Failed to connect to API"
        except Exception as e:
            # The exception text can contain the request URL, and the URL
            # carries the API key. Callers store the returned string in the
            # dataset, so mask the key before handing the message back.
            message = str(e)
            if api_key:
                message = message.replace(str(api_key), "[REDACTED]")
            return f"Error: {message}"
    return "Failed after max retries"

# Optimized function with controlled delays
def fetch_addresses_with_throttling(data, batch_size=10, delay_between_batches=1, api_key=None):
    """Reverse-geocode every row of ``data`` in small, throttled batches.

    Args:
        data: DataFrame with ``centroid_latitude`` and ``centroid_longitude``
            columns.
        batch_size: Number of rows processed between two pauses.
        delay_between_batches: Base pause in seconds after each batch; a
            random 0.5-1.5 s is added on top.
        api_key: Google Maps API key. When omitted, the key is read from the
            ``GOOGLE_MAPS_API_KEY`` environment variable.

    Returns:
        A list with one entry per row of ``data`` (in row order) holding
        whatever ``get_address_from_coords`` returned for that row.

    Raises:
        ValueError: If no API key was passed and the environment variable is
            not set.
    """
    # The key used to be hardcoded in this function. Secrets must not live in
    # the notebook; the previously committed key should be rotated.
    if api_key is None:
        import os  # local import keeps the function self-contained

        api_key = os.getenv('GOOGLE_MAPS_API_KEY')
    if not api_key:
        raise ValueError(
            "No Google Maps API key available: pass api_key=... or set the "
            "GOOGLE_MAPS_API_KEY environment variable."
        )

    addresses = []

    for i in tqdm(range(0, len(data), batch_size), desc="Fetching Addresses"):
        batch = data.iloc[i:i + batch_size]
        for index, row in batch.iterrows():
            address = get_address_from_coords(row['centroid_latitude'], row['centroid_longitude'], api_key=api_key)
            addresses.append(address)

        # Introduce a short delay after each batch
        time.sleep(delay_between_batches + random.uniform(0.5, 1.5))  # Slight randomness for safety

    return addresses






In [None]:
# Add address column using throttled fetching
# One address (or an error string produced by the geocoding helper) per row.
filtered_data['address'] = fetch_addresses_with_throttling(filtered_data, batch_size=10, delay_between_batches=1)

# Save filtered data to CSV
filtered_data.to_csv('../datasets/filtered_data_with_addresses.csv', index=False)

# This cell stores addresses, not descriptions; the message says so.
print("✅ Addresses successfully fetched and saved to 'filtered_data_with_addresses.csv'.")

In [None]:
import pandas as pd
# Reload the geocoded dataset from disk so the following cells can start from
# the saved CSV, then preview the first rows.
filtered_data = pd.read_csv('../datasets/filtered_data_with_addresses.csv')
filtered_data.head()

# Method 1 : Google Search API

# Scrape descriptions (Google Search API)

In [None]:
"""
Now that we have the address (e.g. 454 Ruthven Avenue), we can search for it on Google.
"""

import requests
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
import os
import threading

# Google Custom Search API credentials
# Read from the environment (optionally populated from a .env file) instead of
# being hardcoded in the notebook. Expected variables:
#   GOOGLE_CSE_API_KEY  - API key for the Custom Search JSON API
#   GOOGLE_CSE_CX       - search engine id (the `cx` request parameter)
# The credentials that were previously committed here should be rotated.
from dotenv import load_dotenv

load_dotenv()
api_key = os.getenv('GOOGLE_CSE_API_KEY')
cx = os.getenv('GOOGLE_CSE_CX')
if not api_key or not cx:
    raise RuntimeError(
        "Google Custom Search credentials are missing: set the "
        "GOOGLE_CSE_API_KEY and GOOGLE_CSE_CX environment variables."
    )

# File paths
file_path = '../datasets/filtered_data_with_google_results.csv'
source_path = '../datasets/filtered_data_with_addresses.csv'

# Load dataset
# Resume from the results file when it already exists; otherwise start from
# the geocoded addresses with an empty `search_results` column.
if os.path.exists(file_path):
    filtered_data = pd.read_csv(file_path)
    if 'search_results' not in filtered_data.columns:
        filtered_data['search_results'] = ""
else:
    filtered_data = pd.read_csv(source_path)
    filtered_data['search_results'] = ""

# Rate limit tracker
lock = threading.Lock()
timestamps = []

def rate_limited_request():
    """Register one outgoing request and pause when the window is full.

    Appends the current time to ``timestamps``, discards leading entries that
    are more than 60 seconds old and, when 100 or more entries remain, sleeps
    until the oldest one leaves the 60-second window (plus 0.1-0.5 s of
    jitter). Everything, including the sleep, happens while ``lock`` is held.
    """
    with lock:
        current = time.time()
        timestamps.append(current)

        # Count the leading entries that fell out of the 60-second window and
        # remove them in a single slice deletion.
        cutoff = current - 60
        expired = 0
        while expired < len(timestamps) and timestamps[expired] < cutoff:
            expired += 1
        del timestamps[:expired]

        # Below the limit: nothing else to do.
        if len(timestamps) < 100:
            return

        wait_time = 60 - (current - timestamps[0]) + random.uniform(0.1, 0.5)
        print(f"⏳ Global rate limit hit. Sleeping for {wait_time:.2f}s...")
        time.sleep(wait_time)

# Function to fetch Google search results
def fetch_address_info(index, address, max_retries=3):
    """Query the Google Custom Search API for one address.

    Args:
        index: Row label of the address; returned unchanged so the caller can
            write the result back to the right row.
        address: Text used as the search query.
        max_retries: Number of attempts made when the API answers HTTP 429 or
            the request raises.

    Returns:
        A ``(index, text)`` tuple. ``text`` is the concatenated
        title/link/snippet of every returned item, ``"No results found."``
        when the response has no ``items`` key, ``"Error: <status>, <body>"``
        for any other HTTP status, or ``"❌ Failed after retries"``.
    """
    url = "https://www.googleapis.com/customsearch/v1"
    # Hand the query to requests as parameters so it is percent-encoded.
    # Interpolating the raw address into the URL breaks the request whenever
    # the address contains characters such as '&' or '#'.
    params = {"key": api_key, "cx": cx, "q": address}

    for attempt in range(max_retries):
        try:
            rate_limited_request()
            response = requests.get(url, params=params, timeout=10)

            if response.status_code == 200:
                data = response.json()
                if 'items' in data:
                    all_results = "".join(
                        f"Title: {item.get('title', '')}\n"
                        f"Link: {item.get('link', '')}\n"
                        f"Description: {item.get('snippet', '')}\n-----\n"
                        for item in data['items']
                    )
                else:
                    all_results = "No results found."
                return index, all_results

            elif response.status_code == 429:
                wait_time = 5 + random.uniform(1, 3)
                print(f"⏳ 429 Rate limit from API at index {index}, sleeping {wait_time:.2f}s...")
                time.sleep(wait_time)
            else:
                return index, f"Error: {response.status_code}, {response.text}"

        except Exception as e:
            # The exception text can include the request URL and therefore the
            # API key; mask it before it reaches the notebook output.
            message = str(e)
            if api_key:
                message = message.replace(str(api_key), "[REDACTED]")
            print(f"❌ Exception for index {index}: {message}")
            time.sleep(2 + random.uniform(0.5, 1.5))

    return index, "❌ Failed after retries"

# Filter rows that need processing
# A resumed CSV whose `search_results` column is entirely empty is read back
# as float NaN, on which the `.str` accessor raises. Normalise to text first:
# missing values become "" and therefore count as pending. The column is also
# cast to object so string results can be written into it.
filtered_data['search_results'] = filtered_data['search_results'].astype(object)
search_text = filtered_data['search_results'].fillna("").astype(str)
pending_rows = filtered_data[search_text.str.strip() == ""]
# NOTE: rows holding an error placeholder (e.g. "❌ Failed after retries") are
# not blank, so they are not picked up again on a later run.

# Multithreaded execution
batch_size = 100
max_workers = 5
results_buffer = []

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [
        executor.submit(fetch_address_info, idx, row['address'])
        for idx, row in pending_rows.iterrows()
    ]

    for future in tqdm(as_completed(futures), total=len(futures), desc="Fetching Google Results"):
        idx, result = future.result()
        filtered_data.at[idx, 'search_results'] = result
        results_buffer.append(idx)

        # Checkpoint to disk every `batch_size` completed requests so an
        # interrupted run can be resumed.
        if len(results_buffer) >= batch_size:
            filtered_data.to_csv(file_path, index=False)
            print(f"💾 Saved intermediate batch of {batch_size} results.")
            results_buffer = []

# Final save
filtered_data.to_csv(file_path, index=False)
print(f"✅ Done. Results saved to {file_path}")



In [None]:
# Also export the Google search results to an Excel workbook, without the
# DataFrame index.
filtered_data.to_excel('../datasets/filtered_data_with_google_results.xlsx', index=False)

# Method 2 : Tavily API

# Scrape descriptions (Tavily)

In [None]:
""" 
Similarly, we look for info about addresses with Tavily API, rather than with a Google Search.
"""

from tavily import TavilyClient
import pandas as pd
from tqdm import tqdm
import time
import os

# Initialize Tavily Client
# The API key is read from the TAVILY_API_KEY environment variable (optionally
# populated from a .env file) instead of being hardcoded in the notebook. The
# key that was previously committed here should be rotated.
from dotenv import load_dotenv

load_dotenv()
tavily_api_key = os.getenv("TAVILY_API_KEY")
if not tavily_api_key:
    raise RuntimeError("TAVILY_API_KEY is not set; add it to the environment or to the .env file.")
client = TavilyClient(tavily_api_key)

# Resume from the Tavily results file when it exists; otherwise start from the
# geocoded addresses with an empty `search_results` column.
file_path = '../datasets/filtered_data_with_tavily_results.csv'
if os.path.exists(file_path):
    filtered_data = pd.read_csv(file_path)
    if 'search_results' not in filtered_data.columns:
        filtered_data['search_results'] = ""
else:
    filtered_data = pd.read_csv('../datasets/filtered_data_with_addresses.csv')  # Original dataset
    filtered_data['search_results'] = ""

# Function to search address directly and fetch content
def fetch_address_info(address):
    """Search Tavily for ``address`` and return the joined result contents.

    Asks ``client.search`` for at most five results and concatenates their
    ``content`` fields with ``" | "``. Returns ``"No content found"`` when the
    joined text is empty and ``"Error"`` (after printing the exception) when
    anything raises.
    """
    try:
        hits = client.search(query=address, max_results=5)['results']
        snippets = []
        for hit in hits:
            snippets.append(hit['content'])
        joined = " | ".join(snippets)
    except Exception as e:
        print(f"❌ Error processing address '{address}': {e}")
        return "Error"

    if joined:
        return joined
    return "No content found"

# Iterate and save progress on each run
row_iterator = tqdm(filtered_data.iterrows(), total=filtered_data.shape[0], desc="Fetching Tavily Results")
for row_label, record in row_iterator:
    previous = record['search_results']
    # Only rows whose result is missing or blank still need a request.
    needs_fetch = pd.isna(previous) or previous.strip() == ""
    if needs_fetch:
        filtered_data.at[row_label, 'search_results'] = fetch_address_info(record['address'])

        # Persist after every request so an interrupted run can resume.
        filtered_data.to_csv(file_path, index=False)

        # Respect the 100 RPM rate limit
        time.sleep(0.6)

print(f"✅ Tavily search results completed and saved to '{file_path}'.")


# Determine whether the address is residential

## We use an LLM to determine whether an address is residential, based on the description we retrieved from Google (or Tavily)

In [None]:
# Classification below runs on the Google search results: reload them from
# the CSV written by the Google Search cell.
filtered_data = pd.read_csv('../datasets/filtered_data_with_google_results.csv')

In [None]:
import pandas as pd
import os
from dotenv import load_dotenv
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import BaseModel, Field, ValidationError

# Load API key
load_dotenv()
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
    # os.environ only accepts strings: assigning the None returned for a
    # missing variable fails with an unhelpful TypeError, so fail early with
    # an explicit message instead.
    raise RuntimeError("GEMINI_API_KEY is not set; add it to the environment or to the .env file.")
os.environ["GOOGLE_API_KEY"] = gemini_api_key

# Initialize Gemini Flash model
# temperature=0.0 is requested for every classification call.
model = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    temperature=0.0,
    google_api_key=os.environ["GOOGLE_API_KEY"]
)

# Define Pydantic model for structured response
# Schema handed to PydanticOutputParser: the model's answer must parse into
# this object.
class Classification(BaseModel):
    # Single integer flag; the field description states 1 = residential and
    # 0 = non-residential.
    is_residential: int = Field(description="1 for residential, 0 for non-residential")

# LangChain parser
# Provides the format instructions injected into the prompt; classify_address
# also uses it to parse the model output.
parser = PydanticOutputParser(pydantic_object=Classification)

# Prompt template
# `search_results` is supplied per call; `format_instructions` is filled in
# once, here, from the parser.
prompt = PromptTemplate(
    template="""
You are given multiple web search results about a single physical address:
{search_results}

Based on this information, determine whether the location is **residential** (e.g., a home or apartment) or **non-residential** (e.g., business, school, hospital, etc.). Do not consider special events as being non-residential.

{format_instructions}
""",
    input_variables=["search_results"],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)

# Create chain: prompt → model
# The parser is not part of the chain; it is applied to the chain output by
# classify_address.
chain = prompt | model

# Classification function
def classify_address(search_results, address):
    """Classify one address as residential or not from its search results.

    Args:
        search_results: Text collected for the address by the search cells, or
            a missing value.
        address: The address itself; only used in log messages.

    Returns:
        ``"Unknown"`` when there is nothing to classify (missing or blank
        text, or one of the placeholder strings the search cells write when
        they could not retrieve content), the ``is_residential`` integer
        parsed from the model answer on success, or ``"Error"`` when the model
        call or the parsing fails.
    """
    if pd.isna(search_results):
        return "Unknown"

    text = search_results.strip()
    # The Google fetcher stores "No results found." with a trailing period, so
    # the trailing dot is ignored when comparing. The other placeholders are
    # fetch failures rather than search content and are not worth an LLM call.
    no_content = (
        not text
        or text.rstrip(".") in ("No results found", "No content found")
        or text == "Error"
        or text.startswith("Error:")
        or text == "❌ Failed after retries"
    )
    if no_content:
        return "Unknown"

    try:
        output = chain.invoke({"search_results": search_results})
        result = parser.invoke(output)
        return result.is_residential
    except ValidationError as ve:
        print(f"⚠️ Validation error for address '{address}': {ve}")
        return "Error"
    except Exception as e:
        print(f"⚠️ Error classifying address '{address}': {e}")
        return "Error"

# Multi-threaded classification
def classify_addresses_concurrently(data, max_threads=10):
    """Run ``classify_address`` for every row of ``data`` in a thread pool.

    Args:
        data: DataFrame with ``search_results`` and ``address`` columns.
        max_threads: Size of the thread pool.

    Returns:
        A list with one classification per row, in row order, regardless of
        the order in which the worker threads finish.
    """
    results = ["Unknown"] * len(data)

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Results are stored by row *position*. Using the index label as a
        # list offset only works for a default 0..n-1 index and misplaces or
        # drops results for any filtered or re-indexed frame.
        futures = {
            executor.submit(classify_address, row['search_results'], row['address']): (position, label)
            for position, (label, row) in enumerate(data.iterrows())
        }

        for future in tqdm(as_completed(futures), total=len(futures), desc="Classifying Addresses"):
            position, label = futures[future]
            try:
                results[position] = future.result()
            except Exception as e:
                print(f"❌ Thread error at index {label}: {e}")
                results[position] = "Error"

    return results


# Run classification
# Adds one value per row: the parsed integer flag, "Unknown" or "Error".
filtered_data["is_residential"] = classify_addresses_concurrently(filtered_data)

# Save output
# This CSV is the input of the main-address cells further down.
filtered_data.to_csv("../datasets/filtered_data_with_residential_classification.csv", index=False)
print("✅ Saved to 'filtered_data_with_residential_classification.csv'")


# Determine whether it is the main address : All variables

## Now that we have indicators, and we know whether an address is residential, we can assess whether an address is the main address.

In [None]:
import pandas as pd
import os
from dotenv import load_dotenv
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field, ValidationError

# ----------------------------
# Load API key
# ----------------------------
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    # os.environ only accepts strings: assigning the None returned for a
    # missing variable fails with an unhelpful TypeError, so fail early with
    # an explicit message instead.
    raise RuntimeError("OPENAI_API_KEY is not set; add it to the environment or to the .env file.")
os.environ["OPENAI_API_KEY"] = openai_api_key

# ----------------------------
# Init LLM
# ----------------------------
# temperature=0.0 is requested for every main-cluster decision.
model = ChatOpenAI(
    model_name="gpt-4o",
    temperature=0.0,
    api_key=os.environ["OPENAI_API_KEY"]
)

# ----------------------------
# Output schema
# ----------------------------
# Structure the model answer must parse into: the chosen cluster number and a
# free-text explanation.
class MainClusterSelection(BaseModel):
    # Cluster number picked as the main address.
    main_cluster: int = Field(description="Cluster number selected as main address")
    # The model's explanation for the choice.
    justification: str = Field(description="Why this cluster was selected")

# Supplies the format instructions for the prompt and parses the model output.
parser = PydanticOutputParser(pydantic_object=MainClusterSelection)

# ----------------------------
# Prompt Template
# ----------------------------
# `cluster_table` is filled per call with the text built by
# format_cluster_group; `format_instructions` is filled once from the parser.
# This variant of the prompt includes the is_residential column.
prompt = PromptTemplate(
    template="""
Each user (identified by a CAID) has multiple location clusters detected from mobile signals. Each cluster represents a place where the user spent time during Q2.

Your task is to select the **main home address cluster** for the user based on the features below. We would like to differentiate from all other addresses, especially from work address.

A main address is usually:
- High **night** or **evening** consistency
- High unique hours, meaning that the user is active at different times of the day
- High consecutive hours, meaning that the user is active for a long time
- Long stays and broad time window coverage
- High dominance score 
- High total pings and diverse hourly activity (entropy)

### Column Descriptions

cluster: Cluster index for this user  
is_residential: 1 for residential, 0 for non-residential  
night_consistency_score: % of nights this cluster was seen (NaN if no night pings for the user)  
evening_consistency_score: % of evenings this cluster was seen (NaN if no evening pings for the user)  
day_consistency_score: % of days this cluster was seen (NaN if no day pings for the user)  
dominance_score: % of device pings in this cluster  
total_pings: Total number of pings in this cluster  
unique_hours: Number of unique hourly bins this cluster was active  
hour_entropy: Entropy of hourly activity (NaN if too few pings)  
max_consecutive_hours: Longest continuous stay in hours  
time_window_coverage: Fraction of [day, evening, night] time windows with activity

---

### Cluster Candidates

{cluster_table}

Choose the main_cluster and explain why.

{format_instructions}
""",
    input_variables=["cluster_table"],
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

# prompt -> model; the parser is applied to the output by decide_main_cluster.
chain = prompt | model

# ----------------------------
# Format one user's cluster group
# ----------------------------
def format_cluster_group(group):
    """Render a user's clusters as text, one ``name: value`` line per cluster.

    Each row of ``group`` becomes a single line listing the cluster id, the
    residential flag and the activity metrics as comma-separated
    ``name: value`` pairs; lines are joined with newlines.
    """
    fields = (
        "cluster",
        "is_residential",
        "night_consistency_score",
        "evening_consistency_score",
        "day_consistency_score",
        "dominance_score",
        "total_pings",
        "unique_hours",
        "hour_entropy",
        "max_consecutive_hours",
        "time_window_coverage",
    )
    return "\n".join(
        ", ".join(f"{name}: {record[name]}" for name in fields)
        for _, record in group.iterrows()
    )

# ----------------------------
# Decide main cluster for one user
# ----------------------------
def decide_main_cluster(caid, group):
    """Ask the LLM which cluster of ``group`` is the user's main address.

    Returns a dict with the ``caid``, the ``quarter`` of the group's first
    row, the selected ``cluster``, ``is_main_address`` set to 1 and the
    model's justification. Returns ``None`` (after printing the problem) when
    the call or the parsing fails.
    """
    try:
        llm_reply = chain.invoke({"cluster_table": format_cluster_group(group)})
        selection = parser.invoke(llm_reply)
        first_row = group.iloc[0]
        return dict(
            caid=caid,
            quarter=first_row["quarter"],
            cluster=selection.main_cluster,
            is_main_address=1,
            main_address_justification=selection.justification,
        )
    except ValidationError as ve:
        print(f"❌ Validation error for caid {caid}: {ve}")
    except Exception as e:
        print(f"❌ Error for caid {caid}: {e}")
    return None

# ----------------------------
# Load dataset
# ----------------------------
df = pd.read_csv("../datasets/filtered_data_with_residential_classification.csv")

# ----------------------------
# Multi-threaded execution
# ----------------------------
results = []
# One decision per (caid, quarter). Results are tagged with a single quarter
# and merged back on (caid, quarter, cluster), so grouping by caid alone would
# mix the clusters of several quarters in one prompt and label only the first
# quarter. With a single quarter per caid the groups are unchanged.
caid_groups = [
    (caid, group)
    for (caid, _quarter), group in df.groupby(["caid", "quarter"])
]

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {
        executor.submit(decide_main_cluster, caid, group): caid
        for caid, group in caid_groups
    }

    for future in tqdm(as_completed(futures), total=len(futures), desc="Classifying main clusters"):
        result = future.result()
        # decide_main_cluster returns None on failure; keep successes only.
        if result:
            results.append(result)

# ----------------------------
# Merge and save
# ----------------------------
# Fixing the columns keeps the frame well-formed when every LLM call failed
# and `results` is empty; a column-less frame would make the merge raise a
# KeyError on the join keys.
df_main = pd.DataFrame(
    results,
    columns=["caid", "quarter", "cluster", "is_main_address", "main_address_justification"],
)

if df_main.empty:
    # Nothing was selected: every cluster is a non-main address.
    df["is_main_address"] = 0
    df["main_address_justification"] = ""
else:
    # Left merge: only the selected cluster of each group gets flag 1 and a
    # justification; every other row is filled with 0 / "".
    df = df.merge(df_main, on=["caid", "quarter", "cluster"], how="left")
    df["is_main_address"] = df["is_main_address"].fillna(0).astype(int)
    df["main_address_justification"] = df["main_address_justification"].fillna("")

df.to_csv("../datasets/filtered_data_with_main_address_per_user.csv", index=False)
print("✅ Saved to 'filtered_data_with_main_address_per_user.csv'")



In [16]:
import openpyxl
# Excel copy of the main-address table. index=False keeps the positional
# DataFrame index out of the sheet, as in the Google-results export.
df.to_excel("../datasets/filtered_data_with_main_address_per_user.xlsx", index=False)

# Determine whether it is the main address : All variables except is_residential

In [None]:
import pandas as pd
import os
from dotenv import load_dotenv
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field, ValidationError

# ----------------------------
# Load API key
# ----------------------------
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    # os.environ only accepts strings: assigning the None returned for a
    # missing variable fails with an unhelpful TypeError, so fail early with
    # an explicit message instead.
    raise RuntimeError("OPENAI_API_KEY is not set; add it to the environment or to the .env file.")
os.environ["OPENAI_API_KEY"] = openai_api_key

# ----------------------------
# Init LLM
# ----------------------------
# temperature=0.0 is requested for every main-cluster decision.
model = ChatOpenAI(
    model_name="gpt-4o",
    temperature=0.0,
    api_key=os.environ["OPENAI_API_KEY"]
)

# ----------------------------
# Output schema
# ----------------------------
# Structure the model answer must parse into: the chosen cluster number and a
# free-text explanation. Declared again in this cell so the cell carries its
# own definitions.
class MainClusterSelection(BaseModel):
    # Cluster number picked as the main address.
    main_cluster: int = Field(description="Cluster number selected as main address")
    # The model's explanation for the choice.
    justification: str = Field(description="Why this cluster was selected")

# Supplies the format instructions for the prompt and parses the model output.
parser = PydanticOutputParser(pydantic_object=MainClusterSelection)

# ----------------------------
# Prompt Template
# ----------------------------
# `cluster_table` is filled per call with the text built by
# format_cluster_group; `format_instructions` is filled once from the parser.
# This variant of the prompt does not mention is_residential.
prompt = PromptTemplate(
    template="""
Each user (identified by a CAID) has multiple location clusters detected from mobile signals. Each cluster represents a place where the user spent time during Q2.

Your task is to select the **main home address cluster** for the user based on the features below. We would like to differentiate from all other addresses, especially from work address.

A main address is usually:
- High **night** or **evening** consistency
- High unique hours, meaning that the user is active at different times of the day
- High consecutive hours, meaning that the user is active for a long time
- Long stays and broad time window coverage
- High dominance score
- High total pings and diverse hourly activity (entropy)

### Column Descriptions

cluster: Cluster index for this user  
night_consistency_score: % of nights this cluster was seen (NaN if no night pings for the user)  
evening_consistency_score: % of evenings this cluster was seen (NaN if no evening pings for the user)  
day_consistency_score: % of days this cluster was seen (NaN if no day pings for the user)  
dominance_score: % of device pings in this cluster  
total_pings: Total number of pings in this cluster  
unique_hours: Number of unique hourly bins this cluster was active  
hour_entropy: Entropy of hourly activity (NaN if too few pings)  
max_consecutive_hours: Longest continuous stay in hours  
time_window_coverage: Fraction of [day, evening, night] time windows with activity

---

### Cluster Candidates

{cluster_table}

Choose the main_cluster and explain why.

{format_instructions}
""",
    input_variables=["cluster_table"],
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

# prompt -> model; the parser is applied to the output by process_caid.
chain = prompt | model

# ----------------------------
# Format a CAID group
# ----------------------------
def format_cluster_group(group):
    """Render a user's clusters as text, one ``name: value`` line per cluster.

    Same layout as a comma-separated list of ``name: value`` pairs per row,
    joined with newlines; this variant leaves out the residential flag.
    """
    column_order = [
        "cluster",
        "night_consistency_score",
        "evening_consistency_score",
        "day_consistency_score",
        "dominance_score",
        "total_pings",
        "unique_hours",
        "hour_entropy",
        "max_consecutive_hours",
        "time_window_coverage",
    ]
    lines = []
    for _, record in group.iterrows():
        pairs = [f"{column}: {record[column]}" for column in column_order]
        lines.append(", ".join(pairs))
    return "\n".join(lines)

# ----------------------------
# Decide main cluster (thread target)
# ----------------------------
def process_caid(caid, group):
    """Ask the LLM for the main cluster of one user, without is_residential.

    Returns a dict carrying the ``caid``, the ``quarter`` of the group's first
    row, the selected ``cluster``, ``is_main_address_no_residential`` set to 1
    and the model's justification, or ``None`` (after printing the problem)
    when the call or the parsing fails.
    """
    try:
        rendered_table = format_cluster_group(group)
        selection = parser.invoke(chain.invoke({"cluster_table": rendered_table}))
        outcome = {"caid": caid, "quarter": group.iloc[0]["quarter"]}
        outcome["cluster"] = selection.main_cluster
        outcome["is_main_address_no_residential"] = 1
        outcome["main_address_justification_no_residential"] = selection.justification
        return outcome
    except ValidationError as ve:
        print(f"❌ Validation error for caid {caid}: {ve}")
    except Exception as e:
        print(f"❌ Error for caid {caid}: {e}")
    return None

# ----------------------------
# Load dataset
# ----------------------------
df = pd.read_csv("../datasets/filtered_data_with_main_address_per_user.csv")

# ----------------------------
# Threaded execution
# ----------------------------
results = []
futures = []
max_workers = 20 # Adjust based on your API limits

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # One decision per (caid, quarter). Results are tagged with a single
    # quarter and merged back on (caid, quarter, cluster), so grouping by caid
    # alone would mix the clusters of several quarters in one prompt and label
    # only the first quarter. With a single quarter per caid nothing changes.
    for (caid, _quarter), group in df.groupby(["caid", "quarter"]):
        futures.append(executor.submit(process_caid, caid, group))

    for future in tqdm(as_completed(futures), total=len(futures), desc="Running threaded LLM"):
        result = future.result()
        # process_caid returns None on failure; keep successes only.
        if result:
            results.append(result)





In [None]:
# ----------------------------
# Merge results and save
# ----------------------------
result_columns = [
    "caid",
    "quarter",
    "cluster",
    "is_main_address_no_residential",
    "main_address_justification_no_residential",
]
# Fixing the columns keeps the frame well-formed when every LLM call failed
# and `results` is empty (a column-less frame makes the merge raise KeyError).
df_new = pd.DataFrame(results, columns=result_columns)

# The output file is also the input of this step. On a re-run the result
# columns already exist, and merging again would create `_x`/`_y` duplicates
# and break the lookups below, so drop any previous results first.
stale_columns = [name for name in result_columns[3:] if name in df.columns]
df = df.drop(columns=stale_columns)

if df_new.empty:
    # Nothing was selected: every cluster is a non-main address.
    df["is_main_address_no_residential"] = 0
    df["main_address_justification_no_residential"] = ""
else:
    df = df.merge(df_new, on=["caid", "quarter", "cluster"], how="left")
    df["is_main_address_no_residential"] = df["is_main_address_no_residential"].fillna(0).astype(int)
    df["main_address_justification_no_residential"] = df["main_address_justification_no_residential"].fillna("")
df.to_csv("../datasets/filtered_data_with_main_address_per_user.csv", index=False)

print("✅ Threaded LLM results appended to filtered_data_with_main_address_per_user.csv")

In [17]:
# Excel copy of the table holding both decisions. index=False keeps the
# positional DataFrame index out of the sheet, as in the Google-results and
# differences exports.
df.to_excel("../datasets/filtered_data_with_main_address_per_user.xlsx", index=False)

# Effect of is_residential : what clusters got different assignment?

In [63]:
import pandas as pd

# Load dataset with both decisions
df = pd.read_csv("../datasets/filtered_data_with_main_address_per_user.csv")

# Step 1: Get main cluster selections
# One frame per decision, with the chosen cluster renamed so both can sit
# side by side after the merge.
selection_columns = ["caid", "quarter", "cluster"]
main_with = (
    df.loc[df["is_main_address"] == 1, selection_columns]
    .rename(columns={"cluster": "main_with_res"})
)
main_without = (
    df.loc[df["is_main_address_no_residential"] == 1, selection_columns]
    .rename(columns={"cluster": "main_without_res"})
)

# Step 2: Merge and find CAIDs with differing main clusters
comparison = main_with.merge(main_without, on=["caid", "quarter"], how="inner")
disagrees = comparison["main_with_res"] != comparison["main_without_res"]
diff_caids = comparison.loc[disagrees, ["caid", "quarter"]]

# Step 3: Get all clusters for those CAIDs
df_diff = df.merge(diff_caids, on=["caid", "quarter"], how="inner")

# Step 4: Export to Excel
output_path = "../datasets/main_address_differences_all_clusters.xlsx"
df_diff.to_excel(output_path, index=False)

# Optional: Display
for display_option in ("display.max_columns", "display.max_colwidth"):
    pd.set_option(display_option, None)
print("✅ Saved all clusters for CAIDs with disagreement to:")
print(output_path)




✅ Saved all clusters for CAIDs with disagreement to:
../datasets/main_address_differences_all_clusters.xlsx


In [64]:
# Number of distinct CAIDs (not caid/quarter pairs) whose main cluster differs
# between the two decisions.
print(f"🔢 Number of CAIDs with differing main clusters: {diff_caids['caid'].nunique()}")


🔢 Number of CAIDs with differing main clusters: 16
