In [3]:
import pandas as pd
import numpy as np
import os
import shutil # <-- Import the shutil module for file operations
from typing import Dict, Any, List

# --- Helper functions for file I/O (CSV / Excel) ---

def _read_file(file_path: str) -> pd.DataFrame | None:
    """Reads a DataFrame from a CSV or XLSX file."""
    extension = os.path.splitext(file_path)[1].lower()
    
    try:
        if extension == '.csv':
            df = pd.read_csv(file_path, sep=',')
        elif extension in ('.xlsx', '.xls'):
            df = pd.read_excel(file_path)
        else:
            print(f"Error: Unsupported file type: {extension}. Must be CSV or XLSX.")
            return None

        df.columns = df.columns.str.strip()
        return df

    except FileNotFoundError:
        print(f"Error: Input file '{file_path}' not found.")
        return None
    except Exception as e:
        print(f"Error loading file '{file_path}': {e}")
        return None

def _write_file(df: pd.DataFrame, file_path: str):
    """Save a DataFrame as CSV or Excel, chosen by the target's extension.

    The row index is never written. Excel output goes to a sheet named
    ``FamilyData``; CSV output is comma-separated.

    Args:
        df: Frame to persist.
        file_path: Destination path ending in ``.csv``, ``.xlsx`` or ``.xls``
            (compared case-insensitively).

    Raises:
        ValueError: If the extension is none of the supported ones.
    """
    suffix = os.path.splitext(file_path)[1].lower()

    # Reject unknown formats up front so nothing is written for them.
    if suffix not in ('.csv', '.xlsx', '.xls'):
        raise ValueError(f"Unsupported output file type: {suffix}")

    if suffix == '.csv':
        df.to_csv(file_path, index=False, sep=',')
        return

    df.to_excel(file_path, index=False, sheet_name='FamilyData')


# --------------------------------------------------------------------------------
# fill_family_relationships: derive ChildID / SiblingID from the parent columns
# --------------------------------------------------------------------------------

def _normalise_id_list_cell(value):
    """Return a cleaned SiblingID/ChildID cell, or NaN when it holds no IDs.

    String cells are stripped; empty strings and the textual missing markers
    ``nan`` / ``None`` / ``<NA>`` (any case) become NaN. Numeric cells are
    rendered as text, with whole numbers written without a fractional part.
    """
    if isinstance(value, str):
        text = value.strip()
    else:
        if pd.isna(value):
            return np.nan
        try:
            number = float(value)
        except (TypeError, ValueError):
            text = str(value).strip()
        else:
            # A column holding only lone IDs is loaded as float, so 5 arrives
            # as 5.0; store it as "5" so int() parsing of the list works.
            text = str(int(number)) if number.is_integer() else str(value).strip()

    if text == '' or text.lower() in ('nan', 'none', '<na>'):
        return np.nan
    return text


