In [ ]:
# Strict change-point check: an H->A or A->H switch is only kept when the
# cells in the following 100,000-position window support the new call.
def validate_individual_strict(df, column_name):
    """Re-label unsupported H->A / A->H switches in one column, in place.

    The column is scanned from the second row down. Cells equal to '-' are
    skipped, and the predecessor of a cell is the nearest earlier cell that
    is not '-'. All comparisons are done on the upper-cased value.

    When the predecessor is 'H' and the current cell is 'A', the rows whose
    'POS-2' lies in (pos, pos + 100000] are inspected. If none of them is
    'A' or 'H', or if at most half of those 'A'/'H' cells are 'A', the
    current cell is overwritten with lowercase 'h', and so is every directly
    following cell that is 'A' (the run stops at the first other value,
    '-' included). The A->H case is the mirror image and writes 'a'.

    Args:
        df: DataFrame with a 'POS-2' column and the column to check. Rows are
            addressed by label with integers from ``range(len(df))``, so the
            index is expected to be 0..n-1.
        column_name: Name of the column to check.

    Returns:
        The same ``df`` object, modified in place.
    """
    # (previous call, current call) -> (call that must dominate, replacement)
    switch_rules = {('H', 'A'): ('A', 'h'), ('A', 'H'): ('H', 'a')}
    n_rows = len(df)

    for row in range(1, n_rows):
        here = df.loc[row, column_name].upper()
        if here == '-':
            continue

        # Walk backwards to the nearest cell that is not the '-' placeholder.
        before = None
        for back in reversed(range(row)):
            candidate = df.loc[back, column_name]
            if candidate != '-':
                before = candidate.upper()
                break
        if before is None:
            continue

        rule = switch_rules.get((before, here))
        if rule is None:
            # Not an H->A or A->H switch: nothing to validate.
            continue
        wanted, replacement = rule

        # Collect the calls located right after the current position.
        window_start = df.loc[row, 'POS-2']
        positions = df['POS-2']
        in_window = (positions > window_start) & (positions <= window_start + 100000)
        window_calls = df.loc[in_window, column_name].str.upper()

        supporting = window_calls.value_counts().get(wanted, 0)
        informative = window_calls.isin(['A', 'H']).sum()
        if informative != 0 and supporting / informative > 0.5:
            # More than half of the A/H cells agree with the new call: keep it.
            continue

        # Unsupported switch: re-label this cell and the run that follows it.
        df.at[row, column_name] = replacement
        follower = row + 1
        while follower < n_rows and df.loc[follower, column_name].upper() == wanted:
            df.at[follower, column_name] = replacement
            follower += 1

    return df

# Reload the data
# NOTE(review): `pd` is used here but this cell contains no import; pandas
# must already be imported as `pd` when the cell runs.
df = pd.read_csv('E:/20231205桌面/4Z-2-binsTEST.csv')

# Process only the Individual1 column, as a manual check
# NOTE(review): this result is overwritten when `df` is re-read from the CSV below.
df = validate_individual_strict(df, 'Individual1')


# Apply the strict logic to every Individual column and save the result
df = pd.read_csv('E:/20231205桌面/4Z-2-binsTEST.csv')

# Strictly process every column whose name starts with 'Individual'
individual_columns = [col for col in df.columns if col.startswith('Individual')]
for col in individual_columns:
    df = validate_individual_strict(df, col)

# Save the result to a new CSV file
output_file_path = 'E:/20231205桌面/processed_individuals_final.csv'
df.to_csv(output_file_path, index=False)

output_file_path  # Bare expression: shows the saved file path as the cell output


In [ ]:
import pandas as pd

