# Import necessary libraries

In [1]:
import pandas as pd
import numpy as np
from config import call_data_path, signup_data_path, message_data_path, search_data_path   
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None) 
pd.set_option('display.max_rows', 500)


# 1. Reading Datasets

In [2]:
message_data = pd.read_csv(message_data_path, sep = '\t')
signup_data = pd.read_csv(signup_data_path, sep = '\t')
call_data = pd.read_csv(call_data_path, sep = '\t')
search_data = pd.read_csv(search_data_path, sep = '\t')
print(message_data.columns , signup_data.columns, call_data.columns, search_data.columns)
print(message_data.shape , signup_data.shape, call_data.shape, search_data.shape)

Index(['user_id', 'message_ts'], dtype='object') Index(['user_id', 'country_code', 'signup_ts'], dtype='object') Index(['user_id', 'call_ts'], dtype='object') Index(['user_id', 'search_ts'], dtype='object')
(105785, 2) (26000, 3) (1223795, 2) (867131, 2)


In [3]:
# Sanity check for user IDs
print(len(set(message_data['user_id'].unique())-set(signup_data['user_id'].unique())) ,
len(set(call_data['user_id'].unique())-set(signup_data['user_id'].unique())),
len(set(search_data['user_id'].unique())-set(signup_data['user_id'].unique())))

0 0 0


In [4]:
# Remove duplicates and check shapes and unique user counts
message_data.drop_duplicates(inplace=True)
signup_data.drop_duplicates(inplace=True)
call_data.drop_duplicates(inplace=True)
search_data.drop_duplicates(inplace=True)
print(message_data.shape , signup_data.shape, call_data.shape, search_data.shape)
print(message_data['user_id'].nunique() , signup_data['user_id'].nunique(), call_data['user_id'].nunique(), search_data['user_id'].nunique())

(105763, 2) (26000, 3) (1223794, 2) (867108, 2)
8452 26000 24439 26000


# 2. Data Preprocessing

In [5]:
def preprocess(df, ts_col):
    '''
    Converts timestamp column to datetime (with seconds only, no milliseconds),
    adds 00:00:00 if only date is present, and removes timezone.
    '''
    df = df.copy()

    # Try parsing with ms as numeric timestamp
    try:
        df[ts_col] = pd.to_datetime(df[ts_col], unit='ms', utc=True, errors='raise')
    except:
        df[ts_col] = pd.to_datetime(df[ts_col], utc=True, errors='coerce')

    # Floor to second (remove milliseconds)
    df[ts_col] = df[ts_col].dt.floor('s')

    # Remove timezone information
    df[ts_col] = df[ts_col].dt.tz_convert(None)

    return df.dropna(subset=[ts_col])

signup_data = preprocess(signup_data, 'signup_ts')
call_data = preprocess(call_data, 'call_ts')
search_data = preprocess(search_data, 'search_ts')
message_data = preprocess(message_data, 'message_ts')


In [6]:
signup_data['signup_ts'] = signup_data['signup_ts'].dt.strftime('%Y-%m-%d %H:%M:%S')

# 3. Understand Data behaviour and pattern

# 3.1. User Behaviour Check

In [7]:
# Standardize timestamp column names to a common 'timestamp' for easier merging and analysis
def prepare_combined_data(signup_data, call_data, message_data, search_data):
    signup_data = signup_data.rename(columns={'signup_ts': 'timestamp'}).copy()  # Rename for consistency
    call_data = call_data.rename(columns={'call_ts': 'timestamp'}).copy()
    message_data = message_data.rename(columns={'message_ts': 'timestamp'}).copy()
    search_data = search_data.rename(columns={'search_ts': 'timestamp'}).copy()

    # Add event type for tracking source of the action
    signup_data['event_type'] = 'signup'
    call_data['event_type'] = 'call'
    message_data['event_type'] = 'message'
    search_data['event_type'] = 'search'

    # Combine all event types into a single DataFrame for unified analysis
    all_data = pd.concat([signup_data, call_data, message_data, search_data], ignore_index=True)

    # Convert timestamps to datetime objects for sorting and calculations
    all_data['timestamp'] = pd.to_datetime(all_data['timestamp'])

    # Sort events chronologically within each user
    all_data.sort_values(by=['user_id', 'timestamp'], inplace=True)

    return all_data

# Analyze user behavior for event patterns and detect potential suspicious activity
def analyze_user_behavior(all_data):
    # Compute time difference to next event for each user
    all_data['next_timestamp'] = all_data.groupby('user_id')['timestamp'].shift(-1)
    all_data['time_diff_sec'] = (all_data['next_timestamp'] - all_data['timestamp']).dt.total_seconds()

    # Mark rapid actions where time difference is less than 5 seconds
    all_data['rapid_action_flag'] = all_data['time_diff_sec'].apply(lambda x: 1 if pd.notnull(x) and x < 5 else 0)

    # Aggregate user-level behavior metrics
    user_summary = all_data.groupby('user_id').agg(
        total_actions=('event_type', 'count'),  # Total number of actions
        num_calls=('event_type', lambda x: (x == 'call').sum()),  # Number of calls
        num_messages=('event_type', lambda x: (x == 'message').sum()),  # Number of messages
        num_searches=('event_type', lambda x: (x == 'search').sum()),  # Number of searches
        num_rapid_actions=('rapid_action_flag', 'sum'),  # Count of rapid actions
        avg_time_diff=('time_diff_sec', 'mean'),  # Average time gap between actions
        median_time_diff=('time_diff_sec', 'median')  # Median time gap between actions
    ).reset_index()

    # Flag users as suspect if they have too many rapid actions or very low average time gaps
    user_summary['is_suspect'] = user_summary.apply(
        lambda row: 1 if row['num_rapid_actions'] > 5 or (pd.notnull(row['avg_time_diff']) and row['avg_time_diff'] < 2) else 0,
        axis=1
    )

    return all_data, user_summary


