In [1]:
from sqlalchemy import create_engine
import pandas as pd

### Load the Dataset

In [3]:
from sqlalchemy import create_engine
import pandas as pd
import urllib.parse

password = urllib.parse.quote_plus("Balaji2001@")
engine = create_engine(f"postgresql://postgres:{password}@localhost:5432/postgres")


In [4]:
schemas = pd.read_sql("""
    SELECT schema_name 
    FROM information_schema.schemata
""", engine)
print(schemas)


          schema_name
0              public
1          cherre_src
2  information_schema
3          pg_catalog
4            pg_toast


In [5]:
cherre_tables = pd.read_sql("""
    SELECT table_name 
    FROM information_schema.tables A
    WHERE table_schema = 'cherre_src'
""", engine)
print(cherre_tables)


                           table_name
0                agg_nedl_demographic
1            agg_nedl_demographic_old
2                  agg_nedl_occupancy
3                   agg_nedl_recorder
4          agg_nedl_recorder_mortgage
5                     agg_nedl_rental
6             agg_nedl_rental_persqft
7               agg_nedl_tax_assessor
8             agg_tax_assessor_owners
9         agg_tax_assessor_owners_old
10             datasource_1_c_mapping
11          lkp_owner_standardization
12       lkp_property_standardization
13     lookup_recorder_type_code_deed
14                  owner_enrich_data
15               raw_nedl_demographic
16           raw_nedl_demographic_old
17                 raw_nedl_occupancy
18                  raw_nedl_recorder
19          raw_nedl_recorder_grantee
20          raw_nedl_recorder_grantor
21         raw_nedl_recorder_mortgage
22               raw_nedl_rent_actual
23            raw_nedl_rent_per_sq_ft
24              raw_nedl_tax_assessor
25        ra

# Typecasting

### raw_nedl_tax_assessor Table

In [8]:
df = pd.read_sql('SELECT * FROM cherre_src.raw_nedl_tax_assessor', engine)

In [9]:
df.info()
len(df)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 161424 entries, 0 to 161423
Data columns (total 61 columns):
 #   Column                                   Non-Null Count   Dtype         
---  ------                                   --------------   -----         
 0   tax_assessor_id                          161424 non-null  int64         
 1   situs_state                              161424 non-null  object        
 2   situs_county                             161424 non-null  object        
 3   fips_code                                161424 non-null  int64         
 4   cbsa_name                                160692 non-null  object        
 5   cbsa_code                                161352 non-null  object        
 6   msa_name                                 160692 non-null  object        
 7   msa_code                                 161352 non-null  object        
 8   metro_division                           48656 non-null   object        
 9   neighborhood_code         

161424

In [10]:
import pandas as pd
import numpy as np
import re

def clean_large_property_dataset(df):
    # === Integer Cleanup Function ===
    def clean_and_cast_to_int64(df, columns):
        for col in columns:
            if col not in df.columns:
                print(f"Skipping missing column: {col}")
                continue
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].str.replace(r'^(\d+)\.0$', r'\1', regex=True)
            df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
        return df

    # === FIPS Code (pad to 5 digits) ===
    if 'fips_code' in df.columns:
        df['fips_code'] = df['fips_code'].astype(str).str.zfill(5)

    # === ZIP Code Cleanup ===
    def clean_zip_column(col, length):
        if col not in df.columns:
            print(f"Skipping missing ZIP column: {col}")
            return
        df[col] = df[col].astype(str).str.strip()
        df[col] = df[col].str.extract(r'(\d+)', expand=False)
        df[col] = df[col].str.zfill(length)
        df[col] = df[col].where(df[col].str.fullmatch(rf'\d{{{length}}}'), pd.NA)
        df[col] = df[col].astype('string')

    clean_zip_column('zip', 5)
    clean_zip_column('zip_4', 4)

    # === Create zip_numeric and zip_4_numeric if not present ===
    if 'zip' in df.columns:
        df['zip_numeric'] = pd.to_numeric(df['zip'], errors='coerce')
    if 'zip_4' in df.columns:
        df['zip_4_numeric'] = pd.to_numeric(df['zip_4'], errors='coerce')

    # === Integer Columns ===
    int_columns = [
        'census_tract', 'census_block_group', 'census_block',
        'assessed_tax_year', 'zip_numeric', 'zip_4_numeric',
        'bed_count', 'room_count', 'stories_count', 'units_count',
        'year_built', 'bath_count', 'effective_year_built'
    ]
    df = clean_and_cast_to_int64(df, int_columns)

    # === Float Columns ===
    float_columns = [
        'latitude', 'longitude',
        'assessed_value_total', 'assessed_value_improvements',
        'assessed_value_land', 'assessed_improvements_percent',
        'last_sale_amount', 'prior_sale_amount',
        'building_sq_ft', 'gross_sq_ft'
    ]
    for col in float_columns:
        if col in df.columns:
            df[col] = df[col].astype('float64')

    # === String Columns ===
    string_columns = [
        'fips_code', 'cbsa_name', 'cbsa_code',
        'msa_name', 'msa_code', 'metro_division',
        'neighborhood_code', 'assessor_parcel_number_raw',
        'alternate_assessor_parcel_number', 'address', 'house_number',
        'street_direction','zip_4', 'street_name', 'street_suffix', 'street_post_direction',
        'unit_prefix', 'unit_number', 'city', 'state',
        'zone_code', 'property_use_standardized_code',
        'building_sq_ft_code', 'property_use_code_mapped',
        'cherre_assessor_parcel_number_formatted', 'last_sale_document_type',
        'identifier'
    ]
    for col in string_columns:
        if col in df.columns:
            df[col] = df[col].astype('string')

    return df


In [11]:
df = clean_large_property_dataset(df)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 161424 entries, 0 to 161423
Data columns (total 63 columns):
 #   Column                                   Non-Null Count   Dtype         
---  ------                                   --------------   -----         
 0   tax_assessor_id                          161424 non-null  int64         
 1   situs_state                              161424 non-null  object        
 2   situs_county                             161424 non-null  object        
 3   fips_code                                161424 non-null  string        
 4   cbsa_name                                160692 non-null  string        
 5   cbsa_code                                161352 non-null  string        
 6   msa_name                                 160692 non-null  string        
 7   msa_code                                 161352 non-null  string        
 8   metro_division                           48656 non-null   string        
 9   neighborhood_code         

### raw_nedl_tax_assessor_owner Table

In [13]:
df_owner = pd.read_sql("SELECT * FROM cherre_src.raw_nedl_tax_assessor_owner LIMIT 100", engine)
df_owner.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 15 columns):
 #   Column                           Non-Null Count  Dtype              
