# Description


Second EDA for catbase database.

Focuses on analyzing data during data processing.


# Start


In [None]:
import os
from pathlib import Path
from typing import Dict

import numpy as np
import pandas as pd
import plotly.express as px
from dash import Dash, dcc, html

from src.utils import load_data_config

## Definitions


In [None]:
# Resolve the configured data locations once; the cells below build their
# CSV file names from these directories.
data_paths, column_types = load_data_config()
data_path_initial, data_path_helper, data_path_processed = (
    Path(data_paths[stage]) for stage in ("initial", "helper", "processed")
)

# EDA


## Before data processing


### Missing values


In [None]:
def show_missing_values():
    """
    Plot the percentage of null values per column of the initial dataset.

    Reads ``all_cats.csv`` from the initial data directory, computes the share
    of null cells in every column and serves the result as a bar chart through
    a Dash app.
    """
    source_csv = f"{data_path_initial}/all_cats.csv"
    cats = pd.read_csv(source_csv, dtype=column_types, low_memory=False)

    # Share of nulls per column as a percentage, largest first.
    null_share = (cats.isnull().mean() * 100).sort_values(ascending=False)

    chart = px.bar(
        x=null_share.index,
        y=null_share.values,
        title="Percentage of Missing Values by Column",
        labels={"x": "Columns", "y": "Missing Values (%)"},
    )
    chart.update_traces(text=null_share.values.round(0))

    app = Dash(__name__)
    app.layout = html.Div([dcc.Graph(id="missing-values-graph", figure=chart)])

    # NOTE(review): newer Dash releases document `app.run` as the entry point;
    # confirm `run_server` is still available in the installed version.
    app.run_server(debug=True)


show_missing_values()

### Same country


In [None]:
def same_country(row: pd.Series) -> int:
    """
    Check if the country of origin is the same as the current country.

    Two missing values count as "same"; one missing value paired with a real
    country counts as "different".

    :param row: A row from the DataFrame with "country_origin" and
        "country_current" entries.
    :returns: 1 if the countries are same, 0 otherwise.
    """
    origin = row["country_origin"]
    current = row["country_current"]

    # Resolve missing values before comparing: `pd.NA == x` evaluates to
    # `pd.NA`, whose truth value is ambiguous and raises TypeError in an `if`.
    origin_missing = pd.isna(origin)
    current_missing = pd.isna(current)
    if origin_missing or current_missing:
        return int(origin_missing and current_missing)

    return int(origin == current)

In [None]:
def show_country_check():
    """
    Report how many cats have matching origin and current countries.

    Loads the two country columns of the initial dataset, flags every row with
    ``same_country`` and prints the count and percentage of each outcome.
    """
    country_columns = ["country_origin", "country_current"]
    cats = pd.read_csv(
        f"{data_path_initial}/all_cats.csv",
        usecols=country_columns,
        dtype=column_types,
        low_memory=False,
    )

    cats["country_check"] = cats.apply(same_country, axis=1)
    check = cats["country_check"]
    counts = check.value_counts()
    shares = check.value_counts(normalize=True) * 100

    print("\nCountry Origin vs Current Country Analysis:")
    print("-" * 45)
    for flag, count in counts.items():
        label = "Same country" if flag else "Different country"
        print(f"{label}: {count:,} cats ({shares[flag]:.1f}%)")


show_country_check()

## During data processing phase 3


### Countries


In [None]:
# Only the id and the country code/name columns are needed for the checks below.
country_columns = [
    "cat_id",
    "country_origin",
    "country_current",
    "country_origin_name",
    "country_current_name",
]
first_fixes_csv = f"{data_path_processed}/all_cats_first_fixes.csv"
all_cats_df = pd.read_csv(first_fixes_csv, usecols=country_columns, dtype=column_types, low_memory=False)

In [None]:
# A "mismatch" is a row whose country code is present and not "unknown",
# while its country name column still holds "unknown".
origin_code = all_cats_df["country_origin"]
current_code = all_cats_df["country_current"]
total_rows = len(all_cats_df)

origin_mismatch = all_cats_df[
    origin_code.notna() & (origin_code != "unknown") & (all_cats_df["country_origin_name"] == "unknown")
]
current_mismatch = all_cats_df[
    current_code.notna() & (current_code != "unknown") & (all_cats_df["country_current_name"] == "unknown")
]

# Frequency of each affected code.
print("Country Origin codes with 'unknown' country names:")
if origin_mismatch.empty:
    print("None found")