In [8]:
signup_data['country_code'].value_counts(normalize=True).multiply(100)


country_code
IT    43.169231
GB    20.876923
SE    18.153846
ES    11.238462
FR     6.561538
Name: proportion, dtype: float64

#### Majority of users (~43%) are from Italy, indicating strongest traction there.UK and Sweden follow, contributing ~21% and ~18% respectively — worth focusing marketing/sales efforts here. Spain and France show moderate engagement; potential for growth with localized strategies. 

In [9]:
signup_data['country_code'].nunique(), signup_data.shape, signup_data.columns

(5,
 (26000, 3),
 Index(['user_id', 'country_code', 'signup_ts'], dtype='object'))

In [10]:
combined_data = prepare_combined_data(signup_data, call_data, message_data, search_data)
event_log, behavior_summary = analyze_user_behavior(combined_data)

In [11]:
total_users = behavior_summary.shape[0]
search_users = (behavior_summary['num_searches'] > 0).sum()
message_users = (behavior_summary['num_messages'] > 0).sum()
call_users = (behavior_summary['num_calls'] > 0).sum()

funnel_df = pd.DataFrame({
    'Stage': ['Signed Up', 'Searched', 'Messaged', 'Called'],
    'Users': [total_users, search_users, message_users, call_users]
})
funnel_df

Unnamed: 0,Stage,Users
0,Signed Up,26000
1,Searched,26000
2,Messaged,8452
3,Called,24439


In [12]:
avg_events = behavior_summary[['num_searches', 'num_messages', 'num_calls']].mean()
avg_events    

num_searches    33.350308
num_messages     4.067808
num_calls       47.069000
dtype: float64

#### Out of 26,000 users who signed up, all engaged with the search feature, indicating it as the most commonly used entry point. However, only about 32.5% of users proceeded to send messages, suggesting that messaging is a more selective or intent-driven action. In contrast, around 94% of users made calls, highlighting calling as the dominant mode of user interaction and a likely primary channel for engagement or conversion.

In [13]:
event_trend = event_log.groupby(['event_type', event_log['timestamp'].dt.date]).size().unstack(0).fillna(0)
event_trend

event_type,call,message,search,signup
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2019-03-02,1097.0,527.0,9953.0,580.0
2019-03-03,3229.0,720.0,7105.0,467.0
2019-03-04,5956.0,1406.0,13419.0,667.0
2019-03-05,9364.0,1339.0,13139.0,639.0
2019-03-06,10145.0,1752.0,16453.0,690.0
2019-03-07,11895.0,1713.0,19914.0,613.0
2019-03-08,13516.0,1699.0,19187.0,736.0
2019-03-09,13025.0,1717.0,13727.0,586.0
2019-03-10,10917.0,1872.0,14764.0,502.0
2019-03-11,16367.0,2443.0,25020.0,759.0


### User Activity Overview
#### The platform has around 26,000 unique users. Signups show steady daily growth (480–500), with Italy contributing the largest share, followed by the UK, Sweden, Spain, and France.

#### Calls increased consistently through March, peaking on April 1 before dropping sharply after April 4. Clear weekend dips are visible. About 94% of users placed at least one call.

#### Searches are highly frequent, with some users performing hundreds of searches a day, indicating likely bot or automated behavior. Search patterns also reflect weekend dips and align closely with call trends.

#### Messages are less frequent compared to calls and searches. Usage dropped significantly post-April 1, suggesting a shift in user behavior or platform functionality.

# 4. Pattern detect
Create a master file for the analysis

In [15]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm


def process_batch(batch_users, signup_data, call_data, search_data, message_data):
    """Process a batch of user_ids and return summary info."""

    # Filter batches
    signup_batch = signup_data[signup_data['user_id'].isin(batch_users)]
    call_batch = call_data[call_data['user_id'].isin(batch_users)]
    search_batch = search_data[search_data['user_id'].isin(batch_users)]
    message_batch = message_data[message_data['user_id'].isin(batch_users)]

    # Counts per user
    call_count = call_batch.groupby('user_id').size().rename("num_calls")
    search_count = search_batch.groupby('user_id').size().rename("num_searches")
    message_count = message_batch.groupby('user_id').size().rename("num_messages")

    # Last activity times
    last_call = call_batch.groupby('user_id')['call_ts'].max().rename("last_call_ts")
    last_search = search_batch.groupby('user_id')['search_ts'].max().rename("last_search_ts")
    last_message = message_batch.groupby('user_id')['message_ts'].max().rename("last_message_ts")

    # Determine latest activity time
    last_activity = pd.concat([last_call, last_search, last_message], axis=1)
    last_activity['last_action_ts'] = last_activity.max(axis=1)

    # Signup time
    signup_ts = signup_batch.groupby('user_id')['signup_ts'].min()

    # Tenure in days
    tenure = abs(((last_activity['last_action_ts'] - signup_ts).dt.days + 1)).rename("tenure_days")

    # Combine all features including last action timestamp
    batch_df = pd.concat([
        signup_ts.rename("signup_ts"),
        tenure,
        call_count,
        search_count,
        message_count,
        last_activity['last_action_ts']
    ], axis=1).reset_index()

    # Fill nulls and convert to int
    batch_df[['num_calls', 'num_searches', 'num_messages']] = batch_df[
        ['num_calls', 'num_searches', 'num_messages']
    ].fillna(0).astype(int)

    batch_df['tenure_days'] = batch_df['tenure_days'].fillna(0).astype(int)

    return batch_df


