In [1]:
import duckdb
import pandas as pd
import logging
import warnings
DB_PATH='/opt/test-data/experimental.duckdb'

# Use this line to ignore the specific UserWarning message
warnings.filterwarnings(
    action='ignore',
    message='pandas only supports SQLAlchemy',
    category=UserWarning
)

def load_sample_data(table_name, sample_size=1000, db_path=None):
    """
    Load a random sample of rows from a DuckDB table into a pandas DataFrame.

    Args:
        table_name (str): Table to sample from, optionally schema-qualified
            (e.g. "er.pair_features"). It is interpolated into the SQL text
            verbatim, so it must be a trusted identifier, never user input.
        sample_size (int): Number of rows to sample; must be a non-negative
            integer (Python or numpy int).
        db_path (str, optional): DuckDB database file, opened read-only.
            Defaults to the notebook-level DB_PATH.

    Returns:
        pd.DataFrame | None: The sampled rows, or None if opening the
        database or running the query raised a duckdb.Error (the error is
        printed).

    Raises:
        ValueError: If sample_size is not a non-negative integer.
    """
    import operator

    # Validate before building the SQL: sample_size is spliced into the query
    # text, so only accept genuine integers.
    try:
        n_rows = operator.index(sample_size)
    except TypeError:
        raise ValueError(
            f"sample_size must be an integer, got {sample_size!r}"
        ) from None
    if n_rows < 0:
        raise ValueError(f"sample_size must be non-negative, got {n_rows}")

    if db_path is None:
        db_path = DB_PATH

    query = f"SELECT * FROM {table_name} using sample {n_rows} rows"

    try:
        # The try also covers connect(), so a missing or locked database file
        # is reported the same way as a failing query instead of escaping.
        with duckdb.connect(database=db_path, read_only=True) as conn:
            df = pd.read_sql(query, conn)
    except duckdb.Error as e:
        print(f"An error occurred: {e}")
        return None

    print(f"Successfully loaded a sample of {len(df)} rows from {table_name}.")
    return df

def combine_query_results(queries, db_path=DB_PATH):
    """
    Run several SQL queries against one DuckDB database and stack the results.

    Each query is executed in order over a single read-only connection. A query
    that raises duckdb.Error is reported and skipped; it contributes no rows.

    Args:
        queries (list): SQL query strings to execute, in order.
        db_path (str): Path to the DuckDB database file.

    Returns:
        pd.DataFrame: All successful results concatenated row-wise with a fresh
        RangeIndex, or an empty DataFrame if no query succeeded.
    """
    frames = []

    # The context manager closes the connection once every query has run.
    with duckdb.connect(database=db_path, read_only=True) as connection:
        print("Connected to DuckDB database.")
        for query_no, sql in enumerate(queries, start=1):
            print(f"Executing query {query_no}...")
            try:
                frames.append(pd.read_sql(sql, connection))
            except duckdb.Error as err:
                print(f"Error executing query {query_no}: {err}")

    if not frames:
        print("No results to combine.")
        return pd.DataFrame()

    combined = pd.concat(frames, ignore_index=True)
    print("All query results combined successfully.")
    return combined

In [2]:
# Random sample of candidate pairs used for weak labelling and training below.
PAIR_SAMPLE_SIZE = 20_000
df = load_sample_data("er.pair_features", PAIR_SAMPLE_SIZE)
# load_sample_data prints the error and returns None on failure; stop here
# rather than failing later with an AttributeError on .head().
assert df is not None, "Failed to load a sample of er.pair_features"
display(df.head())

Successfully loaded a sample of 20000 rows from er.pair_features.


