In [9]:
#!pip install splink

In [1]:
import duckdb
from tqdm.auto import tqdm 
import pandas as pd

connection_source = duckdb.connect(database="../dbt/database_name.duckdb", read_only = True)

In [2]:
# OK : Filtrer les propriétaires ayant un logements vacant dans la prod (lovac 2024)
# Logement est le lien entre un propriétaire national et départemental 
# Aller chercher les logements via les ID locaux dans les fichiers fonciers
# Aller chercher les logements via les ID locaux dans la prod 

In [3]:
# Queries pour charger les DataFrames
query_prod_owners = """ 
SELECT
    CAST(o.id AS VARCHAR) AS unique_id,
    full_name AS owner_fullname,
    birth_date AS owner_birth_date,
    list_aggregate(o.address_dgfip, 'string_agg', ' ') as owner_address,
    kind_class AS owner_category_detail,
    CAST(idpersonne AS VARCHAR) AS owner_idpersonne,
    city AS owner_city,
    postal_code as owner_postal_code,
    array_agg(DISTINCT ph.local_id) as local_ids
FROM main_stg.stg_production_owners o 
JOIN production.ban_addresses ba
    ON ba.address_kind = 'Owner' AND ba.ref_id = o.id
JOIN main_stg.stg_production_owners_housing poh ON poh.owner_id = o.id
JOIN main_stg.stg_production_housing ph ON poh.housing_id = ph.id
WHERE (occupancy_source = 'V' OR occupancy = 'V') AND data_file_years IS NOT NULL AND list_contains(data_file_years, 'lovac-2024')
AND o.birth_date IS NOT NULL
GROUP BY o.id, full_name, birth_date, o.address_dgfip, kind_class, idpersonne, city, postal_code
;
"""
query_ff_owners = """
WITH idlocal_idprocte AS (
    SELECT ff_idlocal, ff_idprocpte
    FROM raw_lovac_2024
    UNION ALL
    SELECT ff_idlocal, ff_idprocpte
    FROM raw_lovac_2023
),
idprocte AS (
    SELECT
        ff_idprocpte,
        array_agg(ff_idlocal) AS ff_idlocals
    FROM idlocal_idprocte
    GROUP BY ff_idprocpte
),
owners_local_ids AS (
    SELECT
        o.idprocpte,
        array_agg(ids.ff_idlocals) AS ff_idlocals -- Applatissement des arrays imbriqués
    FROM raw_ff_owners o
    LEFT JOIN idprocte ids ON o.idprocpte = ids.ff_idprocpte
    GROUP BY o.idprocpte
),
unique_owner_ids AS (
    SELECT
        o.ff_owner_idpersonne,
        unnest(o.ff_owner_idprocpte) AS idprocpte
    FROM main_int.int_ff_owners_dedup o
)
SELECT
    CAST(o.ff_owner_idpersonne AS VARCHAR) AS unique_id,
    CAST(o.ff_owner_idpersonne AS VARCHAR) AS owner_idpersonne,
    ff_owner_address_1 || ' ' || ff_owner_address_2 || ' ' || ff_owner_address_3 || ' ' || ff_owner_address_4 AS owner_address,
    ff_owner_postal_code AS owner_postal_code,
    ff_owner_birth_date AS owner_birth_date,
    ff_owner_lastname AS owner_lastname,
    ff_owner_firstname AS owner_firstname,
    ff_owner_fullname AS owner_fullname,
    ff_owner_category_text AS owner_category_detail,
    ff_owner_city AS owner_city,
    flatten(flatten(array_agg(DISTINCT li.ff_idlocals))) AS local_ids -- Concaténation des arrays aplatis
FROM main_int.int_ff_owners_dedup o
JOIN unique_owner_ids u ON u.ff_owner_idpersonne = o.ff_owner_idpersonne
JOIN owners_local_ids li ON li.idprocpte = u.idprocpte
WHERE ff_owner_birth_date IS NOT NULL
GROUP BY
    o.ff_owner_idpersonne,
    ff_owner_address_1,
    ff_owner_address_2,
    ff_owner_address_3,
    ff_owner_address_4,
    ff_owner_postal_code,
    ff_owner_birth_date,
    ff_owner_lastname,
    ff_owner_firstname,
    ff_owner_fullname,
    ff_owner_category_text,
    ff_owner_city;
"""

df_ff_owners = connection_source.execute(query_ff_owners).fetchdf()
df_prod_owners = connection_source.execute(query_prod_owners).fetchdf()
print(df_ff_owners.shape)
print(df_prod_owners.shape)

connection_source.close()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

(3693499, 11)
(1116443, 9)


In [None]:
import splink.comparison_library as cl
from splink import DuckDBAPI, Linker, SettingsCreator, block_on
from splink.comparison_level_library import ElseLevel, NullLevel
import duckdb