def fill_family_relationships(input_file: str, output_file: str) -> str | None:
    """
    Fill the missing 'ChildID' and 'SiblingID' cells from the parent columns.

    Reads the family table from ``input_file``, derives for every person the
    list of children (people naming them as FatherID or MotherID) and the list
    of full siblings (people sharing both FatherID and MotherID), and writes
    those comma-separated lists into 'ChildID' / 'SiblingID' wherever the
    existing cell is empty. Cells that already hold a value are kept.

    Rows without a PersonID are ignored when deriving relationships, so they
    neither appear in anyone's list nor abort the run.

    Args:
        input_file: CSV/Excel file with at least the columns PersonID,
            PartnerID, FatherID and MotherID.
        output_file: CSV/Excel path the filled table is written to.

    Returns:
        ``output_file`` on success; ``None`` if the input could not be read or
        the output could not be written (a message is printed in both cases).

    Raises:
        KeyError: If one of the four required ID columns is missing.
    """
    print(f"--- Step 1: Filling SiblingID and ChildID and saving to '{output_file}' ---")

    df = _read_file(input_file)
    if df is None:
        return None

    for col in ('PersonID', 'PartnerID', 'FatherID', 'MotherID'):
        df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')

    # Bring the two list columns to "clean text or NaN", creating them if absent.
    for col in ('SiblingID', 'ChildID'):
        cells = df[col] if col in df.columns else [np.nan] * len(df)
        df[col] = pd.Series(
            [_normalise_id_list_cell(cell) for cell in cells],
            index=df.index,
            dtype=object,
        )

    # PersonID per row as a plain int, or None when the ID is missing.
    person_ids = [int(pid) if pd.notna(pid) else None for pid in df['PersonID']]

    # --- 1. Calculate and fill child IDs ---
    children_map: Dict[int, List[int]] = {}
    for child_id, father_id, mother_id in zip(df['PersonID'], df['FatherID'], df['MotherID']):
        if pd.isna(child_id):
            continue
        for parent_id in (father_id, mother_id):
            if pd.notna(parent_id):
                children_map.setdefault(int(parent_id), []).append(int(child_id))

    calculated_children = pd.Series(
        [
            ', '.join(map(str, children_map[pid])) if pid in children_map else np.nan
            for pid in person_ids
        ],
        index=df.index,
        dtype=object,
    )
    df['ChildID'] = df['ChildID'].fillna(calculated_children).fillna('')

    # --- 2. Calculate and fill sibling IDs (full siblings only) ---
    siblings_map: Dict[int, str] = {}
    full_sibling_groups = (
        df.dropna(subset=['PersonID', 'FatherID', 'MotherID'])
        .groupby(['FatherID', 'MotherID'])
    )
    for _, group in full_sibling_groups:
        group_ids = [int(pid) for pid in group['PersonID']]
        if len(group_ids) < 2:
            continue
        for person_id in group_ids:
            siblings_map[person_id] = ', '.join(
                str(other_id) for other_id in group_ids if other_id != person_id
            )

    calculated_siblings = pd.Series(
        [siblings_map.get(pid, np.nan) for pid in person_ids],
        index=df.index,
        dtype=object,
    )
    df['SiblingID'] = df['SiblingID'].fillna(calculated_siblings).fillna('')

    try:
        _write_file(df, output_file)
        print(f"Successfully created intermediate file '{output_file}'.")
        return output_file
    except Exception as e:
        print(f"Error saving file '{output_file}': {e}")
        return None

# --------------------------------------------------------------------------------

def _get_generation_from_relationships(row: pd.Series, df_original: pd.DataFrame, gen_map: Dict[int, int]) -> int | None:
    """Calculates a person's generation based on their known relatives."""
    person_id = row['PersonID']
    calculated_gens = set()
    
    # 1. Parents: Child Gen = Parent Gen + 1
    for parent_id in [row['FatherID'], row['MotherID']]:
        if pd.notna(parent_id) and int(parent_id) in gen_map:
            calculated_gens.add(gen_map[int(parent_id)] + 1)
            
    # Helper to parse string ID lists
    def _parse_id_list(id_str: str) -> List[int]:
        s = str(id_str).replace('[', '').replace(']', '').replace("'", '').replace('"', '')
        return [int(x.strip()) for x in s.split(',') if x.strip() and x.strip() != 'nan']

    # 2. Siblings: Sibling Gen = Sibling Gen (Same)
    sibling_ids_str = str(df_original.loc[df_original['PersonID'] == person_id, 'SiblingID'].iloc[0])
    for sibling_id in _parse_id_list(sibling_ids_str):
        if sibling_id in gen_map:
            calculated_gens.add(gen_map[sibling_id])
                
    # 3. Children: Parent Gen = Child Gen - 1
    child_ids_str = str(df_original.loc[df_original['PersonID'] == person_id, 'ChildID'].iloc[0])
    for child_id in _parse_id_list(child_ids_str):
        if child_id in gen_map:
            calculated_gens.add(gen_map[child_id] - 1)
                
    # 4. Partner: Partner Gen = Self Gen (Same)
    partner_id = row['PartnerID']
    if pd.notna(partner_id) and int(partner_id) in gen_map:
        calculated_gens.add(gen_map[int(partner_id)])

    
    if not calculated_gens:
        return None
        
    # Check for conflicts
    if len(calculated_gens) > 1:
        print(f"--- CONFLICT DETECTED ---")
        print(f"PersonID: {person_id}")
        print(f"Conflicting calculated generations: {list(calculated_gens)}")
        return None 

    return list(calculated_gens)[0]

# --------------------------------------------------------------------------------