---  ------                           --------------  -----              
 0   cherre_ingest_datetime           100 non-null    datetime64[ns, UTC]
 1   cherre_tax_assessor_owner_pk     100 non-null    int64              
 2   is_owner_company                 100 non-null    bool               
 3   owner_first_name                 0 non-null      object             
 4   owner_last_name                  74 non-null     object             
 5   owner_name_suffix                0 non-null      object             
 6   owner_name_latest_deed           63 non-null     object             
 7   owner_name                       100 non-null    object             
 8   owner_middle_name                0 non-null      object             
 9   owner_type                       33 non-null     object             
 10  own

In [14]:
def clean_owner_dataset(df_owner):
    # === Convert Integer ID columns ===
    int_columns = [
        'cherre_tax_assessor_owner_pk',
        'tax_assessor_id'
    ]
    for col in int_columns:
        if col in df_owner.columns:
            df_owner[col] = pd.to_numeric(df_owner[col], errors='coerce').astype('Int64')

    # === Convert to String dtype ===
    string_columns = [
        'owner_first_name', 'owner_last_name', 'owner_name_suffix',
        'owner_name_latest_deed', 'owner_name', 'owner_middle_name',
        'owner_type', 'owner_trust_type_code', 'ownership_vesting_relation_code',
        'identifier'
    ]
    for col in string_columns:
        if col in df_owner.columns:
            df_owner[col] = df_owner[col].astype('string')

    return df_owner

In [15]:
df_owner = clean_owner_dataset(df_owner)
df_owner.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 15 columns):
 #   Column                           Non-Null Count  Dtype              
---  ------                           --------------  -----              
 0   cherre_ingest_datetime           100 non-null    datetime64[ns, UTC]
 1   cherre_tax_assessor_owner_pk     100 non-null    Int64              
 2   is_owner_company                 100 non-null    bool               
 3   owner_first_name                 0 non-null      string             
 4   owner_last_name                  74 non-null     string             
 5   owner_name_suffix                0 non-null      string             
 6   owner_name_latest_deed           63 non-null     string             
 7   owner_name                       100 non-null    string             
 8   owner_middle_name                0 non-null      string             
 9   owner_type                       33 non-null     string             
 10  own

### raw_nedl_recorder Table

In [17]:
df_recorder = pd.read_sql("SELECT * FROM cherre_src.raw_nedl_recorder", engine)
df_recorder.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 837324 entries, 0 to 837323
Data columns (total 41 columns):
 #   Column                            Non-Null Count   Dtype              
---  ------                            --------------   -----              
 0   recorder_id                       837324 non-null  int64              
 1   document_recording_state          837308 non-null  object             
 2   document_recording_county         837324 non-null  object             
 3   document_recording_fips           837324 non-null  int64              
 4   document_type_code                837324 non-null  object             
 5   document_number_formatted         686290 non-null  object             
 6   transfer_multiple_parcel_code     163548 non-null  float64            
 7   arms_length_code                  690220 non-null  float64            
 8   document_amount                   540198 non-null  float64            
 9   owner_relationship_code           659516 non-nul

In [18]:
import pandas as pd

def clean_recorder_dataset(df_recorder):
    # === Convert Integer ID columns ===
    int_columns = [
        'recorder_id', 'document_recording_fips', 'tax_assessor_id'
    ]
    for col in int_columns:
        if col in df_recorder.columns:
            df_recorder[col] = pd.to_numeric(df_recorder[col], errors='coerce').astype('Int64')

    # === Convert to String dtype ===
    string_columns = [
        'document_recording_state', 'document_recording_county', 'document_type_code',
        'document_number_formatted', 'owner_relationship_code', 'title_company_code',
        'title_company_name', 'property_address', 'property_house_number',
        'property_street_direction', 'property_street_name', 'property_street_suffix',
        'property_street_post_direction', 'property_unit_prefix', 'property_unit_number',
        'property_city', 'property_state', 'property_crrt', 'property_group_type',
        'property_use_standardized_code', 'assessor_parcel_number_formatted',
        'resale_flag', 'new_construction_flag', 'inter_family_flag',
        'reo_sale_flag', 'identifier'
    ]
    for col in string_columns:
        if col in df_recorder.columns:
            df_recorder[col] = df_recorder[col].astype('string')

    return df_recorder


In [19]:
df_recorder = clean_recorder_dataset(df_recorder)
df_recorder.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 837324 entries, 0 to 837323
Data columns (total 41 columns):
 #   Column                            Non-Null Count   Dtype              
---  ------                            --------------   -----              
 0   recorder_id                       837324 non-null  Int64              
 1   document_recording_state          837308 non-null  string             
 2   document_recording_county         837324 non-null  string             
 3   document_recording_fips           837324 non-null  Int64              
 4   document_type_code                837324 non-null  string             
 5   document_number_formatted         686290 non-null  string             
 6   transfer_multiple_parcel_code     163548 non-null  float64            
 7   arms_length_code                  690220 non-null  float64            
 8   document_amount                   540198 non-null  float64            
 9   owner_relationship_code           659516 non-nul

### raw_nedl_recorder_grantee Table

In [21]:
df_grantee = pd.read_sql("SELECT * FROM cherre_src.raw_nedl_recorder_grantee", engine)
df_grantee.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4699144 entries, 0 to 4699143
Data columns (total 29 columns):
 #   Column                         Dtype              
---  ------                         -----              
 0   cherre_ingest_datetime         datetime64[ns, UTC]
 1   cherre_recorder_grantee_pk     int64              
 2   grantee_address                object             
 3   grantee_care_of_name           object             
 4   grantee_city                   object             
 5   grantee_crrt                   object             
 6   grantee_entity_code            object             
 7   grantee_entity_count           float64            
 8   grantee_first_name             object             
 9   grantee_house_number           object             
 10  grantee_last_name              object             
 11  grantee_middle_name            object             
 12  grantee_name                   object             
 13  grantee_name_suffix            object     