else:
    print(origin_mismatch["country_origin"].value_counts())

print("\nCountry Current codes with 'unknown' country names:")
if current_mismatch.empty:
    print("None found")
else:
    print(current_mismatch["country_current"].value_counts())

# Share of the whole dataset that is affected; only reported when non-zero.
if not origin_mismatch.empty:
    origin_percentage = len(origin_mismatch) / total_rows * 100
    print(
        f"\nPercentage of rows with 'unknown' country_origin_name but valid country_origin: {origin_percentage:.2f}%"
    )

if not current_mismatch.empty:
    current_percentage = len(current_mismatch) / total_rows * 100
    print(
        f"Percentage of rows with 'unknown' country_current_name but valid country_current: {current_percentage:.2f}%"
    )

# Distinct affected codes, in order of first appearance.
unique_origin_mismatch = origin_mismatch["country_origin"].unique()
unique_current_mismatch = current_mismatch["country_current"].unique()

print("\nUnique country_origin codes with 'unknown' country names:")
if len(unique_origin_mismatch):
    print(unique_origin_mismatch)

print("\nUnique country_current codes with 'unknown' country names:")
if len(unique_current_mismatch):
    print(unique_current_mismatch)

### Registration_number_current


In [None]:
# Load the full first-fixes dataset and list the source databases it contains.
first_fixes_csv = f"{data_path_processed}/all_cats_first_fixes.csv"
all_cats_df = pd.read_csv(first_fixes_csv, dtype=column_types, low_memory=False)
all_cats_df["src_db"].unique().tolist()

Split the data by the "src_db" column.


In [None]:
def create_db_dataframes(df: pd.DataFrame, src_db_list=None) -> Dict[str, pd.DataFrame]:
    """
    Create separate DataFrames for each src_db in the DataFrame.

    Each resulting frame holds only the "registration_number_current" column
    of one source database, de-duplicated, sorted and re-indexed from 0. The
    number of records per source is printed as a side effect.

    :param df: DataFrame containing the data; needs the "src_db" and
        "registration_number_current" columns.
    :param src_db_list: Source database names to extract. Defaults to the
        built-in list of known sources; rows from any other source are ignored.
    :returns: Dictionary where keys are src_db names and values are DataFrames.
        A requested source without rows maps to an empty DataFrame.
    """
    if src_db_list is None:
        src_db_list = [
            "Finland",
            "German Database",
            "Norway",
            "pawpeds",
            "DB Polska",
            "sibcat",
            "FFS_SK",
            "SVERAK",
            "TOP-CAT",
            "unknown",
        ]

    column = "registration_number_current"
    src_db_frames = {}

    for db in src_db_list:
        filtered_df = (
            df.loc[df["src_db"] == db, [column]]
            .drop_duplicates(subset=[column])
            .sort_values(by=[column])
            .reset_index(drop=True)
        )
        src_db_frames[db] = filtered_df

        print(f"{db}: {len(filtered_df)} records")

    return src_db_frames


# Split the currently loaded dataset into one registration-number frame per source database.
src_db_frames = create_db_dataframes(all_cats_df)

In [None]:
def output_dataframes(src_db_frames: Dict[str, pd.DataFrame], output_path="../../output/reg_num_initial") -> None:
    """
    Output the dataframes to CSV files.

    One file is written per dictionary entry, named after the key with spaces
    and dashes replaced by underscores. A frame whose only value is "unknown"
    is skipped because it carries no information.

    :param src_db_frames: Dictionary of DataFrames to output.
    :param output_path: Directory that receives the CSV files; it is created
        (including missing parents) when it does not exist.
    """
    # makedirs creates intermediate directories too, so a single call is
    # enough and no unrelated directory is created for a custom output_path.
    os.makedirs(output_path, exist_ok=True)

    for db_name, df in src_db_frames.items():
        if len(df) == 1 and df["registration_number_current"].iloc[0] == "unknown":
            continue

        safe_name = db_name.replace(" ", "_").replace("-", "_")
        df.to_csv(os.path.join(output_path, f"{safe_name}.csv"), index=False)


# Write the per-source frames using the function's default output directory.
output_dataframes(src_db_frames)

Export only the "registration_number_current" values that resemble actual words.