def _perform_consistency_checks(df: pd.DataFrame) -> bool:
    """
    Check all relationship and generation columns for self-consistency.

    The following rules are verified for every row that has a PersonID:
      1. Everyone listed in 'ChildID' names this person as father or mother.
      2. Everyone listed in 'SiblingID' lists this person back.
      3. The partner names this person as partner.
      4. Partners and siblings share the same generation.
      5. A person's generation is the parent's generation + 1.

    Listed relatives that have no row in the table are skipped. Generation
    rules are only applied when both generations involved are known, so an
    unresolved generation is never reported as a mismatch.

    Args:
        df: Table with PersonID, PartnerID, FatherID, MotherID, SiblingID,
            ChildID and Generation columns.

    Returns:
        True if all checks pass, False if any inconsistency is found (the
        collected messages are printed in a banner).

    Raises:
        ValueError: If an ID list contains a token that is not a whole number.
    """
    error_messages = []

    # PersonID -> Generation, restricted to known generations: comparing
    # against a missing value (pd.NA) cannot be used in an `if`.
    gen_map = {
        pid: gen
        for pid, gen in df.set_index('PersonID')['Generation'].to_dict().items()
        if pd.notna(gen)
    }

    # Characters that appear when a Python list was stringified into a cell.
    strip_table = str.maketrans('', '', '[]\'"')

    def _first_row(person_id):
        """Return the first row whose PersonID matches, or None."""
        matches = df.loc[df['PersonID'] == person_id]
        return None if matches.empty else matches.iloc[0]

    def _parse_id_list(raw) -> List[int]:
        """Turn a cell such as "3, 4", "['3', '4']" or 5.0 into a list of ints."""
        if pd.isna(raw) or raw == '':
            return []
        ids = []
        for token in str(raw).translate(strip_table).split(','):
            token = token.strip()
            if not token or token.lower() in ('nan', 'none', '<na>'):
                continue
            try:
                ids.append(int(token))
            except ValueError:
                # A lone ID that went through a float column arrives as "5.0".
                number = float(token)
                if not number.is_integer():
                    raise ValueError(f"Invalid person ID '{token}' in ID list '{raw}'")
                ids.append(int(number))
        return ids

    def _get_relatives_from_string(person_id, col_name: str) -> List[int]:
        """Return the IDs stored in `col_name` for `person_id` ([] if unknown)."""
        person = _first_row(person_id)
        return [] if person is None else _parse_id_list(person[col_name])

    for _, row in df.iterrows():
        p_id = row['PersonID']
        if pd.isna(p_id):
            # Nothing can be cross-referenced without an ID.
            continue
        p_gen = row['Generation']
        gen_known = pd.notna(p_gen)

        # 1. Parent/Child Reciprocity Check (FatherID/MotherID vs ChildID)
        for c_id in _get_relatives_from_string(p_id, 'ChildID'):
            child_row = _first_row(c_id)
            if child_row is None:
                continue

            is_father = pd.notna(child_row['FatherID']) and child_row['FatherID'] == p_id
            is_mother = pd.notna(child_row['MotherID']) and child_row['MotherID'] == p_id
            if not (is_father or is_mother):
                error_messages.append(f"RECIPROCITY ERROR: PERSON {p_id} LISTS {c_id} AS A CHILD, BUT {c_id}'S PARENTS ARE NOT P.")

        # 2. Sibling Reciprocity Check (SiblingID)
        sibling_ids = _get_relatives_from_string(p_id, 'SiblingID')
        for s_id in sibling_ids:
            if _first_row(s_id) is None:
                continue
            if p_id not in _get_relatives_from_string(s_id, 'SiblingID'):
                error_messages.append(f"SIBLING ERROR: PERSON {p_id} LISTS {s_id} AS SIBLING, BUT {s_id} DOES NOT LIST P AS SIBLING.")

        # 3. Partner Reciprocity Check (PartnerID)
        partner_id = row['PartnerID']
        if pd.notna(partner_id):
            p_id_int = int(partner_id)
            partner_row = _first_row(p_id_int)
            if partner_row is not None:
                partner_partner_id = partner_row['PartnerID']
                if pd.isna(partner_partner_id) or int(partner_partner_id) != p_id:
                    error_messages.append(f"PARTNER ERROR: PERSON {p_id} LISTS {p_id_int} AS PARTNER, BUT {p_id_int} DOES NOT LIST P AS PARTNER.")

        # 4. Generation Consistency Check (Partner/Sibling Generations must match)
        if gen_known and pd.notna(partner_id) and int(partner_id) in gen_map:
            p_id_int = int(partner_id)
            partner_gen = gen_map[p_id_int]
            if p_gen != partner_gen:
                error_messages.append(f"GENERATION ERROR: PARTNER {p_id} (GEN {p_gen}) AND {p_id_int} (GEN {partner_gen}) HAVE INCONSISTENT GENERATIONS.")

        if gen_known:
            for s_id in sibling_ids:
                if s_id in gen_map:
                    sibling_gen = gen_map[s_id]
                    if p_gen != sibling_gen:
                        error_messages.append(f"GENERATION ERROR: SIBLING {p_id} (GEN {p_gen}) AND {s_id} (GEN {sibling_gen}) HAVE INCONSISTENT GENERATIONS.")

        # 5. Generation Consistency Check (Parent/Child Generations must differ by 1)
        if gen_known:
            for parent_id in (row['FatherID'], row['MotherID']):
                if pd.notna(parent_id) and int(parent_id) in gen_map:
                    p_id_int = int(parent_id)
                    parent_gen = gen_map[p_id_int]
                    if p_gen != parent_gen + 1:
                        error_messages.append(f"GENERATION ERROR: PARENT {p_id_int} (GEN {parent_gen}) AND CHILD {p_id} (GEN {p_gen}) HAVE INCORRECT GENERATIONAL DIFFERENCE.")

    if error_messages:
        print("\n\n" + "#" * 50)
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print("!!! FAMILY DATA INCONSISTENCY DETECTED AFTER GENERATION CALCULATION !!!".upper())
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        for msg in error_messages:
            print(f"!!! {msg}")
        print("#" * 50 + "\n\n")
        return False

    print("\n✅ All relationship and generation fields are self-consistent.")
    return True