In [22]:
def clean_grantee_dataset(df_grantee):
    # === Integer Columns ===
    int_columns = [
        'cherre_recorder_grantee_pk',
        'recorder_id'
    ]
    for col in int_columns:
        if col in df_grantee.columns:
            df_grantee[col] = pd.to_numeric(df_grantee[col], errors='coerce').astype('Int64')

    # === Float/Numeric Columns ===
    if 'grantee_entity_count' in df_grantee.columns:
        df_grantee['grantee_entity_count'] = pd.to_numeric(df_grantee['grantee_entity_count'], errors='coerce')

    # === String Columns ===
    string_columns = [
        'grantee_address', 'grantee_care_of_name', 'grantee_city', 'grantee_crrt',
        'grantee_entity_code', 'grantee_first_name', 'grantee_house_number',
        'grantee_last_name', 'grantee_middle_name', 'grantee_name',
        'grantee_name_suffix', 'grantee_owner_type', 'grantee_state',
        'grantee_street_direction', 'grantee_street_name', 'grantee_street_post_direction',
        'grantee_street_suffix', 'grantee_unit_number', 'grantee_unit_prefix',
        'grantee_vesting_deed_code', 'grantee_vesting_recorded_code', 'identifier'
    ]
    for col in string_columns:
        if col in df_grantee.columns:
            df_grantee[col] = df_grantee[col].astype('string')

    # === ZIP Code Padding ===
    def format_zip_code(x, length):
        try:
            return str(int(float(x))).zfill(length) if pd.notnull(x) else pd.NA
        except:
            return pd.NA

    if 'grantee_zip' in df_grantee.columns:
        df_grantee['grantee_zip'] = df_grantee['grantee_zip'].apply(lambda x: format_zip_code(x, 5)).astype('string')

    if 'grantee_zip_4' in df_grantee.columns:
        df_grantee['grantee_zip_4'] = df_grantee['grantee_zip_4'].apply(lambda x: format_zip_code(x, 4)).astype('string')

    return df_grantee

In [23]:
df_grantee = clean_grantee_dataset(df_grantee)
df_grantee.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4699144 entries, 0 to 4699143
Data columns (total 29 columns):
 #   Column                         Dtype              
---  ------                         -----              
 0   cherre_ingest_datetime         datetime64[ns, UTC]
 1   cherre_recorder_grantee_pk     Int64              
 2   grantee_address                string             
 3   grantee_care_of_name           string             
 4   grantee_city                   string             
 5   grantee_crrt                   string             
 6   grantee_entity_code            string             
 7   grantee_entity_count           float64            
 8   grantee_first_name             string             
 9   grantee_house_number           string             
 10  grantee_last_name              string             
 11  grantee_middle_name            string             
 12  grantee_name                   string             
 13  grantee_name_suffix            string     

### raw_nedl_recorder_grantor Table

In [25]:
df_grantor = pd.read_sql("SELECT * FROM cherre_src.raw_nedl_recorder_grantor", engine)
df_grantor.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3551506 entries, 0 to 3551505
Data columns (total 24 columns):
 #   Column                         Dtype              
---  ------                         -----              
 0   cherre_ingest_datetime         datetime64[ns, UTC]
 1   cherre_recorder_grantor_pk     float64            
 2   grantor_address                object             
 3   grantor_city                   object             
 4   grantor_crrt                   object             
 5   grantor_entity_code            object             
 6   grantor_first_name             object             
 7   grantor_house_number           float64            
 8   grantor_last_name              object             
 9   grantor_middle_name            object             
 10  grantor_name                   object             
 11  grantor_name_suffix            object             
 12  grantor_state                  object             
 13  grantor_street_direction       object     

In [26]:

def clean_grantor_dataset(df_grantor):
    # === Safe cast for grantor PK if it's strictly integer ===
    if 'cherre_recorder_grantor_pk' in df_grantor.columns:
        df_grantor['cherre_recorder_grantor_pk'] = df_grantor['cherre_recorder_grantor_pk'].apply(
            lambda x: int(x) if pd.notnull(x) and x == int(x) else pd.NA
        ).astype('Int64')

    # === Recorder ID as nullable int ===
    if 'recorder_id' in df_grantor.columns:
        df_grantor['recorder_id'] = pd.to_numeric(df_grantor['recorder_id'], errors='coerce').astype('Int64')

    # === House number to cleaned string ===
    if 'grantor_house_number' in df_grantor.columns:
        df_grantor['grantor_house_number'] = df_grantor['grantor_house_number'].apply(
            lambda x: str(int(x)) if pd.notnull(x) else pd.NA
        ).astype('string')

    # === String columns ===
    string_columns = [
        'grantor_address', 'grantor_city', 'grantor_crrt', 'grantor_entity_code',
        'grantor_first_name', 'grantor_last_name', 'grantor_middle_name',
        'grantor_name', 'grantor_name_suffix', 'grantor_state',
        'grantor_street_direction', 'grantor_street_name', 'grantor_street_post_direction',
        'grantor_street_suffix', 'grantor_unit_number', 'grantor_unit_prefix',
        'identifier'
    ]
    for col in string_columns:
        if col in df_grantor.columns:
            df_grantor[col] = df_grantor[col].astype('string')

    # === ZIP code cleaning function ===
    def format_zip_code(x, length):
        try:
            return str(int(float(x))).zfill(length) if pd.notnull(x) else pd.NA
        except:
            return pd.NA

    if 'grantor_zip' in df_grantor.columns:
        df_grantor['grantor_zip'] = df_grantor['grantor_zip'].apply(lambda x: format_zip_code(x, 5)).astype('string')

    if 'grantor_zip_4' in df_grantor.columns:
        df_grantor['grantor_zip_4'] = df_grantor['grantor_zip_4'].apply(lambda x: format_zip_code(x, 4)).astype('string')

    return df_grantor

In [27]:
df_grantor = clean_grantor_dataset(df_grantor)
df_grantor.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3551506 entries, 0 to 3551505
Data columns (total 24 columns):
 #   Column                         Dtype              
---  ------                         -----              
 0   cherre_ingest_datetime         datetime64[ns, UTC]
 1   cherre_recorder_grantor_pk     Int64              
 2   grantor_address                string             
 3   grantor_city                   string             
 4   grantor_crrt                   string             
 5   grantor_entity_code            string             
 6   grantor_first_name             string             
 7   grantor_house_number           string             
 8   grantor_last_name              string             
 9   grantor_middle_name            string             
 10  grantor_name                   string             
 11  grantor_name_suffix            string             
 12  grantor_state                  string             
 13  grantor_street_direction       string     

### raw_nedl_recorder_mortgage Table

In [29]:
df_mortgage = pd.read_sql("SELECT * FROM cherre_src.raw_nedl_recorder_mortgage", engine)
df_mortgage.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 537876 entries, 0 to 537875
Data columns (total 27 columns):
 #   Column                               Non-Null Count   Dtype              
---  ------                               --------------   -----              
 0   cherre_recorder_mortgage_pk          537876 non-null  int64              
 1   recorder_id                          537876 non-null  int64              
 2   document_number                      433414 non-null  object             
 3   document_recorded_date               521422 non-null  datetime64[ns]     
 4   type_code                            509290 non-null  float64            
 5   amount                               536592 non-null  float64            
 6   lender_name                          524940 non-null  object             
 7   term                                 410604 non-null  float64            
 8   term_type_code                       410614 non-null  object             
 9   mortgage_due_da

In [30]:

def clean_mortgage_dataset(df_mortgage):
    # === Integer Columns ===
    int_columns = ['cherre_recorder_mortgage_pk', 'recorder_id']
    for col in int_columns:
        if col in df_mortgage.columns:
            df_mortgage[col] = pd.to_numeric(df_mortgage[col], errors='coerce').astype('Int64')

    # === String Columns ===
    string_columns = [
        'document_number', 'lender_name', 'term_type_code',
        'interest_rate_type_code', 'fixed_step_conversion_rate_code',
        'is_adjustable_rate_rider', 'interest_rate_change_frequency_code',
        'adjustable_rate_index_code', 'has_interest_only_period', 'identifier'
    ]
    for col in string_columns:
        if col in df_mortgage.columns:
            df_mortgage[col] = df_mortgage[col].astype('string')

    return df_mortgage


In [31]:
df_mortgage = clean_mortgage_dataset(df_mortgage)
df_mortgage.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 537876 entries, 0 to 537875
Data columns (total 27 columns):
 #   Column                               Non-Null Count   Dtype              
---  ------                               --------------   -----              
 0   cherre_recorder_mortgage_pk          537876 non-null  Int64              
 1   recorder_id                          537876 non-null  Int64              
 2   document_number                      433414 non-null  string             
 3   document_recorded_date               521422 non-null  datetime64[ns]     
 4   type_code                            509290 non-null  float64            
 5   amount                               536592 non-null  float64            
 6   lender_name                          524940 non-null  string             
 7   term                                 410604 non-null  float64            
 8   term_type_code                       410614 non-null  string             
 9   mortgage_due_da

### raw_nedl_rent_actual Table

In [33]:
# Generator that loads 1 million rows at a time
chunk_iter = pd.read_sql(
    "SELECT * FROM cherre_src.raw_nedl_rent_actual",
    engine,
    chunksize=1_000_000  
)

# Combine all chunks into one dataframe 
df_rent_actual = pd.concat(chunk_iter, ignore_index=True)


In [34]:
df_rent_actual.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15842812 entries, 0 to 15842811
Data columns (total 7 columns):
 #   Column          Dtype         
---  ------          -----         
 0   property_id     object        
 1   market_name     object        
 2   submarket_name  object        
 3   property_name   object        
 4   series          object        
 5   record_date     datetime64[ns]
 6   record_values   float64       
dtypes: datetime64[ns](1), float64(1), object(5)
memory usage: 846.1+ MB


In [35]:
import pandas as pd

def clean_rent_actual_dataset(df_rent_actual):
    string_columns = [
        'property_id', 'market_name',
        'submarket_name', 'property_name',
        'series'
    ]
    for col in string_columns:
        if col in df_rent_actual.columns:
            df_rent_actual[col] = df_rent_actual[col].astype('string')

    return df_rent_actual


In [36]:
df_rent_actual = clean_rent_actual_dataset(df_rent_actual)
df_rent_actual.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15842812 entries, 0 to 15842811
Data columns (total 7 columns):
 #   Column          Dtype         
---  ------          -----         
 0   property_id     string        
 1   market_name     string        
 2   submarket_name  string        
 3   property_name   string        
 4   series          string        
 5   record_date     datetime64[ns]
 6   record_values   float64       
dtypes: datetime64[ns](1), float64(1), string(5)
memory usage: 846.1 MB


### raw_nedl_occupancy Table

In [38]:
chunk_iter = pd.read_sql(
    "SELECT * FROM cherre_src.raw_nedl_occupancy",
    engine,
    chunksize=1_000_000  # Adjust based on RAM
)

# combine all chunks
df_occupancy = pd.concat(chunk_iter, ignore_index=True)


In [39]:
df_occupancy.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14105108 entries, 0 to 14105107
Data columns (total 7 columns):
 #   Column          Dtype         
---  ------          -----         
 0   property_id     object        
 1   market_name     object        
 2   submarket_name  object        
 3   property_name   object        
 4   series          object        
 5   record_date     datetime64[ns]
 6   record_values   object        
dtypes: datetime64[ns](1), object(6)
memory usage: 753.3+ MB


In [40]:

def clean_occupancy_dataset(df_occupancy):
    # === String Columns ===
    string_columns = [
        'property_id', 'market_name',
        'submarket_name', 'property_name',
        'series'
    ]
    for col in string_columns:
        if col in df_occupancy.columns:
            df_occupancy[col] = df_occupancy[col].astype('string')

    # === Numeric Column (float) ===
    if 'record_values' in df_occupancy.columns:
        df_occupancy['record_values'] = pd.to_numeric(df_occupancy['record_values'], errors='coerce').astype('float64')

    return df_occupancy


In [41]:
df_occupancy = clean_occupancy_dataset(df_occupancy)
df_occupancy.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14105108 entries, 0 to 14105107
Data columns (total 7 columns):
 #   Column          Dtype         
---  ------          -----         
 0   property_id     string        
 1   market_name     string        
 2   submarket_name  string        
 3   property_name   string        
 4   series          string        
 5   record_date     datetime64[ns]
 6   record_values   float64       
dtypes: datetime64[ns](1), float64(1), string(5)
memory usage: 753.3 MB


### raw_nedl_demographic Table

In [43]:
df_demo = pd.read_sql("SELECT * FROM cherre_src.raw_nedl_demographic", engine)


In [44]:
df_demo.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 155592 entries, 0 to 155591
Columns: 464 entries, geography_type to zip
dtypes: float64(6), int64(447), object(11)
memory usage: 550.8+ MB


In [45]:
import pandas as pd

def clean_demo_dataset(df_demo):
    # === Convert all object columns to string ===
    object_cols = df_demo.select_dtypes(include='object').columns
    for col in object_cols:
        df_demo[col] = df_demo[col].astype('string')

    # === Convert all int64 columns to nullable Int64 ===
    int_cols = df_demo.select_dtypes(include='int64').columns
    for col in int_cols:
        df_demo[col] = df_demo[col].astype('Int64')

    return df_demo


In [46]:
df_demo = clean_demo_dataset(df_demo)
df_demo.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 155592 entries, 0 to 155591
Columns: 464 entries, geography_type to zip
dtypes: Int64(447), float64(6), string(11)
memory usage: 617.1 MB


## Finding primary key

In [48]:
df['tax_assessor_id'].is_unique


False

In [49]:
len(df)

161424

In [50]:
import pandas as pd

# Sample usage assumes 'df' is already loaded