# Create DuckDBAPI connection
db_api = DuckDBAPI()

def local_ids_overlap_comparison():
    return {
        "output_column_name": "local_ids",
        "comparison_levels": [
            {
                "sql_condition": "local_ids_l IS NULL OR local_ids_r IS NULL",
                "label_for_charts": "Null",
                "is_null_level": True,
            },
            {
                "sql_condition": "ARRAY_INTERSECT(local_ids_l, local_ids_r) IS NOT NULL",
                "label_for_charts": "Overlap exists",
                "tf_adjustment_column": "local_ids",
                "tf_adjustment_weight": 1.0,
            },
            {
                "sql_condition": "ELSE",
                "label_for_charts": "No overlap",
            },
        ],
    }


# Splink settings
settings = SettingsCreator(
    link_type="link_only",  # Comparing two datasets
    comparisons=[
        cl.NameComparison("owner_fullname"),
        cl.DateOfBirthComparison(
            "owner_birth_date",
            input_is_string=False,
            datetime_metrics=["year", "month"],
            datetime_thresholds=[1, 2],
        ),
        cl.LevenshteinAtThresholds("owner_address"),
        cl.PostcodeComparison("owner_postal_code"),
        cl.ExactMatch("owner_city"),
        cl.ExactMatch("owner_category_detail"),
        # local_ids_overlap_comparison(),
    ],
    blocking_rules_to_generate_predictions=[
        block_on("owner_postal_code"),
        block_on("owner_category_detail"),
    ],
    retain_intermediate_calculation_columns=False,
)

In [13]:
# Linker instantiation
linker = Linker(
    input_table_or_tables=[df_prod_owners, df_ff_owners],
    settings=settings,
    db_api=db_api,
)

# Estimate the probability of a match between two random records
linker.training.estimate_probability_two_random_records_match(
    deterministic_matching_rules=[
        block_on("owner_postal_code", "owner_birth_date"),
        block_on("owner_birth_date"),
    ],
    recall=0.8,
)

Probability two random records match is estimated to be  1.61e-06.
This means that amongst all possible pairwise record comparisons, one in 619,980.29 are expected to match.  With 4,123,581,104,057 total possible comparisons, we expect a total of around 6,651,148.75 matching pairs


In [14]:
import duckdb
duckdb.sql("PRAGMA max_temp_directory_size='150GiB'")


In [15]:
pairwise_predictions = linker.inference.predict(threshold_match_weight=-5)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

SplinkException: Error executing the following sql for table `__splink__blocked_id_pairs`(__splink__blocked_id_pairs_dec1593a4):
CREATE TABLE __splink__blocked_id_pairs_dec1593a4 AS
WITH __splink__df_concat_with_tf AS (
  SELECT
    *
  FROM __splink__df_concat_with_tf_3acdcc836
), __splink__df_concat_with_tf_left AS (
  SELECT
    *
  FROM __splink__df_concat_with_tf
  WHERE
    source_dataset = (
      SELECT
        MIN(source_dataset)
      FROM __splink__df_concat_with_tf
    )
), __splink__df_concat_with_tf_right AS (
  SELECT
    *
  FROM __splink__df_concat_with_tf
  WHERE
    source_dataset = (
      SELECT
        MAX(source_dataset)
      FROM __splink__df_concat_with_tf
    )
)
SELECT
  '0' AS match_key,
  l."source_dataset" || '-__-' || l."unique_id" AS join_key_l,
  r."source_dataset" || '-__-' || r."unique_id" AS join_key_r
FROM __splink__df_concat_with_tf_left AS l
INNER JOIN __splink__df_concat_with_tf_right AS r
  ON (
    l."owner_postal_code" = r."owner_postal_code"
  )
WHERE
  1 = 1
UNION ALL
SELECT
  '1' AS match_key,
  l."source_dataset" || '-__-' || l."unique_id" AS join_key_l,
  r."source_dataset" || '-__-' || r."unique_id" AS join_key_r
FROM __splink__df_concat_with_tf_left AS l
INNER JOIN __splink__df_concat_with_tf_right AS r
  ON (
    l."owner_category_detail" = r."owner_category_detail"
  )
WHERE
  1 = 1
  AND NOT (
    COALESCE((
      l."owner_postal_code" = r."owner_postal_code"
    ), FALSE)
  )

Error was: Out of Memory Error: failed to offload data block of size 256.0 KiB (245.0 GiB/245.0 GiB used).
This limit was set by the 'max_temp_directory_size' setting.
By default, this setting utilizes the available disk space on the drive where the 'temp_directory' is located.
You can adjust this setting, by using (for example) PRAGMA max_temp_directory_size='10GiB'

In [4]:
pairwise_predictions

NameError: name 'pairwise_predictions' is not defined