def process_user_batches_threaded(signup_data, call_data, search_data, message_data, batch_size=1000, num_threads=8):
    """Process user data in batches using ThreadPoolExecutor."""

    # Rename timestamp columns
    signup_data = signup_data.rename(columns={"signup_time": "signup_ts"})
    call_data = call_data.rename(columns={"call_time": "call_ts"})
    search_data = search_data.rename(columns={"search_time": "search_ts"})
    message_data = message_data.rename(columns={"message_time": "message_ts"})

    # Convert to datetime
    signup_data['signup_ts'] = pd.to_datetime(signup_data['signup_ts'])
    call_data['call_ts'] = pd.to_datetime(call_data['call_ts'])
    search_data['search_ts'] = pd.to_datetime(search_data['search_ts'])
    message_data['message_ts'] = pd.to_datetime(message_data['message_ts'])

    # All unique user IDs
    all_user_ids = pd.concat([
        signup_data['user_id'],
        call_data['user_id'],
        search_data['user_id'],
        message_data['user_id']
    ]).dropna().unique()

    # Create batches of unique user_ids
    user_batches = [all_user_ids[i:i + batch_size] for i in range(0, len(all_user_ids), batch_size)]

    # Process in parallel
    final_results = []
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(process_batch, batch, signup_data, call_data, search_data, message_data)
            for batch in user_batches
        ]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing batches"):
            final_results.append(future.result())

    # Combine all batch results
    return pd.concat(final_results, ignore_index=True)


master_df = process_user_batches_threaded(
    signup_data,
    call_data,
    search_data,
    message_data,
    batch_size=1000,
    num_threads=8
)

# Add per-day usage metrics
master_df['tenure_days'] = master_df['tenure_days'].replace(0, 1)  # Avoid division by zero
master_df['search_per_day'] = round(abs(master_df['num_searches'] / master_df['tenure_days']), 2)
master_df['call_per_day'] = round(abs(master_df['num_calls'] / master_df['tenure_days']), 2)
master_df['message_per_day'] = round(abs(master_df['num_messages'] / master_df['tenure_days']), 2)
master_df.head()

Processing batches: 100%|██████████| 26/26 [00:00<00:00, 105.12it/s]


Unnamed: 0,user_id,signup_ts,tenure_days,num_calls,num_searches,num_messages,last_action_ts,search_per_day,call_per_day,message_per_day
0,10000000,2019-03-18,17,36,16,1,2019-04-03 22:05:00,0.94,2.12,0.06
1,10000001,2019-03-03,16,64,20,0,2019-03-18 16:54:08,1.25,4.0,0.0
2,10000002,2019-04-02,2,5,17,2,2019-04-03 17:28:01,8.5,2.5,1.0
3,10000003,2019-03-27,8,60,38,0,2019-04-03 21:33:40,4.75,7.5,0.0
4,10000004,2019-04-01,3,24,37,0,2019-04-03 18:09:55,12.33,8.0,0.0


# 4.1. identifies the top unique values for key activity metrics to better understand extreme user behaviors across calls, searches, and messages.

In [16]:
columns_to_extract = [
     'num_calls', 'num_searches', 'num_messages',
    'search_per_day', 'call_per_day', 'message_per_day'
]

top_values = {}

for col in columns_to_extract:
    # Drop NA, get unique values, sort descending, take top 10
    top_values[col] = (
        master_df[col]
        .dropna()
        .drop_duplicates()
        .sort_values(ascending=False)
        .head(20)
        .values
    )

# Convert to DataFrame
DAF = pd.DataFrame(top_values)

# Optional: Transpose for cleaner view
DAF = DAF.T
DAF.columns = [f'Top_{i+1}' for i in range(DAF.shape[1])]

DAF


Unnamed: 0,Top_1,Top_2,Top_3,Top_4,Top_5,Top_6,Top_7,Top_8,Top_9,Top_10,Top_11,Top_12,Top_13,Top_14,Top_15,Top_16,Top_17,Top_18,Top_19,Top_20
num_calls,1681.0,1415.0,1341.0,1086.0,1039.0,951.0,916.0,860.0,758.0,750.0,669.0,634.0,622.0,591.0,588.0,565.0,524.0,520.0,513.0,511.0
num_searches,5631.0,2229.0,1818.0,1782.0,1754.0,1508.0,1238.0,1222.0,1183.0,925.0,828.0,787.0,742.0,736.0,734.0,722.0,714.0,698.0,691.0,688.0
num_messages,1960.0,1744.0,1136.0,1084.0,898.0,849.0,774.0,737.0,735.0,734.0,718.0,654.0,645.0,548.0,484.0,481.0,440.0,437.0,436.0,413.0
search_per_day,938.5,501.0,361.0,360.0,321.0,317.0,314.0,300.0,292.0,273.0,268.0,186.5,157.0,156.5,153.25,151.5,148.5,147.5,146.5,143.5
call_per_day,840.5,707.5,513.0,274.0,249.0,219.0,196.5,116.0,113.5,110.0,91.6,91.0,84.0,83.0,79.5,78.0,77.0,72.0,66.33,65.0
message_per_day,249.14,119.67,118.0,110.0,84.0,82.0,81.75,81.56,78.0,77.18,75.0,70.0,67.75,65.0,62.8,59.39,56.0,55.29,54.5,52.5