def suggest_identifier_columns(df, uniqueness_threshold=0.40):
    identifier_keywords = ['id', 'identifier', 'parcel', 'number', 'code', 'key', 'formatted']

    candidate_columns = []

    for col in df.columns:
        col_lower = col.lower()

        # 1. Check if column name contains common identifier keywords
        if any(keyword in col_lower for keyword in identifier_keywords):
            # 2. Check uniqueness ratio (non-null unique / total non-null)
            unique_count = df[col].nunique(dropna=True)
            non_null_count = df[col].notnull().sum()
            if non_null_count > 0:
                uniqueness_ratio = unique_count / non_null_count
                if uniqueness_ratio >= uniqueness_threshold:
                    candidate_columns.append(col)

    return candidate_columns


candidate_cols = suggest_identifier_columns(df)

print(" Suggested Identifier Columns:")
for col in candidate_cols:
    print(f" {col}")


 Suggested Identifier Columns:
 tax_assessor_id
 assessor_parcel_number_raw
 alternate_assessor_parcel_number
 cherre_assessor_parcel_number_formatted
 identifier


In [51]:
import pandas as pd
from itertools import combinations

# Step 1: Select likely identifier columns
candidate_columns = [
    'tax_assessor_id',
    'assessor_parcel_number_raw',
    'cherre_assessor_parcel_number_formatted',
    'identifier',
    'insert_date',
    'data_publish_date',
    'last_update_date',
    'cherre_ingest_datetime'
]

# Step 2: Check all combinations and print duplicate counts
for r in range(1, len(candidate_columns) + 1):
    for combo in combinations(candidate_columns, r):
        combo_list = list(combo)
        dupes = df.duplicated(subset=combo_list).sum()
        print(f"Columns: {combo_list} → Duplicate Rows: {dupes}")


Columns: ['tax_assessor_id'] → Duplicate Rows: 81142
Columns: ['assessor_parcel_number_raw'] → Duplicate Rows: 81155
Columns: ['cherre_assessor_parcel_number_formatted'] → Duplicate Rows: 81165
Columns: ['identifier'] → Duplicate Rows: 95267
Columns: ['insert_date'] → Duplicate Rows: 161410
Columns: ['data_publish_date'] → Duplicate Rows: 161334
Columns: ['last_update_date'] → Duplicate Rows: 160862
Columns: ['cherre_ingest_datetime'] → Duplicate Rows: 160712
Columns: ['tax_assessor_id', 'assessor_parcel_number_raw'] → Duplicate Rows: 81142
Columns: ['tax_assessor_id', 'cherre_assessor_parcel_number_formatted'] → Duplicate Rows: 81142
Columns: ['tax_assessor_id', 'identifier'] → Duplicate Rows: 80712
Columns: ['tax_assessor_id', 'insert_date'] → Duplicate Rows: 81142
Columns: ['tax_assessor_id', 'data_publish_date'] → Duplicate Rows: 81142
Columns: ['tax_assessor_id', 'last_update_date'] → Duplicate Rows: 81142
Columns: ['tax_assessor_id', 'cherre_ingest_datetime'] → Duplicate Rows: 81

In [52]:
# Get all rows that are full duplicates (across all columns)
duplicate_rows = df[df.duplicated(keep=False)]

# Count of full duplicate rows (each duplicate occurrence)
print("\n Total number of fully duplicated rows (including all copies):", len(duplicate_rows))

# Count of unique duplicates (how many unique sets of duplicates)
num_unique_duplicate_sets = df[df.duplicated()].drop_duplicates()
print(" Number of unique duplicate row patterns:", len(num_unique_duplicate_sets))



 Total number of fully duplicated rows (including all copies): 161424
 Number of unique duplicate row patterns: 80712


In [53]:
# Verify if the DataFrame is made entirely of row pairs
print(df.duplicated(keep=False).all())  


True


In [54]:
# Get all fully duplicated rows
duplicate_rows = df[df.duplicated(keep=False)]

# Drop duplicates so you get each unique pattern once
unique_duplicate = duplicate_rows.drop_duplicates()


In [55]:
# Remove fully duplicated rows
df_cleaned = df.drop_duplicates(keep='first')


In [56]:
len(df_cleaned)

80712

In [57]:
# Identify columns where values are unique (could be primary key candidates)
unique_columns = [col for col in df_cleaned.columns if df_cleaned[col].is_unique]

print(" Primary key candidates (single column):")
for col in unique_columns:
    print(f" - {col}")


 Primary key candidates (single column):


In [58]:
from itertools import combinations

def find_combination_keys(df, max_comb=2):
    cols = df.columns.tolist()
    for r in range(2, max_comb + 1):
        for combo in combinations(cols, r):
            if df.duplicated(subset=list(combo)).sum() == 0:
                return combo
    return None

combo_key = find_combination_keys(df_cleaned, max_comb=3)

if combo_key:
    print(f"Combination primary key found: {combo_key}")
else:
    print("No combination of columns up to 3 found to uniquely identify rows.")


Combination primary key found: ('tax_assessor_id', 'identifier')


### Total number of units per situs_state


In [60]:
units_per_state = df_cleaned.groupby('situs_state')['units_count'].sum()
print(units_per_state)

situs_state
AK       5135
AL      52419
AR      36487
AZ     311836
CA     964610
CO      88160
CT      65988
DC      74746
DE       8669
FL     361526
GA     410498
HI        282
IA      14838
ID      13222
IL     118701
IN      51949
KS      40736
KY      25662
LA      76747
MA     131784
MD     199321
ME       5759
MI      70141
MN     135769
MO      84989
MS      43199
MT       6571
NC      99318
ND       3519
NE      30545
NH      11093
NJ      58985
NM      17669
NV     161821
NY     375051
OH     137734
OK      77696
OR      54373
PA     142826
RI      14178
SC     156322
SD       8352
TN     124185
TX    1363016
UT      33862
VA     167219
VT       3152
WA     220168
WI      17570
WV       5639
WY       1181
Name: units_count, dtype: Int64


### Unique tax_assessor_id values

In [62]:
unique_tax_ids = df_cleaned['tax_assessor_id'].nunique()
print("Unique tax_assessor_id values: ", unique_tax_ids) 


Unique tax_assessor_id values:  80282


### Average sale amount by msa_name

In [64]:
avg_sale_by_msa = df_cleaned.groupby('msa_name')['last_sale_amount'].mean()
print(avg_sale_by_msa)


msa_name
Aberdeen, SD                         2.250000e+05
Akron, OH                            3.602346e+06
Alamogordo, NM                       7.821143e+05
Albany-Lebanon, OR                   4.317939e+06
Albany-Schenectady-Troy, NY          4.116525e+06
                                         ...     
Worcester, MA-CT                     1.061141e+07
Yakima, WA                           2.358043e+06
Yankton, SD                                   NaN
York-Hanover, PA                     3.877843e+06
Youngstown-Warren-Boardman, OH-PA    1.352248e+06
Name: last_sale_amount, Length: 434, dtype: float64