# Change-point check with a fallback for look-ahead windows that contain no rows.
def validate_individual_with_adjustment(df, column_name, window=100000):
    """Re-label unsupported H->A / A->H switches in one column, in place.

    The column is scanned from the second row down. Cells equal to '-' are
    skipped, and the predecessor of a cell is the nearest earlier cell that
    is not '-'. All comparisons are done on the upper-cased value.

    For an H->A switch, the rows whose 'POS-2' lies in (pos, pos + window]
    are inspected. If that selection contains no rows at all, the next cell
    below that is not '-' is used as the only evidence instead. If the
    evidence holds no 'A'/'H' cell, or at most half of its 'A'/'H' cells are
    'A', the current cell is overwritten with lowercase 'h', and so is every
    directly following cell that is 'A' (the run stops at the first other
    value, '-' included). The A->H case is the mirror image and writes 'a'.

    A switch on the last row (or one followed only by '-') has no evidence
    and is therefore re-labelled; previously the last-row case raised
    ``ValueError`` from ``idxmax`` on an empty selection.

    Args:
        df: DataFrame with a 'POS-2' column and the column to check. Rows are
            addressed by label with integers from ``range(len(df))``, so the
            index is expected to be 0..n-1.
        column_name: Name of the column to check.
        window: Width of the look-ahead range on 'POS-2'. Defaults to the
            original hard-coded 100000.

    Returns:
        The same ``df`` object, modified in place.
    """
    # (previous call, current call) -> (call that must dominate, replacement)
    rules = {('H', 'A'): ('A', 'h'), ('A', 'H'): ('H', 'a')}
    positions = df['POS-2']

    for i in range(1, len(df)):
        current_value = df.loc[i, column_name].upper()

        # '-' cells carry no call; move on to the next row.
        if current_value == '-':
            continue

        # Find the nearest earlier cell that is not '-'.
        previous_value = None
        for j in range(i - 1, -1, -1):
            raw_value = df.loc[j, column_name]
            if raw_value != '-':
                previous_value = raw_value.upper()
                break

        if previous_value is None:
            continue

        rule = rules.get((previous_value, current_value))
        if rule is None:
            # Not an H->A or A->H switch: nothing to validate.
            continue
        expected, replacement = rule

        start_pos = df.loc[i, 'POS-2']
        in_range_mask = (positions > start_pos) & (positions <= start_pos + window)
        in_range_values = df.loc[in_range_mask, column_name].str.upper()

        # No rows in the window: fall back to the next non-'-' cell below.
        # When there is none, the evidence stays empty instead of raising.
        if in_range_values.empty:
            later_values = df.loc[i + 1:, column_name]
            later_values = later_values[later_values != '-']
            if not later_values.empty:
                in_range_values = pd.Series([later_values.iloc[0].upper()])

        expected_count = in_range_values.value_counts().get(expected, 0)
        total_count = in_range_values.isin(['A', 'H']).sum()
        if total_count == 0 or (expected_count / total_count) <= 0.5:
            df.at[i, column_name] = replacement
            # Re-label the run of identical calls that directly follows.
            for k in range(i + 1, len(df)):
                if df.loc[k, column_name].upper() == expected:
                    df.at[k, column_name] = replacement
                else:
                    break

    return df

# Load the data to process
df = pd.read_csv('E:/20231205桌面/4Z-2-binsTEST.csv')

# Process every column whose name starts with 'Individual'
individual_columns = [col for col in df.columns if col.startswith('Individual')]
for col in individual_columns:
    df = validate_individual_with_adjustment(df, col)

# Save the result
output_file_path = 'E:/20231205桌面/processed_individuals_final_with_adjustment.csv'
df.to_csv(output_file_path, index=False)

# Report completion together with the output path (the message text is Chinese)
print(f"数据处理完成，结果已保存到 {output_file_path}")


In [ ]:
#### Works for now, no problems found so far; this cell runs the imputing over all of the marker matrices

import pandas as pd