### Flagged users with extreme search or call activity using top 20 thresholds.

In [18]:
# Get thresholds from top 10 values (excluding 'inf')
search_thresh = sorted(master_df['search_per_day'].replace([np.inf, -np.inf], np.nan).dropna().unique(), reverse=True)[:20][-1]
call_thresh = sorted(master_df['call_per_day'].replace([np.inf, -np.inf], np.nan).dropna().unique(), reverse=True)[:20][-1]

# Create scraper flag where either of these conditions are met
scraper_df1 = master_df[
    (master_df['search_per_day'] >= search_thresh) |
    (master_df['call_per_day'] >= call_thresh)
].copy()

# Optional: add flag
scraper_df1['is_scraper_extreme_activity'] = True

print(f"Potential scraper-like users: {len(scraper_df1)}")


Potential scraper-like users: 43


## 4.2. Anpomaly detection on top 2% user activity metrics (searches, calls, messages per day) to identify sudden behavioral spikes, helping flag potential scraper accounts with unusually high engagement.

In [20]:
import pandas as pd
import numpy as np

# Function to find first significant jump in top 2%
def find_jump_from_top_2_percent(column_values, column_name, jump_factor=10):
    cleaned_vals = column_values.dropna().sort_values().unique()

    if len(cleaned_vals) < 10:
        return column_name, None, None  # not enough data

    # Take top 2% tail values
    top_n = int(0.02 * len(cleaned_vals))
    tail_vals = cleaned_vals[-top_n:] if top_n > 1 else cleaned_vals[-10:]

    diffs = np.diff(tail_vals)
    median_diff = np.median(diffs)

    for i, d in enumerate(diffs):
        if d >= jump_factor * median_diff:
            return column_name, tail_vals[i], tail_vals[i + 1]

    return column_name, None, None

# Detect jump thresholds
def detect_jump_thresholds(df, columns):
    results = {}
    for col in columns:
        colname, prev, jump = find_jump_from_top_2_percent(df[col], col)
        if jump is not None:
            results[col] = (prev, jump)
    return results

# Define your columns of interest
columns_to_check = ['search_per_day', 'call_per_day', 'message_per_day']

# Detect thresholds
jump_thresholds = detect_jump_thresholds(master_df, columns_to_check)

# Print the results
print("Jump thresholds based on top 2% tail analysis:")
for col, (prev, jump) in jump_thresholds.items():
    print(f"{col}: jump from {prev} to {jump} (diff = {jump - prev})")

# Filter potential scrapers from master_df
condition = False
for col, (_, jump) in jump_thresholds.items():
    condition |= master_df[col] >= jump

scrapers_df2 = master_df[condition].copy()

print(f"\nScrapers detected: {len(scrapers_df2)} rows")
scrapers_df2


Jump thresholds based on top 2% tail analysis:
search_per_day: jump from 186.5 to 268.0 (diff = 81.5)
call_per_day: jump from 91.6 to 110.0 (diff = 18.400000000000006)
message_per_day: jump from 84.0 to 110.0 (diff = 26.0)

Scrapers detected: 25 rows


Unnamed: 0,user_id,signup_ts,tenure_days,num_calls,num_searches,num_messages,last_action_ts,search_per_day,call_per_day,message_per_day
550,10000550,2019-03-06,1,219,1,0,2019-03-06 17:18:36,1.0,219.0,0.0
1585,10001585,2019-04-01,1,1,317,1,2019-04-01 15:40:14,317.0,1.0,1.0
3165,10004165,2019-03-09,1,2,361,1,2019-03-09 23:49:15,361.0,2.0,1.0
3977,10004977,2019-03-13,1,0,39,118,2019-03-13 19:07:52,39.0,0.0,118.0
4387,10006387,2019-03-14,2,393,5,1,2019-03-15 08:36:33,2.5,196.5,0.5
7975,10007975,2019-04-02,2,227,35,0,2019-04-03 19:15:12,17.5,113.5,0.0
7992,10007992,2019-03-10,1,274,2,46,2019-03-10 20:48:18,2.0,274.0,46.0
8109,10008109,2019-03-26,1,1,300,0,2019-03-26 22:11:26,300.0,1.0,0.0
8746,10008746,2019-03-21,2,220,15,0,2019-03-22 12:25:58,7.5,110.0,0.0
9592,10009592,2019-03-30,1,1,292,0,2019-03-30 17:38:21,292.0,1.0,0.0


In [21]:
# Summary statistics for master_df
master_df.describe([0.25, 0.5, 0.75, 0.80, 0.85, 0.90,0.93, 0.95,0.96,0.97,0.98,0.99])