### Top 5 situs Countries built before 1980

In [66]:
pre_1980 = df[df['year_built'] < 1980]
top_5_counties = pre_1980['situs_county'].value_counts().head(5)
print(top_5_counties)


situs_county
Los Angeles County    3440
New York County       2460
Cook County           1570
Harris County         1512
Orange County         1324
Name: count, dtype: int64


### Number of properties which had last_sale_date within 12 months

In [68]:
one_year_ago = pd.Timestamp.today() - pd.DateOffset(years=1)
recent_sales = df_cleaned[df_cleaned['last_sale_date'] >= one_year_ago]
print("Properties sold in past 12 months:", len(recent_sales))

Properties sold in past 12 months: 183


### Average building_sq_ft grouped by property_use_code_mapped

In [70]:
avg_sq_ft_by_use = df_cleaned.groupby('property_use_code_mapped')['building_sq_ft'].mean()
print(avg_sq_ft_by_use)


property_use_code_mapped
         86017.293814
10       13551.836066
10.0         0.000000
11       11343.000000
12       76274.784091
            ...      
45.0    196491.925234
46       11073.362069
46.0      9804.250000
47      155917.900285
47.0     82866.594203
Name: building_sq_ft, Length: 73, dtype: float64


## Transformer Tables Task

In [72]:
df_grantee_transf = pd.read_sql("SELECT * FROM cherre_src.transform_nedl_recorder_grantee", engine)
df_grantee_transf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 402210 entries, 0 to 402209
Data columns (total 32 columns):
 #   Column                         Non-Null Count   Dtype              
---  ------                         --------------   -----              
 0   cherre_ingest_datetime         402210 non-null  datetime64[ns, UTC]
 1   grantee_address                339380 non-null  object             
 2   grantee_care_of_name           46632 non-null   object             
 3   grantee_city                   341750 non-null  object             
 4   grantee_crrt                   325880 non-null  object             
 5   grantee_entity_code            311700 non-null  object             
 6   grantee_entity_count           25734 non-null   float64            
 7   grantee_first_name             87924 non-null   object             
 8   grantee_house_number           320234 non-null  object             
 9   grantee_last_name              88932 non-null   object             
 10  grantee_

In [73]:
df_grantor_transf = pd.read_sql("SELECT * FROM cherre_src.transform_nedl_recorder_grantor", engine)
df_grantor_transf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 368720 entries, 0 to 368719
Data columns (total 27 columns):
 #   Column                         Non-Null Count   Dtype              
---  ------                         --------------   -----              
 0   cherre_ingest_datetime         207218 non-null  datetime64[ns, UTC]
 1   grantor_address                70884 non-null   object             
 2   grantor_city                   105030 non-null  object             
 3   grantor_crrt                   67698 non-null   object             
 4   grantor_entity_code            260896 non-null  object             
 5   grantor_first_name             105828 non-null  object             
 6   grantor_house_number           63952 non-null   float64            
 7   grantor_last_name              105840 non-null  object             
 8   grantor_middle_name            70104 non-null   object             
 9   grantor_name                   368716 non-null  object             
 10  grantor_

In [74]:
df_mortgage_transf = pd.read_sql("SELECT * FROM cherre_src.transform_nedl_recorder_mortgage", engine)
df_mortgage_transf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 347548 entries, 0 to 347547
Data columns (total 31 columns):
 #   Column                               Non-Null Count   Dtype              
---  ------                               --------------   -----              
 0   cherre_recorder_mortgage_pk          347548 non-null  int64              
 1   recorder_id                          347548 non-null  int64              
 2   document_number                      286474 non-null  object             
 3   document_recorded_date               347544 non-null  datetime64[ns]     
 4   type_code                            336448 non-null  float64            
 5   amount                               347290 non-null  float64            
 6   lender_name                          347058 non-null  object             
 7   term                                 270622 non-null  float64            
 8   term_type_code                       270632 non-null  object             
 9   mortgage_due_da

In [75]:
df_recorder_transf = pd.read_sql("SELECT * FROM cherre_src.transform_nedl_recorder", engine)
df_recorder_transf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 362336 entries, 0 to 362335
Data columns (total 43 columns):
 #   Column                            Non-Null Count   Dtype              
---  ------                            --------------   -----              
 0   recorder_id                       362336 non-null  int64              
 1   document_recording_state          362324 non-null  object             
 2   document_recording_county         362336 non-null  object             
 3   document_recording_fips           362336 non-null  int64              
 4   document_type_code                362336 non-null  object             
 5   document_number_formatted         287962 non-null  object             
 6   transfer_multiple_parcel_code     74738 non-null   float64            
 7   arms_length_code                  350894 non-null  float64            
 8   document_amount                   359916 non-null  float64            
 9   owner_relationship_code           281206 non-nul

### 1. Grantor, Grantee and Mortgage Amount per Transaction

In [77]:
# Step 1: Aggregate Grantees
grantee_grouped = df_grantee_transf.groupby(['recorder_id', 'nedl_property_id_pk']).agg({
    'grantee_name': lambda x: ', '.join(x.dropna().unique()),
    'identifier': lambda x: ', '.join(x.dropna().unique())
}).reset_index().rename(columns={
    'grantee_name': 'grantee_names',
    'identifier': 'grantee_identifiers'
})

# Step 2: Aggregate Grantors
grantor_grouped = df_grantor_transf.groupby(['recorder_id', 'nedl_property_id_pk']).agg({
    'grantor_name': lambda x: ', '.join(x.dropna().unique()),
    'identifier': lambda x: ', '.join(x.dropna().unique())
}).reset_index().rename(columns={
    'grantor_name': 'grantor_names',
    'identifier': 'grantor_identifiers'
})

# Step 3: Merge Grantor & Grantee info
df_transfer = pd.merge(grantee_grouped, grantor_grouped, on=['recorder_id', 'nedl_property_id_pk'], how='inner')

# Step 4: Add Mortgage Amount from Mortgage Table
df_transfer = pd.merge(df_transfer, df_mortgage_transf[['recorder_id', 'amount']], on='recorder_id', how='left')
df_transfer.rename(columns={'amount': 'mortgage_amount'}, inplace=True)


In [78]:
df_transfer.head()

Unnamed: 0,recorder_id,nedl_property_id_pk,grantee_names,grantee_identifiers,grantor_names,grantor_identifiers,mortgage_amount
0,14000077268,80214,CRESTVIEW SENIOR COTTAGES LTD,Crestview Senior Cottages,EAGLE AL 1 SPE LLC,Crestview Senior Cottages,
1,14000085508,22807,"STATE STREET BANK & TRUST COMPANY, SERIES 1991...",Virginia Circle,BENEFIC,Virginia Circle,
2,14000085512,22807,ADJ HOLDINGS LLC,Virginia Circle,DE JONG ARIE JR TR,Virginia Circle,
3,14000085520,22807,"JOSE M GONZALEZ, MERCEDES C GONZALEZ",Virginia Circle,ADJ HOLDINGS LLC,Virginia Circle,750000.0
4,14000085520,22807,"JOSE M GONZALEZ, MERCEDES C GONZALEZ",Virginia Circle,ADJ HOLDINGS LLC,Virginia Circle,750000.0