# Change-point check with the extra rule for isolated cells (large gap to the
# previous call and no rows in the look-ahead window).
def validate_individual_with_extended_logic(df, column_name, window=100000):
    """Re-label or blank unsupported H->A / A->H switches in one column, in place.

    The column is scanned from the second row down. Cells equal to '-' are
    skipped, and the predecessor of a cell is the nearest earlier cell that
    is not '-'. All comparisons are done on the upper-cased value.

    For an H->A switch, the rows whose 'POS-2' lies in (pos, pos + window]
    are inspected:

    * If that selection contains no rows and the predecessor lies more than
      ``window`` before the current position, the current cell is set to
      '-' and the scan moves on.
    * If it contains no rows otherwise, the next cell below that is not '-'
      is used as the only evidence.
    * If the evidence holds no 'A'/'H' cell, or at most half of its 'A'/'H'
      cells are 'A', the current cell is overwritten with lowercase 'h', and
      so is every directly following 'A' cell (the run stops at the first
      other value, '-' included).

    The A->H case is the mirror image and writes 'a'.

    A switch on the last row has no evidence below it; it is blanked or
    re-labelled by the rules above. Previously that case raised
    ``ValueError`` from ``idxmax`` on an empty selection.

    Args:
        df: DataFrame with a 'POS-2' column and the column to check. Rows are
            addressed by label with integers from ``range(len(df))``, so the
            index is expected to be 0..n-1.
        column_name: Name of the column to check.
        window: Width of the look-ahead range on 'POS-2', also used as the
            maximum allowed distance to the predecessor. Defaults to the
            original hard-coded 100000.

    Returns:
        The same ``df`` object, modified in place.
    """
    # (previous call, current call) -> (call that must dominate, replacement)
    rules = {('H', 'A'): ('A', 'h'), ('A', 'H'): ('H', 'a')}
    positions = df['POS-2']

    for i in range(1, len(df)):
        current_value = df.loc[i, column_name].upper()

        # '-' cells carry no call; move on to the next row.
        if current_value == '-':
            continue

        # Find the nearest earlier cell that is not '-', and its position.
        previous_value = None
        previous_pos = None
        for j in range(i - 1, -1, -1):
            raw_value = df.loc[j, column_name]
            if raw_value != '-':
                previous_value = raw_value.upper()
                previous_pos = df.loc[j, 'POS-2']
                break

        if previous_value is None:
            continue

        rule = rules.get((previous_value, current_value))
        if rule is None:
            # Not an H->A or A->H switch: nothing to validate.
            continue
        expected, replacement = rule

        start_pos = df.loc[i, 'POS-2']
        in_range_mask = (positions > start_pos) & (positions <= start_pos + window)
        in_range_values = df.loc[in_range_mask, column_name].str.upper()

        if in_range_values.empty:
            # Isolated on both sides: blank the cell rather than guess.
            if (start_pos - previous_pos) > window:
                df.at[i, column_name] = '-'
                continue

            # Otherwise fall back to the next non-'-' cell below. When there
            # is none, the evidence stays empty instead of raising.
            later_values = df.loc[i + 1:, column_name]
            later_values = later_values[later_values != '-']
            if not later_values.empty:
                in_range_values = pd.Series([later_values.iloc[0].upper()])

        expected_count = in_range_values.value_counts().get(expected, 0)
        total_count = in_range_values.isin(['A', 'H']).sum()
        if total_count == 0 or (expected_count / total_count) <= 0.5:
            df.at[i, column_name] = replacement
            # Re-label the run of identical calls that directly follows.
            for k in range(i + 1, len(df)):
                if df.loc[k, column_name].upper() == expected:
                    df.at[k, column_name] = replacement
                else:
                    break

    return df

# Load the data to process
df = pd.read_csv('E:/20231205桌面/4Z-2-binsTEST.csv')

# Process every column whose name starts with 'Individual'
individual_columns = [col for col in df.columns if col.startswith('Individual')]
for col in individual_columns:
    df = validate_individual_with_extended_logic(df, col)

# Save the result
output_file_path = 'E:/20231205桌面/processed_individuals_final_with_extended_logic.csv'
df.to_csv(output_file_path, index=False)

# Report completion together with the output path (the message text is Chinese)
print(f"数据处理完成，结果已保存到 {output_file_path}")


In [ ]:
import pandas as pd