In [None]:
def export_only_words(
    df: pd.DataFrame, output_path="../../output/reg_num_initial/only_words.csv"
) -> pd.DataFrame:
    """
    Export only registration_numbers_current that could mean some words.

    A value is kept when it is present, is not purely numeric and contains none
    of the separator characters ``/ - . _ ( ) №``.

    :param df: DataFrame to filter; needs the "registration_number_current" column.
    :param output_path: CSV file that receives the filtered rows; its parent
        directory is created when missing.
    :returns: DataFrame with only words.
    """
    output_dir = os.path.dirname(output_path)
    if output_dir:  # a bare file name has no directory to create
        os.makedirs(output_dir, exist_ok=True)

    # Drop missing values first: the string predicates yield NA for them and a
    # mask containing NA cannot be negated or used for filtering reliably.
    candidates = df[df["registration_number_current"].notna()]
    values = candidates["registration_number_current"].astype(str)

    is_numeric = values.str.isnumeric().astype(bool)
    has_separator = values.str.contains(r"[\/\-\.\_\(\)№]").astype(bool)

    filtered_df = candidates[~is_numeric & ~has_separator]
    filtered_df.to_csv(output_path, index=False)

    return filtered_df


# Only the TOP-CAT registration numbers are screened for word-like values here.
export_only_words(src_db_frames["TOP-CAT"])

In [None]:
def export_reg_num_with_parentheses(
    df: pd.DataFrame,
    output_path="../../output/reg_num_initial/registration_numbers_with_parentheses.csv",
) -> None:
    """
    Export all registration_number_current that have parentheses.

    The exported file holds the distinct matching values in sorted order;
    missing registration numbers never match.

    :param df: DataFrame to filter; needs the "registration_number_current" column.
    :param output_path: CSV file that receives the result; its parent directory
        is created when missing.
    """
    output_dir = os.path.dirname(output_path)
    if output_dir:  # a bare file name has no directory to create
        os.makedirs(output_dir, exist_ok=True)

    column = "registration_number_current"
    has_parenthesis = df[column].str.contains(r"[\(\)]", na=False)

    filtered_df = (
        df.loc[has_parenthesis, [column]]
        .drop_duplicates()
        .sort_values(by=column)
        .reset_index(drop=True)
    )
    filtered_df.to_csv(output_path, index=False)


# Run on the whole loaded dataset rather than on a single source database.
export_reg_num_with_parentheses(all_cats_df)

In [None]:
# Repeat the per-source export on the finished dataset so the registration
# numbers can be compared against the earlier export.
done_csv = f"{data_path_processed}/all_cats_done.csv"
fixed_cats = pd.read_csv(done_csv, dtype=column_types, low_memory=False)

src_db_frames = create_db_dataframes(fixed_cats)
output_dataframes(src_db_frames, output_path="../../output/reg_num_fixes")

In [None]:
def _count_unknown_reg_numbers(frame: pd.DataFrame):
    """Return how often "unknown" occurs in registration_number_current (0 if never)."""
    occurrences = frame["registration_number_current"].value_counts()
    return occurrences.get("unknown", 0)


# Compare the number of placeholder registration numbers before and after fixing.
unknown_count = _count_unknown_reg_numbers(all_cats_df)
print(f"Number of 'unknown' values in 'registration_number_current': {unknown_count}")

unknown_count_fixed = _count_unknown_reg_numbers(fixed_cats)
print(f"Number of 'unknown' values in 'registration_number_current' after fixing: {unknown_count_fixed}")

### Correct mother/father names


In [None]:
all_cats_df = pd.read_csv(
    f"{data_path_initial}/all_cats.csv",
    usecols=["id", "father_id", "mother_id"],
    dtype=column_types,
    low_memory=False,
)

# Flag rows where a parent id equals the cat's own id; these are reported as
# "Invalid" below. A missing parent id compares as "not equal".
own_id = all_cats_df["id"]
all_cats_df["father_check"] = (all_cats_df["father_id"] == own_id).fillna(False)
all_cats_df["mother_check"] = (all_cats_df["mother_id"] == own_id).fillna(False)

father_check_counts = all_cats_df["father_check"].value_counts()
mother_check_counts = all_cats_df["mother_check"].value_counts()

print("\nFather and Mother Check Analysis:")
# Report both parents; the mother counts were previously computed but never shown.
for parent, check_counts in (("Father", father_check_counts), ("Mother", mother_check_counts)):
    for is_self_reference, count in check_counts.items():
        status = "Invalid" if is_self_reference else "Valid"
        print(f"{parent} check: {count:,} cats ({status})")

### Unknown cat names


In [None]:
all_cats_df = pd.read_csv(
    f"{data_path_initial}/all_cats.csv",
    usecols=["id", "name"],
    dtype=column_types,
    low_memory=False,
)