Unnamed: 0,company_id_a,company_id_b,domain_exact,country_exact,city_exact,name_exact,name_compact_exact,name_prefix4_eq,name_prefix6_eq,emp_cur_absdiff,emp_cur_reldiff,emp_tot_absdiff,emp_tot_reldiff,both_have_domain,both_have_name,country_a,country_b,city_a,city_b
0,808699,4691738,0,1,0.0,0,0,1,1,14,0.736842,25,0.757576,1,1,united states,united states,sacramento,coral gables
1,4149873,7131113,0,1,1.0,0,0,0,0,1,0.5,1,0.5,1,1,brazil,brazil,sao paulo,sao paulo
2,4240637,6429044,0,1,0.0,0,0,1,0,68,1.0,93,0.989362,1,1,united states,united states,state college,lititz
3,2103768,3403384,0,1,0.0,0,0,1,0,2,0.666667,2,0.666667,1,1,india,india,pune,bombay
4,3046615,4102729,0,1,0.0,0,0,1,0,2,1.0,1,0.333333,1,1,united kingdom,united kingdom,hampshire,swansea


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.impute import SimpleImputer

# `df` is the er.pair_features sample loaded in the previous cell.
SEED = 42  # shared random_state for sampling, shuffling and the train/val split

# --- Weak labels ---
# Rule-based labels. Missing emp_tot_reldiff is treated as 1.0 (maximally
# different), so such pairs can only be positive via an exact domain match.
# NOTE(review): every column these rules read is also a model feature below, so
# the classifier largely learns to reproduce the rules; the validation report
# measures agreement with the heuristics, not true match quality.
# TODO: evaluate against a hand-labelled sample.
pos_mask = (df["domain_exact"] == 1) | (
    (df["country_exact"] == 1) &
    (df["name_prefix6_eq"] == 1) &
    (df["emp_tot_reldiff"].fillna(1.0) <= 0.20)
)

neg_mask = (df["country_exact"] == 0) | (
    (df["name_prefix4_eq"] == 0) &
    (df["emp_tot_reldiff"].fillna(1.0) >= 0.90)
)

# remove overlaps: positives win ties
neg_mask = neg_mask & (~pos_mask)

pos = df[pos_mask].copy()
pos["label"] = 1
neg = df[neg_mask].copy()
neg["label"] = 0

print(f"pos size: {len(pos)}")
print(f"neg size: {len(neg)}")
# The stratified split and the classifier both need examples of each class.
assert len(pos) > 0 and len(neg) > 0, "Weak-labelling rules produced an empty class"

# Downsample negatives toward the positive count, since the raw pairs are
# expected to be mostly non-matches. If negatives are already the minority
# nothing is dropped; class_weight="balanced" absorbs any leftover imbalance.
neg_sample = neg.sample(n=min(len(pos), len(neg)), random_state=SEED)
weak_train = pd.concat([pos, neg_sample], ignore_index=True).sample(frac=1, random_state=SEED)
print(f"size of weak_training set {len(weak_train)}")

# --- Features / target ---
drop_id_cols = ["company_id_a", "company_id_b"]
X = weak_train.drop(columns=["label"] + drop_id_cols, errors="ignore")
y = weak_train["label"]

# Split columns into categorical (one-hot encoded) and numeric (imputed only).
candidate_cat = ["country_a", "country_b", "city_a", "city_b"]
cat_cols = [c for c in candidate_cat if c in X.columns]
num_cols = [c for c in X.columns if c not in cat_cols]

# Check the full candidate list. cat_cols is already filtered to X's columns,
# so diffing cat_cols against X could never report anything.
missing = set(candidate_cat) - set(X.columns)
assert not missing, f"These categorical columns are missing from X: {missing}"

# Preprocess: impute NaNs, one-hot the cats, pass through nums
pre = ColumnTransformer(
    transformers=[
        ("num", make_pipeline(SimpleImputer(strategy="median")), num_cols),
        ("cat", make_pipeline(SimpleImputer(strategy="most_frequent"),
                              OneHotEncoder(handle_unknown="ignore")), cat_cols),
    ],
    remainder="drop",
)