# --------------------------------------------------------------------------------

def calculate_generations(input_file: str, output_file: str, max_iterations: int = 20):
    """
    Iteratively fill in the 'Generation' column and save the checked result.

    Starting from the generations already present in ``input_file``, each pass
    asks `_get_generation_from_relationships` for every still-unknown person,
    using only the generations known at the start of that pass. Passes repeat
    until nobody is unknown, a pass resolves nobody, or ``max_iterations``
    passes have run. The table is then validated with
    `_perform_consistency_checks`; only a consistent table is printed and
    written to ``output_file``, after which ``input_file`` is deleted.

    Args:
        input_file: Intermediate CSV/Excel file that already contains the
            'ChildID' and 'SiblingID' columns.
        output_file: CSV/Excel path for the final table.
        max_iterations: Upper bound on the number of passes.

    Returns:
        None. Progress, warnings and errors are reported via ``print``.
    """
    print(f"\n--- Step 2: Iteratively Calculating Generation numbers (Max {max_iterations} iterations) ---")

    df = _read_file(input_file)
    if df is None:
        return

    # Untouched copy used as the lookup source for the ChildID/SiblingID cells.
    df_original = df.copy()

    for col in ('PersonID', 'PartnerID', 'FatherID', 'MotherID'):
        df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')

    if 'Generation' not in df.columns:
        df['Generation'] = np.nan
    df['Generation'] = pd.to_numeric(
        df['Generation'].astype(str).str.strip(), errors='coerce'
    ).astype('Int64')

    # Rows without a PersonID can neither be looked up nor resolved; skip them.
    gen_map = {
        int(pid): int(gen)
        for pid, gen in zip(df['PersonID'], df['Generation'])
        if pd.notna(pid) and pd.notna(gen)
    }
    unknown_gen_ids = {
        int(pid) for pid in df.loc[df['Generation'].isna(), 'PersonID'].dropna()
    }
    df_sorted = df.sort_values(by='PersonID')

    iteration_count = 0
    stalled = False  # True when a pass resolved nobody (no further progress possible)

    while unknown_gen_ids and iteration_count < max_iterations:
        iteration_count += 1
        pending_rows = df_sorted[df_sorted['PersonID'].isin(unknown_gen_ids)].to_dict('records')

        # Collected separately so every person in this pass sees the same gen_map.
        new_gen_updates = {}
        for row in pending_rows:
            person_id = row['PersonID']
            new_gen = _get_generation_from_relationships(pd.Series(row), df_original, gen_map)
            if new_gen is None:
                continue
            if person_id not in gen_map and person_id not in new_gen_updates:
                new_gen_updates[person_id] = new_gen

        if not new_gen_updates:
            stalled = True
            break

        gen_map.update(new_gen_updates)
        unknown_gen_ids.difference_update(new_gen_updates)

    # Apply final generations to the DataFrame
    df['Generation'] = df['PersonID'].map(gen_map).astype('Int64')
    final_unknown_count = df['Generation'].isna().sum()

    print(f"Generation calculation complete: {iteration_count} iterations performed.")

    # --- Final Reporting on Calculation ---

    # Only claim the limit was hit when the loop ran out of passes, not when
    # it stopped because a pass made no progress.
    if unknown_gen_ids and not stalled:
        print(f"\n🛑 LIMIT REACHED: Stopped after {max_iterations} iterations.")

    if final_unknown_count > 0:
        unknown_ids = df.loc[df['Generation'].isna(), 'PersonID'].tolist()
        print(f"\n⚠️ WARNING: {final_unknown_count} generations are still missing (out of {len(df)} total people).")
        print(f"Unknown PersonIDs: {unknown_ids}")
    else:
        print(f"\n✅ SUCCESS: All generations were determined in {iteration_count} iterations.")

    # ----------------------------------------------------
    # POST-CALCULATION CONSISTENCY CHECK
    # ----------------------------------------------------
    if not _perform_consistency_checks(df):
        # The capitalized error report has already been printed.
        print(f"\nPROCESS STOPPED: Data inconsistency found. Output file '{output_file}' was NOT written.")
        return

    print("\n--- Final Filled Family Data with Generations (Readably Formatted) ---")
    print(df.to_string())
    print("----------------------------------------------------------------------\n")

    try:
        _write_file(df, output_file)
    except Exception as e:
        print(f"Error saving file '{output_file}': {e}")
        return
    print(f"Successfully saved final data to original file path: '{output_file}'.")

    # Never delete the input when it is the very file that was just written.
    if os.path.abspath(input_file) == os.path.abspath(output_file):
        return

    try:
        os.remove(input_file)
    except OSError as e:
        print(f"Warning: could not remove temporary file '{input_file}': {e}")
    else:
        print(f"Successfully removed temporary file: '{input_file}'.")