# Missing names get a placeholder so every row has a first word to extract.
names = all_cats_df["name"].fillna("Unknown cat name")
all_cats_df["name"] = names
# The prefix is everything before the first space of the name.
all_cats_df["name_prefix"] = names.str.split(" ").str[0]

In [None]:
unique_prefixes = sorted(all_cats_df["name_prefix"].unique())

# Only prefixes shorter than five characters are exported.
short_prefixes = [prefix for prefix in unique_prefixes if len(prefix) < 5]

output_dir = "../../output"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "unique_name_prefixes.txt")

try:
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(f"{prefix}\n" for prefix in short_prefixes)
except (OSError, UnicodeError) as e:
    # Best effort: report I/O and encoding problems without aborting the cell.
    # Any other exception is a programming error and is allowed to surface.
    print(f"Error writing to file: {e}")

In [None]:
# Prefix        Language
# "of"          English
# "de"          French/Spanish/Portuguese
# "van"         Dutch
# "von"         German
# "z"           Polish/Czech
# "del"         Spanish/Italian
# "di"          Italian
# "od"          Croatian/Serbian/Bosnian
# "iz" (из)     Russian
# "aus"         German
# "auf"         German
# "el"          Arabic/Spanish
# "al"          Arabic
# "ot" (от)     Russian

prefix_list = ["of", "de", "van", "von", "z", "del", "di", "od", "iz", "el", "al", "ot", "auf", "aus", "из", "от"]

# Name the columns explicitly: what a bare reset_index() calls them depends on
# the pandas version, and the reporting code reads the "name_prefix" column.
prefix_counts = (
    all_cats_df["name_prefix"]
    .value_counts()
    .loc[lambda counts: counts.index.isin(prefix_list)]
    .rename_axis("name_prefix")
    .reset_index(name="count")
)

prefix_counts

In [None]:
# List the known prefixes that occur in the data, then those that never occur.
present_prefixes = prefix_counts["name_prefix"]

print("Prefixes in dataset:")
for prefix in present_prefixes:
    print(prefix)

print("-" * 45)
print("Prefixes not in dataset:")
seen_prefixes = set(present_prefixes)
for prefix in prefix_list:
    if prefix not in seen_prefixes:
        print(prefix)

## After phase 3


### Missing values


Missing values (NA, NaN, None).


In [None]:
def show_missing_values():
    """
    Plot the percentage of null values per column after the first fixes.

    Reads ``all_cats_first_fixes.csv`` from the processed data directory,
    computes the share of null cells in every column and serves the result as
    a bar chart through a Dash app.
    """
    source_csv = f"{data_path_processed}/all_cats_first_fixes.csv"
    cats = pd.read_csv(source_csv, dtype=column_types, low_memory=False)

    # Share of nulls per column as a percentage, largest first.
    null_share = (cats.isnull().mean() * 100).sort_values(ascending=False)

    chart = px.bar(
        x=null_share.index,
        y=null_share.values,
        title="Percentage of Missing Values by Column",
        labels={"x": "Columns", "y": "Missing Values (%)"},
    )
    chart.update_traces(text=null_share.values.round(0))

    app = Dash(__name__)
    app.layout = html.Div([dcc.Graph(id="missing-values-graph", figure=chart)])

    # NOTE(review): newer Dash releases document `app.run` as the entry point;
    # confirm `run_server` is still available in the installed version.
    app.run_server(debug=True)


show_missing_values()

Missing values encoded as placeholders (unknown, -1, ...).