# Model
clf = Pipeline([
    ("pre", pre),
    ("model", LogisticRegression(max_iter=1000, class_weight="balanced"))
])

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.25, stratify=y, random_state=SEED
)

clf.fit(X_train, y_train)
print(classification_report(y_val, clf.predict(X_val), digits=3))

# Probabilities for downstream use:
val_proba = clf.predict_proba(X_val)[:, 1]

pos size: 2007
neg size: 1851
size of weak_training set 3858
              precision    recall  f1-score   support

           0      0.996     0.996     0.996       463
           1      0.996     0.996     0.996       502

    accuracy                          0.996       965
   macro avg      0.996     0.996     0.996       965
weighted avg      0.996     0.996     0.996       965



In [6]:
import joblib
from pathlib import Path

# Relative to the notebook's working directory.
MODEL_PATH = Path("../models/er_pair_clf.joblib")
# joblib.dump will not create missing parent directories, so ensure ../models
# exists first.
MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
# The artifact is a pickle: only ever load it back from a trusted location.
joblib.dump(clf, str(MODEL_PATH))

['../models/er_pair_clf.joblib']

In [None]:
# Row counts of the scoring tables, plus the number of scored rows whose
# model_score is NULL.
sanity_checks = [
    "SELECT COUNT(*) FROM er.pair_features ;",
    "SELECT COUNT(*)  FROM er.pair_scores;",
    "SELECT COUNT(*) FROM er.pair_features_scored ;",
    "SELECT COUNT(*)  FROM er.pair_features_scored  WHERE model_score IS NULL;",
]
sanity_labels = [
    "pair_features",
    "pair_scores",
    "pair_features_scored",
    "pair_features_scored_null_score",
]
final_df = combine_query_results(sanity_checks)
# combine_query_results skips failed queries without a placeholder row, so row i
# only corresponds to check i when every query produced exactly one row.
if len(final_df) == len(sanity_labels):
    final_df.index = sanity_labels
else:
    print(f"Expected {len(sanity_labels)} result rows, got {len(final_df)}; rows left unlabelled.")
print("\nFinal Combined DataFrame:")
print(final_df)


Connected to DuckDB database.
Executing query 1...
Executing query 2...
Executing query 3...
Executing query 4...
All query results combined successfully.

Final Combined DataFrame:
   count_star()
0     155577811
1     155577811
2     172867797
3             0


In [37]:
# Single-row report: row counts of the scoring tables and the number of scored
# rows whose model_score is NULL, computed in one query.
better_sanity="""
WITH pair_features_count AS (
    SELECT
        COUNT(*) AS total_pair_features
    FROM er.pair_features
), pair_scores_count AS (
    SELECT
        COUNT(*) AS total_pair_scores
    FROM er.pair_scores
), pair_features_scored_count AS (
    SELECT
        COUNT(*) AS total_pair_features_scored
    FROM er.pair_features_scored
), null_scores_count AS (
    SELECT
        COUNT(*) AS total_null_pair_features_scored
    FROM er.pair_features_scored
    WHERE model_score IS NULL
)
SELECT
    t1.total_pair_features,
    t2.total_pair_scores,
    t3.total_pair_features_scored,
    t4.total_null_pair_features_scored
FROM
    pair_features_count t1,
    pair_scores_count t2,
    pair_features_scored_count t3,
    null_scores_count t4;
"""

# Reset first: without this, a failed query leaves rep_df undefined (NameError
# below) or silently re-prints a stale report from an earlier run.
rep_df = None
try:
    # The try also covers connect(), so an unreadable or locked database file
    # is reported the same way as a failing query.
    with duckdb.connect(database=DB_PATH, read_only=True) as conn:
        rep_df = pd.read_sql(better_sanity, conn)
except duckdb.Error as e:
    print(f"An error occurred: {e}")

if rep_df is not None:
    print(rep_df.to_csv(index=False))

total_pair_features,total_pair_scores,total_pair_features_scored,total_null_pair_features_scored
155577811,155577811,155577811,0