if __name__ == '__main__':
    # Script entry point: back up the data file, fill ChildID/SiblingID into a
    # temporary file, then compute generations and overwrite the data file.
    FILE_EXT = '.xlsx'

    # --- File Paths Configuration ---
    # INPUT_FILE is the file to be processed (and the final output file)
    INPUT_FILE = f'family_data{FILE_EXT}'
    # ORIGINAL_FILE is the backup of the initial data (overwritten on every run)
    ORIGINAL_FILE = f'family_data_original{FILE_EXT}'
    # TEMPORARY_FILE is the intermediate file used between the two main functions
    TEMPORARY_FILE = f'family_data_temp{FILE_EXT}'

    MAX_GEN_ITERATIONS = 20

    # 1. Make a copy of the initial file. Failures end the run with a non-zero
    #    exit status; SystemExit is raised directly because the `exit` helper
    #    is only provided by the `site` module.
    if not os.path.exists(INPUT_FILE):
        print(f"❌ Error: Initial file '{INPUT_FILE}' not found.")
        raise SystemExit(1)

    try:
        shutil.copy2(INPUT_FILE, ORIGINAL_FILE)
    except Exception as e:
        print(f"❌ Error creating backup file: {e}")
        raise SystemExit(1)
    print(f"✅ Created backup of initial file: '{ORIGINAL_FILE}'")

    # 2. Run the first step, saving to the temporary file.
    #    Its output file becomes the input file of the next step.
    filled_file = fill_family_relationships(INPUT_FILE, TEMPORARY_FILE)

    # 3. Run the second step, reading from the temporary file and saving the
    #    final result to INPUT_FILE.
    if filled_file:
        calculate_generations(filled_file, INPUT_FILE, max_iterations=MAX_GEN_ITERATIONS)

    # Cleanup: calculate_generations removes the temporary file after a
    # successful save; remove it here if it is still present.
    if os.path.exists(TEMPORARY_FILE):
        try:
            os.remove(TEMPORARY_FILE)
            print(f"Cleanup: Removed any remaining temporary file: '{TEMPORARY_FILE}'.")
        except Exception as e:
            print(f"Cleanup warning: Could not remove temporary file: {e}")

✅ Created backup of initial file: 'family_data_original.xlsx'
--- Step 1: Filling SiblingID and ChildID and saving to 'family_data_temp.xlsx' ---
Successfully created intermediate file 'family_data_temp.xlsx'.

--- Step 2: Iteratively Calculating Generation numbers (Max 20 iterations) ---
Generation calculation complete: 0 iterations performed.

✅ SUCCESS: All generations were determined in 0 iterations.

✅ All relationship and generation fields are self-consistent.

--- Final Filled Family Data with Generations (Readably Formatted) ---
    PersonID          Name       Name-ru LastName-ru Gender   BirthDate  DeathDate  PartnerID  FatherID  MotherID   SiblingID         ChildID  Generation                       PlaceBirth                       PlaceNow                                                         Occupation             Photo                                        Other_photo
0          1         Misha        Михаил    Савченко      M  12.12.2019        NaN          2        27