Unnamed: 0,user_id,signup_ts,tenure_days,num_calls,num_searches,num_messages,last_action_ts,search_per_day,call_per_day,message_per_day
count,26000.0,26000,26000.0,26000.0,26000.0,26000.0,26000,26000.0,26000.0,26000.0
mean,10013000.0,2019-03-18 20:44:39.507692288,11.577731,47.069,33.350308,4.067808,2019-03-29 21:15:20.723115008,4.912244,5.32828,0.605702
min,10000000.0,2019-03-02 00:00:00,1.0,0.0,1.0,0.0,2019-03-02 11:39:02,0.05,0.0,0.0
25%,10006500.0,2019-03-12 00:00:00,5.0,15.0,19.0,0.0,2019-03-26 15:35:40.500000,1.54,1.58,0.0
50%,10013000.0,2019-03-20 00:00:00,9.0,30.0,28.0,0.0,2019-04-03 12:14:56,2.69,3.56,0.0
75%,10019500.0,2019-03-27 00:00:00,17.0,59.0,35.0,2.0,2019-04-03 20:35:04.500000,5.12,7.0,0.21
80%,10020800.0,2019-03-28 00:00:00,19.0,70.0,37.0,3.0,2019-04-03 21:25:32.200000,6.0,8.0,0.4
85%,10022100.0,2019-03-29 00:00:00,21.0,85.0,39.0,5.0,2019-04-03 22:15:01.749999872,7.6,9.5,0.67
90%,10023400.0,2019-03-30 00:00:00,24.0,107.0,40.0,8.0,2019-04-03 23:10:51.200000,10.0,11.55,1.14
93%,10024180.0,2019-03-31 00:00:00,26.0,126.0,42.0,11.0,2019-04-03 23:52:01.069999872,12.5,13.75,1.8


# 4.3. Scraper Detection – Rule-Based Approach

Rule 1: High searches (≥97th pct) with low calls/messages — scrapers.

Rule 2: Extremely high search + call/message + long tenure — systematic high-volume scrapers.

Rule 3: High search + high call but low message — fast contact grabbers (no text follow-up).

Rule 4: High search + high message but low call — mass-messagers using scraped data, avoiding voice.

In [22]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import pandas as pd
import numpy as np

def detect_scraper_patterns(batch_df):
    # -------------------------------
    # Pattern 1: High search volume but low call/message
    # Real-life Scenario:
    #   A user performs an abnormally high number of searches (like a bot scraping listings)
    #   but rarely initiates meaningful engagement (calls or messages).
    #   This mismatch in intent and interaction is a strong scraper signal.
    batch_df['is_scraper_low_engagement'] = (
        (batch_df['num_searches'] >= batch_df['num_searches'].quantile(0.97)) &
        (batch_df['search_per_day'] >= batch_df['search_per_day'].quantile(0.99)) &
        (batch_df['call_per_day'] <= batch_df['call_per_day'].quantile(0.50)) &
        (batch_df['message_per_day'] <= batch_df['message_per_day'].quantile(0.95))
    )

    # -------------------------------
    #  Pattern 2: High activity across search/call/message with long tenure
    # Real-life Scenario:
    #   Some scrapers mimic real users but operate at extremely high volume across all actions.
    #   If such high activity spans a long time (high tenure), this could indicate
    #   well-maintained automated accounts or systematic scraping bots.
    batch_df['is_scraper_high_activity'] = (
        (batch_df['search_per_day'] >= batch_df['search_per_day'].quantile(0.97)) &
        (
            (batch_df['call_per_day'] >= batch_df['call_per_day'].quantile(0.99)) |
            (batch_df['message_per_day'] >= batch_df['message_per_day'].quantile(0.75))
        ) &
        (batch_df['tenure_days'] >= batch_df['tenure_days'].quantile(0.97))
    )

    # -------------------------------
    #  Pattern 3: High search + high call but low message
    # Real-life Scenario:
    #   These users aggressively search and call but barely send messages.
    #   Many bots are programmed to scrape contact details quickly without following up via text.
    batch_df['is_scraper_search_call_low_msg'] = (
        (batch_df['search_per_day'] >= batch_df['search_per_day'].quantile(0.97)) &
        (batch_df['call_per_day'] >= batch_df['call_per_day'].quantile(0.95)) &
        (batch_df['message_per_day'] <= batch_df['message_per_day'].quantile(0.25))
    )

    # -------------------------------
    #  Pattern 4: High search + high message but low call
    # Real-life Scenario:
    #   Bots configured to mass-message users using scraped info, but without making calls.
    #   Such automated "message spammers" often have no voice interaction.
    batch_df['is_scraper_search_msg_low_call'] = (
        (batch_df['search_per_day'] >= batch_df['search_per_day'].quantile(0.99)) &
        (batch_df['message_per_day'] >= batch_df['message_per_day'].quantile(0.99)) &
        (batch_df['call_per_day'] <= batch_df['call_per_day'].quantile(0.25))
    )

    return batch_df[
        ['user_id',
         'is_scraper_low_engagement',
         'is_scraper_high_activity',
         'is_scraper_search_call_low_msg',
         'is_scraper_search_msg_low_call']
    ]