### 2. Property Transfers with Conflicting Owner identity

In [80]:
# Merge grantor & grantee full info by recorder_id and nedl_property_id_pk
df_merged_identities = pd.merge(
    df_grantee_transf[['recorder_id', 'nedl_property_id_pk', 'grantee_name', 'identifier']],
    df_grantor_transf[['recorder_id', 'nedl_property_id_pk', 'grantor_name', 'identifier']],
    on=['recorder_id', 'nedl_property_id_pk'],
    how='inner',
    suffixes=('_grantee', '_grantor')
)

# Filter where names match but identifiers differ
conflicting_identity = df_merged_identities[
    (df_merged_identities['grantee_name'] == df_merged_identities['grantor_name']) &
    (df_merged_identities['identifier_grantee'] != df_merged_identities['identifier_grantor'])
]

# Optional: drop duplicates
conflicting_identity = conflicting_identity.drop_duplicates()


In [81]:
df_merged_identities.head()

Unnamed: 0,recorder_id,nedl_property_id_pk,grantee_name,identifier_grantee,grantor_name,identifier_grantor
0,14520132976,53957,PECO CUSTOM HOMES LLC,12th & River Senior,PECO CUSTOM HOMES LLC,12th & River Senior
1,14520132976,53957,PECO CUSTOM HOMES LLC,12th & River Senior,PECO CUSTOM HOMES LLC,12th & River Senior
2,14520264596,53957,PECO CUSTOM HOMES LLC,12th & River Senior,PECO CUSTOM HOMES LLC,12th & River Senior
3,14520264596,53957,PECO CUSTOM HOMES LLC,12th & River Senior,PECO CUSTOM HOMES LLC,12th & River Senior
4,14757505256,62241,L & M CONSTRUCTION LLP,2030 Trail Street,JOHN W JOHNSON,2030 Trail Street


### 3. Top 5 companies handling high value transfers

In [83]:
# Step 1: Filter high-value transfers
high_value = df_recorder_transf[
    (df_recorder_transf['document_amount'] > 500000) &
    (df_recorder_transf['transfer_purchase_loan_to_value'] > 50)
]

# Step 2: Remove placeholder title companies (optional but recommended)
valid_high_value = high_value[
    ~high_value['title_company_name'].isin([
        'NONE AVAILABLE', 
        'NONE LISTED ON DOCUMENT', 
        'ATTORNEY'
    ])
]

# Step 3: Count and get top 5 title companies
top_title_companies = (
    valid_high_value['title_company_name']
    .value_counts()
    .head(5)
    .reset_index()
)
top_title_companies.columns = ['title_company_name', 'high_value_transfer_count']


In [84]:
top_title_companies.head()

Unnamed: 0,title_company_name,high_value_transfer_count
0,CHICAGO TITLE,182
1,FIRST AMERICAN TITLE INS CO,182
2,FIDELITY NATIONAL TITLE,140
3,FIRST AMERICAN TITLE,138
4,STEWART TITLE,116


### 4. Property Transfer Chain Tracker

In [86]:
# Step 1: Count number of transfers per unique property (grouped by identifier + nedl_property_id + nedl_property_id_pk)
property_transfer_tracker = df_grantee_transf.groupby(
    ['recorder_id', 'identifier', 'nedl_property_id', 'nedl_property_id_pk']
).size().reset_index(name='total_transfers')

# Step 2: Sort and get top 10 properties with the most transfers
final_df = property_transfer_tracker.sort_values(
    by='total_transfers', ascending=False
).head(10)

# Display final DataFrame
final_df


Unnamed: 0,recorder_id,identifier,nedl_property_id,nedl_property_id_pk,total_transfers
171563,147843325712,Greeley Manor,Greeley Manor 80631,81030,8
151380,147426977188,South City,South City 29486,43131,8
19816,14136437836,Golden Meadows,Golden Meadows 93630,21686,8
156026,147531857880,"Rio, The","Rio, The 78541",50442,8
169934,147776167500,Boulder Creek,Boulder Creek 29210,72171,8
156029,147532154204,Henson Creek,Henson Creek 20748,56911,8
156799,147544274096,Cottage Grove,Cottage Grove 23605,68572,8
152189,147446469500,"Linq, The","Linq, The 85224",6356,8
168397,147738171548,Pointe Townhomes,Pointe Townhomes 95825,25569,8
159017,147570388692,Northwood,Northwood 95825,26056,8


## Recorder Table Aggregation

In [88]:
def aggregate_recorder_data(df_recorder_transf):
    """Aggregate recorder data at recorder_id level."""
    
    # Custom aggregators
    def unique_list(series):
        return list(set(series.dropna()))  # unique non-null values

    def max_flag(series):
        return series.max()  # works for 0/1 or boolean flags

    # Aggregate by recorder_id
    agg_recorder = (
        df_recorder_transf
        .groupby("recorder_id")
        .agg(
            recorder_event_count=('recorder_id', pd.Series.nunique),  # count of unique recorder events
            first_record_date=('document_recorded_date', 'min'),
            last_record_date=('document_recorded_date', 'max'),
            total_document_amount=('document_amount', 'sum'),
            total_transfer_tax=('transfer_tax_amount', 'sum'),
            document_type_list=('document_type_code', unique_list),
            arms_length_code_list=('arms_length_code', unique_list),
            has_resale=('resale_flag', max_flag),
            is_new_construction=('new_construction_flag', max_flag),
            is_reo_sale=('reo_sale_flag', max_flag),
            is_inter_family=('inter_family_flag', max_flag)
        )
        .reset_index()
    )

    return agg_recorder


In [89]:
agg_recorder = aggregate_recorder_data(df_recorder_transf)

print(agg_recorder.head())


   recorder_id  recorder_event_count first_record_date last_record_date  \
0  14000077268                     1        2010-10-15       2010-10-15   
1  14000085508                     1        1995-10-20       1995-10-20   
2  14000085512                     1        2000-05-16       2000-05-16   
3  14000085520                     1        2001-06-19       2001-06-19   
4  14000085524                     1        1995-08-11       1995-08-11   

   total_document_amount  total_transfer_tax document_type_list  \