In [None]:
def show_missing_values():
    """
    Plot the percentage of placeholder values per column after the first fixes.

    Each listed column is compared against the placeholder of its group; the
    share of matching rows is charted for every column where it is above zero
    and served through a Dash app.
    """
    cats = pd.read_csv(f"{data_path_processed}/all_cats_first_fixes.csv", dtype=column_types, low_memory=False)

    # (placeholder, columns in which that placeholder marks a missing value)
    placeholder_groups = [
        ("1111-11-11 00:00:00", ["date_of_birth"]),
        (
            "unknown",
            [
                "gender",
                "registration_number_current",
                "src_db",
                "cattery",
                "breed_code",
                "extracted_breed",
                "colour_code",
                "colour_definition",
                "full_breed_name",
                "group",
                "category",
                "country_origin",
                "country_current",
                "country_origin_name",
                "country_current_name",
                "title_after",
                "title_before",
                "chip",
            ],
        ),
        ("Unknown cat name", ["name"]),
        (-1, ["father_id", "mother_id"]),
    ]

    missing_percentages = {}
    for placeholder, columns in placeholder_groups:
        for col in columns:
            if col not in cats.columns:
                continue
            missing_percentages[col] = (cats[col] == placeholder).mean() * 100

    missing_df = pd.DataFrame(list(missing_percentages.items()), columns=["column", "percent_missing"])

    # Keep only affected columns, most affected first.
    affected = missing_df["percent_missing"] > 0
    missing_df = missing_df[affected].sort_values(by="percent_missing", ascending=False)

    chart = px.bar(
        missing_df,
        x="column",
        y="percent_missing",
        title="Missing Values by Column",
        labels={
            "column": "Columns",
            "percent_missing": "Missing Values (%)",
        },
        hover_data=["percent_missing"],
    )
    chart.update_layout(xaxis_tickangle=-45, height=600)
    chart.update_traces(texttemplate="%{y:.1f}%", textposition="outside")

    app = Dash(__name__)
    app.layout = html.Div([dcc.Graph(id="missing-values-graph", figure=chart)])

    # NOTE(review): newer Dash releases document `app.run` as the entry point;
    # confirm `run_server` is still available in the installed version.
    app.run_server(debug=True)


show_missing_values()

## During phase 5


In [None]:
def display_country_code_mappings() -> pd.DataFrame:
    """
    Display a mapping between country names and all their associated codes
    found in the dataset.

    Origin and current country columns are pooled. Pairs where the name or the
    code is "unknown" or missing are ignored.

    :returns: DataFrame with one row per country name ("country_name") and its
        codes as a sorted, comma-separated string ("country_codes"), ordered
        by country name. The columns are present even when no pair qualifies.
    """
    df = pd.read_csv(
        f"{data_path_processed}/all_cats_done.csv",
        usecols=["country_origin_name", "country_origin", "country_current_name", "country_current"],
        dtype=column_types,
        low_memory=False,
    )

    name_to_codes: Dict[str, set] = {}
    column_pairs = (
        ("country_origin_name", "country_origin"),
        ("country_current_name", "country_current"),
    )

    for name_column, code_column in column_pairs:
        # Reduce to distinct, fully populated pairs first: missing cells would
        # break the sorting/joining below, and iterating distinct pairs avoids
        # a Python-level loop over every row.
        pairs = df[[name_column, code_column]].dropna().drop_duplicates()
        for name, code in pairs.itertuples(index=False, name=None):
            if name != "unknown" and code != "unknown":
                name_to_codes.setdefault(name, set()).add(code)

    result_data = [
        {"country_name": name, "country_codes": ", ".join(sorted(codes))}
        for name, codes in sorted(name_to_codes.items(), key=lambda item: item[0])
    ]

    return pd.DataFrame(result_data, columns=["country_name", "country_codes"])


display_country_code_mappings()

## Later


In [None]:
def analyze_id_gaps():
    """
    Analyze if there are gaps in the 'id' column sequence.

    Prints the ID range and the number of unique IDs of the initial dataset
    and, when IDs are missing from that range, how many are missing and which
    share of the range they make up. Nothing is plotted.
    """
    print("Loading data...")
    all_cats_df = pd.read_csv(
        f"{data_path_initial}/all_cats.csv", usecols=["id"], dtype=column_types, low_memory=False
    )

    ids = all_cats_df["id"].dropna()
    if ids.empty:
        # min()/max() are undefined without data.
        print("\nNo IDs found; nothing to analyze.")
        return

    min_id = ids.min()
    max_id = ids.max()
    unique_count = ids.nunique()

    # For integer IDs the number of gaps follows from the range size, so the
    # full range never has to be materialised in memory.
    # NOTE(review): assumes 'id' holds integer values - confirm against column_types.
    expected_count = int(max_id - min_id) + 1
    missing_count = expected_count - unique_count

    print("\nID Range Analysis:")
    print(f"Min ID: {min_id:,}")
    print(f"Max ID: {max_id:,}")
    print(f"Total unique IDs: {unique_count:,}")
    print(f"Expected IDs in range: {expected_count:,}")

    if missing_count > 0:
        gap_percentage = (missing_count / expected_count) * 100
        print("\nGap Analysis:")
        print(f"Number of gaps: {missing_count:,} ({gap_percentage:.2f}% of ID range)")
    else:
        print("\nNo gaps found in the ID sequence!")


analyze_id_gaps()