def detect_scrapers_parallel(master_df, batch_size=1000, num_threads=8):
    # Divide user data into batches for multi-threaded parallel processing
    unique_user_ids = master_df['user_id'].unique()
    total_batches = int(np.ceil(len(unique_user_ids) / batch_size))

    batches = []
    for i in range(total_batches):
        batch_ids = unique_user_ids[i * batch_size : (i + 1) * batch_size]
        batch_df = master_df[master_df['user_id'].isin(batch_ids)]
        batches.append(batch_df)

    results = []
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(detect_scraper_patterns, batch) for batch in batches]
        for f in tqdm(as_completed(futures), total=len(futures), desc="Detecting scrapers"):
            results.append(f.result())

    # Merge predictions back to original data
    combined = pd.concat(results, ignore_index=True)
    merged = master_df.merge(combined, on='user_id', how='left')

    # Separate each pattern's detections
    df_pattern1 = merged[merged['is_scraper_low_engagement'] == True]
    df_pattern2 = merged[merged['is_scraper_high_activity'] == True]
    df_pattern3 = merged[merged['is_scraper_search_call_low_msg'] == True]
    df_pattern4 = merged[merged['is_scraper_search_msg_low_call'] == True]

    return df_pattern1, df_pattern2, df_pattern3, df_pattern4

# runs pattern detection on full dataset in parallel
unusual_df, high_activity_df, search_call_low_msg_df, search_msg_low_call_df = detect_scrapers_parallel(master_df)