# Change-point check with the isolated-cell rule; '-' cells are removed from
# the look-ahead window before it is evaluated.
def validate_individual_with_extended_logic(df, column_name, window=100000):
    """Re-label or blank unsupported H->A / A->H switches in one column, in place.

    The column is scanned from the second row down. Cells equal to '-' are
    skipped, and the predecessor of a cell is the nearest earlier cell that
    is not '-'. All comparisons are done on the upper-cased value.

    For an H->A switch, the non-'-' cells whose 'POS-2' lies in
    (pos, pos + window] form the evidence:

    * If there are none and the predecessor lies more than ``window`` before
      the current position, the current cell is set to '-' and the scan
      moves on.
    * If there are none otherwise, the next cell below that is not '-' is
      used as the only evidence.
    * If the evidence holds no 'A'/'H' cell, or fewer than half of its
      'A'/'H' cells are 'A', the current cell is overwritten with lowercase
      'h', and so is every directly following 'A' cell (the run stops at the
      first other value, '-' included).
    * Otherwise, if the next cell below that is not '-' is 'H' again, only
      the current cell is overwritten with 'h'.

    The A->H case is the mirror image and writes 'a'.

    A switch on the last row has no evidence below it; it is blanked or
    re-labelled by the rules above. Previously that case raised
    ``ValueError`` from ``idxmax`` on an empty selection.

    Args:
        df: DataFrame with a 'POS-2' column and the column to check. Rows are
            addressed by label with integers from ``range(len(df))``, so the
            index is expected to be 0..n-1.
        column_name: Name of the column to check.
        window: Width of the look-ahead range on 'POS-2', also used as the
            maximum allowed distance to the predecessor. Defaults to the
            original hard-coded 100000.

    Returns:
        The same ``df`` object, modified in place.
    """
    # (previous call, current call) -> (call that must dominate, replacement)
    rules = {('H', 'A'): ('A', 'h'), ('A', 'H'): ('H', 'a')}
    positions = df['POS-2']

    for i in range(1, len(df)):
        current_value = df.loc[i, column_name].upper()

        # '-' cells carry no call; move on to the next row.
        if current_value == '-':
            continue

        # Find the nearest earlier cell that is not '-', and its position.
        previous_value = None
        previous_pos = None
        for j in range(i - 1, -1, -1):
            raw_value = df.loc[j, column_name]
            if raw_value != '-':
                previous_value = raw_value.upper()
                previous_pos = df.loc[j, 'POS-2']
                break

        if previous_value is None:
            continue

        rule = rules.get((previous_value, current_value))
        if rule is None:
            # Not an H->A or A->H switch: nothing to validate.
            continue
        expected, replacement = rule

        # Upper-cased value of the next non-'-' cell below, or None if absent.
        later_values = df.loc[i + 1:, column_name]
        later_values = later_values[later_values != '-']
        next_value = later_values.iloc[0].upper() if not later_values.empty else None

        start_pos = df.loc[i, 'POS-2']
        in_range_mask = (positions > start_pos) & (positions <= start_pos + window)
        in_range_values = df.loc[in_range_mask, column_name].str.upper()

        # '-' cells must not influence the ratio.
        in_range_values = in_range_values[in_range_values != '-']

        if in_range_values.empty:
            # Isolated on both sides: blank the cell rather than guess.
            if (start_pos - previous_pos) > window:
                df.at[i, column_name] = '-'
                continue

            # Otherwise the next non-'-' cell below is the only evidence.
            # When there is none, the evidence stays empty instead of raising.
            if next_value is not None:
                in_range_values = pd.Series([next_value])

        expected_count = in_range_values.value_counts().get(expected, 0)
        total_count = in_range_values.isin(['A', 'H']).sum()
        if total_count == 0 or (expected_count / total_count) < 0.5:
            df.at[i, column_name] = replacement
            # Re-label the run of identical calls that directly follows.
            for k in range(i + 1, len(df)):
                if df.loc[k, column_name].upper() == expected:
                    df.at[k, column_name] = replacement
                else:
                    break
        elif next_value == previous_value:
            # The window supports the new call, but the very next call flips
            # straight back to the previous one: re-label this single cell.
            df.at[i, column_name] = replacement

    return df

# Load the data to process
df = pd.read_csv('E:/20231205桌面/4Z-2-binsTEST.csv')

# Process every column whose name starts with 'Individual'
individual_columns = [col for col in df.columns if col.startswith('Individual')]
for col in individual_columns:
    df = validate_individual_with_extended_logic(df, col)

# Save the result
output_file_path = 'E:/20231205桌面/processed_individuals_final_with_extended_logic-3.csv'
df.to_csv(output_file_path, index=False)

# Report completion together with the output path (the message text is Chinese)
print(f"数据处理完成，结果已保存到 {output_file_path}")