0                    0.0                 0.0               [55]   
1              2602098.0                 0.0               [69]   
2                    0.0                 0.0               [36]   
3              2710000.0              2980.0               [39]   
4              2602098.0                 0.0               [69]   

  arms_length_code_list has_resale is_new_construction is_reo_sale  \
0                 [0.0]        NaN                 NaN         NaN   
1     

## Recorder Mortgage Table Aggregation

In [91]:
def aggregate_mortgage_data(df_mortgage_transf):
    """Aggregate mortgage data at recorder_id level."""
    
    # Custom aggregator: sorted unique values
    def sorted_unique_list(series):
        return sorted(series.dropna().unique().tolist())

    # Aggregate by recorder_id
    agg_mortgage = (
        df_mortgage_transf
        .groupby("recorder_id")
        .agg(
            mortgage_record_count=("recorder_id", "count"),
            total_mortgage_amount=("amount", "sum"),
            max_interest_rate=("interest_rate", "max"),
            max_term=("term", "max"),
            mortgage_type_code_list=("type_code", sorted_unique_list),
            interest_rate_type_list=("interest_rate_type_code", sorted_unique_list),
        )
        .reset_index()
    )

    return agg_mortgage


In [92]:
agg_mortgage = aggregate_mortgage_data(df_mortgage_transf)
print(agg_mortgage.head())


   recorder_id  mortgage_record_count  total_mortgage_amount  \
0  14000085520                      2              1500000.0   
1  14000085528                      2               900000.0   
2  14000175928                      2              8000000.0   
3  14000238948                      2              1612064.0   
4  14000247224                      2              4200000.0   

   max_interest_rate  max_term mortgage_type_code_list interest_rate_type_list  
0               7.14     360.0                  [21.0]                      []  
1               8.30     360.0                  [21.0]                      []  
2               4.02     360.0                  [21.0]                      []  
3               4.77     360.0                  [29.0]                      []  
4               6.98     180.0                  [21.0]                      []  


## Recorder Grantor Table Aggregation

In [94]:

def aggregate_grantor_data(df_grantor_transf):
    """Aggregate grantor data at recorder_id level."""
    
    def sorted_unique_list(series):
        return sorted(series.dropna().unique().tolist())

    agg_grantor = (
        df_grantor_transf
        .groupby("recorder_id")
        .agg(
            grantor_record_count=("recorder_id", "count"),
            grantor_names=("grantor_name", sorted_unique_list),
            grantor_entity_code_list=("grantor_entity_code", sorted_unique_list)
        )
        .reset_index()
    )

    return agg_grantor


In [95]:
agg_grantor = aggregate_grantor_data(df_grantor_transf)
print(agg_grantor.head())


   recorder_id  grantor_record_count                grantor_names  \
0  14000077268                     2         [EAGLE AL 1 SPE LLC]   
1  14000085508                     2                    [BENEFIC]   
2  14000085512                     2         [DE JONG ARIE JR TR]   
3  14000085520                     2           [ADJ HOLDINGS LLC]   
4  14000085524                     4  [ALAN GRINNELL, FEELIE LEE]   

  grantor_entity_code_list  
0                      [Y]  
1                       []  
2                      [Y]  
3                      [Y]  
4                       []  


## Recorder Grantee Table Aggregation

In [97]:

def aggregate_grantee_data(df_grantee_transf):
    """Aggregate grantee data at recorder_id level."""
    
    def sorted_unique_list(series):
        return sorted(series.dropna().unique().tolist())

    agg_grantee = (
        df_grantee_transf
        .groupby("recorder_id")
        .agg(
            grantee_record_count=("recorder_id", "count"),
            grantee_names=("grantee_name", sorted_unique_list),
            grantee_owner_type_list=("grantee_owner_type", sorted_unique_list),
            grantee_entity_code_list=("grantee_entity_code", sorted_unique_list)
        )
        .reset_index()
    )

    return agg_grantee


In [98]:
agg_grantee = aggregate_grantee_data(df_grantee_transf)
print(agg_grantee.head())


   recorder_id  grantee_record_count  \
0  14000077268                     2   
1  14000085508                     4   
2  14000085512                     2   
3  14000085520                     4   
4  14000085524                     4   

                                       grantee_names grantee_owner_type_list  \
0                    [CRESTVIEW SENIOR COTTAGES LTD]                      []   
1  [SERIES 1991-M6 RTC MULTIFAMILY MTG PASS, STAT...                    [MI]   
2                                 [ADJ HOLDINGS LLC]                      []   
3             [JOSE M GONZALEZ, MERCEDES C GONZALEZ]                    [TR]   
4  [SERIES 1991-M2 RTC MULTIFAMILY MTG PASS, STAT...                    [MI]   

  grantee_entity_code_list  
0                      [Y]  
1                      [Y]  
2                      [Y]  
3                       []  
4                      [Y]  


## Derived Flags (Post-Aggregation)

In [100]:

def process_recorder_data(df_recorder_transf, agg_mortgage, agg_grantor, agg_grantee):
    """Merge aggregated datasets and create yes/no flag columns."""
    
    # Work on a copy to avoid modifying the original dataframe
    df_final = df_recorder_transf.copy()
    
    # Merge aggregated datasets on recorder_id
    for agg_df in [agg_mortgage, agg_grantor, agg_grantee]:
        df_final = df_final.merge(agg_df, on="recorder_id", how="left")

    # Replace missing counts with 0 and ensure integer type
    count_cols = ["mortgage_record_count", "grantor_record_count", "grantee_record_count"]
    df_final[count_cols] = df_final[count_cols].fillna(0).astype(int)

    # Helper for yes/no flags
    def yes_no_flag(condition):
        return np.where(condition, "yes", "no")

    # Create flag columns
    df_final["has_mortgage"] = yes_no_flag(df_final["mortgage_record_count"] > 0)
    df_final["has_grantee"] = yes_no_flag(df_final["grantee_record_count"] > 0)
    df_final["has_grantor"] = yes_no_flag(df_final["grantor_record_count"] > 0)
    df_final["is_new_construction_flag"] = yes_no_flag(df_final["new_construction_flag"] == 1)

    return df_final


In [101]:
# Process the data
df_final = process_recorder_data(df_recorder_transf, agg_mortgage, agg_grantor, agg_grantee)

# Select only flag-related columns
df_flags = df_final[[
    "recorder_id",
    "has_mortgage",
    "has_grantee",
    "has_grantor",
    "is_new_construction_flag"
]]

print(df_flags.head())


   recorder_id has_mortgage has_grantee has_grantor is_new_construction_flag
0  14127684356           no         yes         yes                       no
1  14127684388           no         yes         yes                       no
2  14127684368           no         yes         yes                       no
3  14127684384           no         yes         yes                       no
4  14127684372           no         yes         yes                       no