print(f"Pattern 1 - Low Engagement Scrapers: {len(unusual_df)}")
print(f"Pattern 2 - High Activity Scrapers: {len(high_activity_df)}")
print(f"Pattern 3 - High Search+Call, Low Message: {len(search_call_low_msg_df)}")
print(f"Pattern 4 - High Search+Message, Low Call: {len(search_msg_low_call_df)}")


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  batch_df['is_scraper_low_engagement'] = (
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  batch_df['is_scraper_low_engagement'] = (
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  batch_df['is_scraper_low_engagement'] = (
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .l

Pattern 1 - Low Engagement Scrapers: 204
Pattern 2 - High Activity Scrapers: 0
Pattern 3 - High Search+Call, Low Message: 50
Pattern 4 - High Search+Message, Low Call: 1





### Inference from Scraper Detection Rules:

Most scraper-like users fall under Pattern 1, indicating a dominant behavior of heavy search activity with little engagement (calls/messages) — classic low-intent scraping. Pattern 3 shows moderate presence, pointing to users aggressively collecting contact info via search and calls. Pattern 4 and Pattern 2 are rare or absent, suggesting fewer bots mimic long-tenure behavior or focus solely on messaging. Overall, scrapers prefer fast, passive data extraction over long-term or balanced interaction.

# 4.4 Indentify multiple signups

In [17]:
# Count number of signups per user_id
signup_counts = signup_data['user_id'].value_counts().reset_index()
signup_counts.columns = ['user_id', 'signup_count']
# ----------------------------------------
# Real-life context:
# A genuine user typically signs up once and uses their account.
# However, scrapers or bots often create multiple accounts (signups) to bypass system restrictions,
# avoid detection, or gain extra access (like free trials or limits).
# Hence, users with multiple signups are suspicious and worth investigating.
# ----------------------------------------
# Filter users with multiple signups
multiple_signups = signup_counts[signup_counts['signup_count'] > 1]

print(f"Total unique users: {signup_counts.shape[0]}")
print(f"Users with multiple signups: {multiple_signups.shape[0]}")
print(multiple_signups.head())

Total unique users: 26000
Users with multiple signups: 0
Empty DataFrame
Columns: [user_id, signup_count]
Index: []


# 4.5. Identify suspicious users with unusually high search activity and analyze patterns in their search timings.

In [23]:
# Merge signup timestamps into search data
search_data = search_data.merge(signup_data[['user_id', 'signup_ts']], on='user_id', how='left')
# Group by user_id to compute desired features
search_summary = search_data.groupby('user_id').agg(
    first_search_time=('search_ts', 'min'),
    last_search_time=('search_ts', 'max'),
    num_searches=('search_ts', 'count')
).reset_index()

# Calculate tenure (duration between first and last search)
search_summary['tenure_days'] = (
    search_summary['last_search_time'] - search_summary['first_search_time']
).dt.total_seconds() / 86400  # Convert seconds to days

search_summary['avg_num_of_searches'] = search_summary['num_searches'] / search_summary['tenure_days']

In [24]:
search_summary['num_searches'].describe([0.25, 0.5, 0.75, 0.80, 0.85, 0.90,0.93, 0.95,0.97]).round(2)

count    26000.00
mean        33.35
std         63.68
min          1.00
25%         19.00
50%         28.00
75%         35.00
80%         37.00
85%         39.00
90%         40.00
93%         42.00
95%         44.00
97%         45.00
max       5631.00
Name: num_searches, dtype: float64

In [None]:
# took 97% quantile value
search_summary[(search_summary['avg_num_of_searches'] > 100) & (search_summary['num_searches']> 45)]['user_id'].nunique()

297

In [27]:
suspicious_users_from_search = search_summary[(search_summary['avg_num_of_searches'] > 100) & (search_summary['num_searches'] > 45)]

##### Found 297 suspecious user ids

In [30]:
suspicious_users_from_search.head()

Unnamed: 0,user_id,first_search_time,last_search_time,num_searches,tenure_days,avg_num_of_searches
31,10000031,2019-03-18 09:54:52,2019-03-18 15:42:00,286,0.241065,1186.402919
82,10000082,2019-03-11 20:01:57,2019-03-11 21:47:23,400,0.073218,5463.167879
108,10000108,2019-03-15 13:37:13,2019-03-15 13:51:06,306,0.009641,31738.77551
142,10000142,2019-03-12 22:59:06,2019-03-13 17:25:41,306,0.768461,398.19866
833,10000833,2019-04-01 15:18:20,2019-04-01 15:49:25,292,0.021586,13527.506702


In [31]:
search_data1 = search_data.copy()
search_data1.shape, search_data1['user_id'].nunique()

((867108, 3), 26000)

In [32]:
search_data1 = search_data1[search_data1['user_id'].isin(suspicious_users_from_search['user_id'])]
search_data1.shape , search_data1['user_id'].nunique()

((98994, 3), 297)

In [33]:
search_data1 = search_data1.sort_values(by=['user_id', 'search_ts'])

In [34]:
search_data1 = search_data1.sort_values(by=['user_id', 'search_ts'])
# Calculate time difference between consecutive searches per user
search_data1['prev_ts'] = search_data1.groupby('user_id')['search_ts'].shift(1)
search_data1['time_diff_secs'] = (search_data1['search_ts'] - search_data1['prev_ts']).dt.total_seconds()


In [35]:
search_data1.sort_values(by=['user_id', 'search_ts'], inplace=True)
search_data1

Unnamed: 0,user_id,search_ts,signup_ts,prev_ts,time_diff_secs
38696,10000031,2019-03-18 09:54:52,2019-03-18 00:00:00,NaT,
807407,10000031,2019-03-18 09:55:11,2019-03-18 00:00:00,2019-03-18 09:54:52,19.0
695115,10000031,2019-03-18 09:55:30,2019-03-18 00:00:00,2019-03-18 09:55:11,19.0
356579,10000031,2019-03-18 09:55:54,2019-03-18 00:00:00,2019-03-18 09:55:30,24.0
92240,10000031,2019-03-18 09:56:16,2019-03-18 00:00:00,2019-03-18 09:55:54,22.0
...,...,...,...,...,...
115946,10025844,2019-03-07 15:36:14,2019-03-06 00:00:00,2019-03-07 15:36:09,5.0
684413,10025844,2019-03-07 15:36:35,2019-03-06 00:00:00,2019-03-07 15:36:14,21.0
422096,10025844,2019-03-07 15:37:00,2019-03-06 00:00:00,2019-03-07 15:36:35,25.0
526807,10025844,2019-03-07 15:37:25,2019-03-06 00:00:00,2019-03-07 15:37:00,25.0


## 4.6. Methodology: Detecting Users with Constant Search Gaps
Group users and calculate time differences between their searches.

Drop the first search (as it has no prior timestamp to compare).

Check if all time gaps for a user are exactly the same.

If yes, flag the user as suspicious for bot-like behavior.

Return user ID, constant gap, and total number of searches.

In [36]:
def detect_users_with_exact_time_gap(df, user_col='user_id', diff_col='time_diff_secs'):
    suspect_users = []

    grouped = df.groupby(user_col)

    for user, group in grouped:
        time_diffs = group[diff_col].dropna()  # exclude first search with NaN diff

        if len(time_diffs) >= 1:  # must have at least one time gap
            # Check if all time differences are exactly the same
            if time_diffs.nunique() == 1:
                suspect_users.append({
                    'user_id': user,
                    'constant_gap': time_diffs.iloc[0],
                    'num_searches': len(time_diffs) + 1  # original count of searches = gaps + 1
                })

    return pd.DataFrame(suspect_users)

    return pd.DataFrame(suspect_users)
suspect_df_constant_gaps = detect_users_with_exact_time_gap(search_data1, user_col='user_id', diff_col='time_diff_secs')


## 4.7. Methodology: Detecting Users with Time Gap Progression Patterns
For each user, calculate time differences between consecutive searches.

Check if time gaps follow an arithmetic progression (equal increments).

Check if gaps follow a geometric progression (constant ratio).

Check for fractional increase patterns using percentage change.

Flag users with any consistent progression pattern as suspicious.

In [37]:
def detect_user_patterns(user_id, diffs):
    diffs = diffs.dropna().reset_index(drop=True)
    output = {
        'user_id': user_id,
        'lambda_arithmetic': np.nan,
        'lambda_geometric': np.nan,
        'lambda_fraction': np.nan,
        'pattern_type': None,
        'user_is_suspect': False
    }

    # Arithmetic progression
    arith_diff = diffs.diff().dropna()
    if arith_diff.nunique() == 1:
        output['lambda_arithmetic'] = round(arith_diff.iloc[0], 2)
        output['pattern_type'] = 'arithmetic_progression'
        output['user_is_suspect'] = True
        return output

    # Geometric progression
    geom_ratio = diffs[1:] / diffs[:-1].values
    if geom_ratio.nunique() == 1:
        output['lambda_geometric'] = round(geom_ratio.iloc[0], 2)
        output['pattern_type'] = 'geometric_progression'
        output['user_is_suspect'] = True
        return output

    # Fractional increase
    frac_inc = diffs.pct_change().dropna()
    if frac_inc.nunique() == 1:
        output['lambda_fraction'] = round(frac_inc.iloc[0], 2)
        output['pattern_type'] = 'fractional_increase'
        output['user_is_suspect'] = True
        return output

    return output

# Step 3: Apply detection logic per user
pattern_summary = [
    detect_user_patterns(user_id, group['time_diff_secs'])
    for user_id, group in search_data1.groupby('user_id')
]
pattern_df = pd.DataFrame(pattern_summary)
pattern_df.dropna(axis=1, how='all', inplace=True)
pattern_df = pattern_df[pattern_df['user_is_suspect'] == True].reset_index(drop=True)
pattern_df = pattern_df[['user_id']].drop_duplicates().reset_index(drop=True)
pattern_df.shape, pattern_df['user_id'].nunique()

((0, 1), 0)

#### Did not found any pattern on those 297 suspecious user ids

## 4.8. Regular Pattern Detection using Time Gaps
Detects users performing events (e.g., search, call, message) at regular time intervals.

For each user, calculates time difference between consecutive events.

Flags users if their mean time gap is within 10% of the mode and has low standard deviation (< 60s).

Utilizes ThreadPoolExecutor for concurrent processing to scale across millions of users efficiently.

Supports progress tracking via tqdm, and returns a flagged user list per event type.

In [38]:
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm


def process_user_batch(user_batch, df, user_col, time_col, event_type):
    """
    Process a batch of users and detect regular time patterns per user.
    """
    results = []

    for user_id in user_batch:
        user_df = df[df[user_col] == user_id].copy()

        # Sort events by timestamp
        user_df = user_df.sort_values(by=time_col)

        # Compute time differences between consecutive events (in seconds)
        user_df['time_diff'] = user_df[time_col].diff().dt.total_seconds()

        # Check for regularity: mean time_diff within 10% of mode and std < 60s
        if len(user_df['time_diff'].dropna()) >= 3:
            diffs = user_df['time_diff'].dropna()
            most_common_diff = diffs.mode().iloc[0]
            mean_diff = diffs.mean()
            std_diff = diffs.std()

            is_regular = (
                abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
            )
        else:
            is_regular = False

        # Store results
        results.append({
            user_col: user_id,
            'event_type': event_type,
            'user_is_suspect': is_regular
        })

    return results


def detect_patterns_concurrent(df, user_col, time_col, event_type, batch_size=100, max_workers=4):
    """
    Run regular pattern detection concurrently using ThreadPoolExecutor with a tqdm progress bar.
    """
    # Convert timestamp column to datetime if needed
    if not np.issubdtype(df[time_col].dtype, np.datetime64):
        df[time_col] = pd.to_datetime(df[time_col])

    # Get all unique user IDs
    users = df[user_col].dropna().unique()
    
    # Break user IDs into batches
    user_batches = [users[i:i + batch_size] for i in range(0, len(users), batch_size)]

    results = []

    # Use ThreadPoolExecutor for concurrent processing with progress bar
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_user_batch, batch, df, user_col, time_col, event_type): batch
            for batch in user_batches
        }

        for future in tqdm(as_completed(futures), total=len(futures), desc=f"Processing {event_type}"):
            try:
                result = future.result()
                results.extend(result)
            except Exception as e:
                print(f"Batch failed with error: {e}")

    return pd.DataFrame(results)


In [39]:
if __name__ == '__main__':
    search_result = detect_patterns_concurrent(
        search_data,
        user_col='user_id',
        time_col='search_ts',
        event_type='search'
    )

    call_result = detect_patterns_concurrent(
        call_data,
        user_col='user_id',
        time_col='call_ts',
        event_type='call'
    )

    message_result = detect_patterns_concurrent(
        message_data,
        user_col='user_id',
        time_col='message_ts',
        event_type='message'
    )

    combined_patterns = pd.concat([search_result, call_result, message_result], ignore_index=True)
    suspect_users = combined_patterns[combined_patterns['user_is_suspect']]

    print(f"Total suspect users: {suspect_users.shape[0]}")
    print(suspect_users.head())


  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_diff < 0.1 and std_diff < 60
  abs(mean_diff - most_common_diff) / most_common_di

Total suspect users: 21
        user_id event_type  user_is_suspect
3211   10010114     search             True
6383   10012703     search             True
9668   10012276     search             True
12036  10002883     search             True
16166  10009479     search             True





In [40]:
suspect_users = suspect_users.merge(master_df, on='user_id', how='left', suffixes=('', '_total'))

# 5. Concat all the suspect userids who can be potential scrappers

In [41]:
import pandas as pd

# List of suspect DataFrames
suspect_dfs = [
    suspect_users,
    scraper_df1,
    scrapers_df2,
    unusual_df,
    high_activity_df,
    search_call_low_msg_df,
    search_msg_low_call_df,
    pattern_df
]

# Collect valid user_id Series
user_id_list = []

for df in suspect_dfs:
    if isinstance(df, pd.DataFrame) and not df.empty and 'user_id' in df.columns:
        user_id_list.append(df['user_id'].dropna())

# Concatenate and deduplicate
all_suspect_users = pd.concat(user_id_list, ignore_index=True).drop_duplicates().reset_index(drop=True)

print("Total unique suspect users found:", all_suspect_users.nunique())


Total unique suspect users found: 302


In [50]:
all_suspect_users_df = pd.DataFrame(all_suspect_users, columns=['suspected_scraper_user_id'])


In [52]:
search_data.columns, call_data.columns, message_data.columns

(Index(['user_id', 'search_ts', 'signup_ts'], dtype='object'),
 Index(['user_id', 'call_ts'], dtype='object'),
 Index(['user_id', 'message_ts'], dtype='object'))

In [53]:
all_suspect_users_df.merge(signup_data[['user_id', 'country_code']], left_on='suspected_scraper_user_id', right_on='user_id', how='left')['country_code'].value_counts(normalize=True).multiply(100)

country_code
IT    29.801325
GB    26.821192
SE    24.172185
FR    13.245033
ES     5.960265
Name: proportion, dtype: float64