<a href="https://colab.research.google.com/github/delajiqi/awesome/blob/main/%E5%BD%A9%E7%A5%A8%E5%88%86%E6%9E%90%E8%84%9A%E6%9C%AC_(%E5%A4%9A%E7%AD%96%E7%95%A5%E7%89%88).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import os
import requests
import io
import time
from datetime import datetime
from itertools import product

# ====================== 配置路径 ======================
# 将输出文件保存到脚本运行目录下的 'output' 文件夹中
output_dir = os.path.join(os.getcwd(), "彩票分析结果")
os.makedirs(output_dir, exist_ok=True) # 确保输出目录存在

# 数据源URL
fc_url = "http://data.17500.cn/3d_desc.txt"
tc_url = "https://data.17500.cn/pl32_desc.txt"

# 生成带时间戳的文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = os.path.join(output_dir, f"大中小分析_{timestamp}.xlsx")

# ====================== 数据加载和预处理 ======================
def load_and_process_from_url(url, lottery_type):
    """
    从指定URL获取数据并进行预处理。
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        print(f"尝试从 {url} 获取数据...")
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        content = response.text
        data_io = io.StringIO(content)

        if lottery_type == 'FC':
            col_names = [
                "开奖期号", "开奖日期", "百位", "十位", "个位", "试机号1", "试机号2", "试机号3", "组选类型",
                "销量", "直选_中奖注数", "直选_奖金", "组三_中奖注数", "组三_奖金",
                "组六_中奖注数", "组六_奖金", "其他中奖信息"
            ]
        elif lottery_type == 'TC':
            col_names = [
                "开奖期号", "开奖日期", "百位", "十位", "个位", "试机号1", "试机号2", "试机号3",
                "直选销量", "组选销量", "直选奖金", "组选3奖金", "组选6奖金",
                "直选_中奖注数", "组选3_中奖注数", "组选6_中奖注数", "奖池余额"
            ]

        df = pd.read_csv(data_io, header=None, names=col_names, encoding='utf-8', on_bad_lines='skip', sep='\s+')
        df = df[['开奖期号', '开奖日期', '百位', '十位', '个位']]

        df["开奖日期"] = pd.to_datetime(df["开奖日期"], errors="coerce").dt.strftime("%Y%m%d")
        df = df.sort_values(by=["开奖日期", "开奖期号"], ascending=True).reset_index(drop=True)

        for col in ["百位", "十位", "个位"]:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(-1).astype(int).clip(0, 9)

        valid_mask = (df["百位"] >= 0) & (df["十位"] >= 0) & (df["个位"] >= 0) & (df["开奖日期"].notnull())
        df = df[valid_mask]

        df["开奖号码"] = df.apply(lambda row: f"{row['百位']}-{row['十位']}-{row['个位']}", axis=1)

        return df.drop(columns=["百位", "十位", "个位"])

    except requests.exceptions.RequestException as e:
        print(f"从 URL {url} 获取数据失败: {e}")
        return pd.DataFrame()
    except Exception as e:
        print(f"处理来自 URL {url} 的数据时发生错误: {e}")
        return pd.DataFrame()

# ====================== 形态分析函数 ======================
def analyze_shapes(df, prefix):
    """
    对开奖号码进行大中小形态分析。
    """
    def safe_split(number_str):
        try:
            return list(map(int, str(number_str).split("-")))
        except:
            return []

    def check_dzx_group(number_str):
        numbers = safe_split(number_str)
        if len(numbers) != 3:
            return ""
        labels = set()
        for n in numbers:
            if n >= 7: labels.add("大")
            elif n >= 3: labels.add("中")
            else: labels.add("小")
        return "中奖" if len(labels) == 3 else ""

    df[f"{prefix}大中小"] = df["开奖号码"].apply(check_dzx_group)
    return df

# ====================== 精确顺序遗漏计算函数 ======================
def compute_sequential_omission(df, fc_result_col, tc_result_col, new_fc_omission_col, new_tc_omission_col):
    """
    计算精确的、按福彩->体彩顺序的连续遗漏值。
    """
    counter = 0
    fc_omissions = []
    tc_omissions = []

    for index, row in df.iterrows():
        if row.get(fc_result_col) == "中奖":
            fc_omissions.append("中奖")
            counter = 0
        else:
            counter += 1
            fc_omissions.append(counter)

        if row.get(tc_result_col) == "中奖":
            tc_omissions.append("中奖")
            counter = 0
        else:
            counter += 1
            tc_omissions.append(counter)

    df[new_fc_omission_col] = fc_omissions
    df[new_tc_omission_col] = tc_omissions
    return df

# ====================== 杀号玩法分析函数 (已重构) ======================
def analyze_kill_strategy(df, history_count, strategy_name):
    """
    分析杀号玩法的遗漏情况。
    "中奖"定义: 当期大中小号码不在最近(history_count*2)次的历史大中小号码列表中。
    "遗漏"定义: 当期大中小号码在历史列表中（被错杀）。
    """
    print(f"正在执行 {strategy_name} 分析 (杀 {history_count*2} 期)...")
    fc_dzx_history = []
    tc_dzx_history = []

    fc_kill_results = []
    tc_kill_results = []

    counter = 0

    for index, row in df.iterrows():
        killed_set = set(fc_dzx_history[-history_count:] + tc_dzx_history[-history_count:])

        fc_num_str = row.get('3D')
        is_fc_dzx = (row.get('福彩大中小') == '中奖')

        if fc_num_str and is_fc_dzx:
            normalized_fc_num = "".join(sorted(fc_num_str.split('-')))

            if normalized_fc_num not in killed_set:
                fc_kill_results.append('中奖')
                counter = 0
            else:
                counter += 1
                fc_kill_results.append(counter)

            fc_dzx_history.append(normalized_fc_num)
        else:
            counter += 1
            fc_kill_results.append(counter)

        killed_set = set(fc_dzx_history[-history_count:] + tc_dzx_history[-history_count:])

        tc_num_str = row.get('排列三')
        is_tc_dzx = (row.get('体彩大中小') == '中奖')

        if tc_num_str and is_tc_dzx:
            normalized_tc_num = "".join(sorted(tc_num_str.split('-')))

            if normalized_tc_num not in killed_set:
                tc_kill_results.append('中奖')
                counter = 0
            else:
                counter += 1
                tc_kill_results.append(counter)

            tc_dzx_history.append(normalized_tc_num)
        else:
            counter += 1
            tc_kill_results.append(counter)

    df[f'{strategy_name}福彩'] = fc_kill_results
    df[f'{strategy_name}体彩'] = tc_kill_results

    final_killed_set = set(fc_dzx_history[-history_count:] + tc_dzx_history[-history_count:])
    return df, final_killed_set

# ====================== 生成大中小组合函数 ======================
def generate_dzx_combinations():
    """
    生成所有36注大中小组选号码组合。
    """
    small = [0, 1, 2]
    medium = [3, 4, 5, 6]
    large = [7, 8, 9]

    all_combinations = set()
    for combo in product(small, medium, large):
        all_combinations.add("".join(map(str, sorted(combo))))
    return all_combinations

# ====================== Excel格式设置 (已更新) ======================
def add_excel_formatting(writer, merged_df, summary_data):
    """
    为Excel工作表添加必要的格式和策略小结。
    """
    workbook = writer.book
    worksheet = writer.sheets['大中小分析']

    worksheet.freeze_panes(1, 4)

    header_fmt = workbook.add_format({'bold': True, 'bg_color': '#D9D9D9', 'border': 1, 'align': 'center', 'valign': 'vcenter'})
    yellow_fmt = workbook.add_format({'bg_color': '#FFFF00'})
    green_fmt  = workbook.add_format({'bg_color': '#92D050'})
    red_fmt    = workbook.add_format({'bg_color': '#FFC7CE', 'font_color': '#9C0006'})
    purple_fmt = workbook.add_format({'bg_color': '#7030A0', 'font_color': '#FFFFFF'})
    summary_header_fmt = workbook.add_format({'bold': True, 'bg_color': '#4F81BD', 'font_color': 'white', 'border': 1})
    summary_text_fmt = workbook.add_format({'text_wrap': True, 'valign': 'top', 'border': 1})

    for col_num, value in enumerate(merged_df.columns.values):
        worksheet.write(0, col_num, value, header_fmt)

    for idx, col in enumerate(merged_df):
        series = merged_df[col]
        max_len = max((
            series.astype(str).map(lambda x: len(str(x).encode('utf-8', 'replace'))).max(),
            len(str(col).encode('utf-8', 'replace'))
        )) + 2
        worksheet.set_column(idx, idx, max_len)

    omission_cols = [
        '福彩大中小遗漏', '体彩大中小遗漏',
        '杀8期策略福彩', '杀8期策略体彩',
        '杀10期策略福彩', '杀10期策略体彩',
        '杀12期策略福彩', '杀12期策略体彩'
    ]
    for col_name in omission_cols:
        if col_name not in merged_df.columns: continue
        try:
            col_idx = merged_df.columns.get_loc(col_name)
            col_letter = chr(ord('A') + col_idx)
            cell_range = f"{col_letter}2:{col_letter}{len(merged_df) + 1}"

            worksheet.conditional_format(cell_range, {'type': 'formula', 'criteria': f'=AND(ISNUMBER({col_letter}2), {col_letter}2>30)', 'format': purple_fmt})
            worksheet.conditional_format(cell_range, {'type': 'formula', 'criteria': f'=AND(ISNUMBER({col_letter}2), {col_letter}2>=21, {col_letter}2<=30)', 'format': red_fmt})
            worksheet.conditional_format(cell_range, {'type': 'formula', 'criteria': f'=AND(ISNUMBER({col_letter}2), {col_letter}2>=11, {col_letter}2<=20)', 'format': green_fmt})
            worksheet.conditional_format(cell_range, {'type': 'formula', 'criteria': f'=AND(ISNUMBER({col_letter}2), {col_letter}2<10, {col_letter}2>0)', 'format': yellow_fmt})
        except KeyError:
            print(f"警告: 未找到列 '{col_name}'，跳过其条件格式设置。")

    # --- 添加策略小结 ---
    last_row = len(merged_df) + 3
    worksheet.write(last_row, 0, "策略小结 (下次投注参考)", header_fmt)
    worksheet.merge_range(last_row, 1, last_row, 12, "") # 合并单元格

    for i, (strategy_name, remaining_nums) in enumerate(summary_data.items()):
        current_row = last_row + 1 + i
        worksheet.write(current_row, 0, f"{strategy_name}剩余组合 ({len(remaining_nums)}注):", summary_header_fmt)
        worksheet.merge_range(current_row, 1, current_row, 12, ", ".join(sorted(list(remaining_nums))), summary_text_fmt)


# ====================== 主程序 ======================
if __name__ == "__main__":
    final_columns = [
        "开奖日期", "开奖期号", "3D", "排列三",
        "福彩大中小遗漏", "体彩大中小遗漏",
        "杀8期策略福彩", "杀8期策略体彩",
        "杀10期策略福彩", "杀10期策略体彩",
        "杀12期策略福彩", "杀12期策略体彩",
    ]

    df_fc = load_and_process_from_url(fc_url, 'FC')
    time.sleep(np.random.randint(2, 6))
    df_tc = load_and_process_from_url(tc_url, 'TC')

    if df_fc.empty and df_tc.empty:
        print("无法获取任何数据，程序终止。")
        exit()

    if not df_fc.empty: df_fc = analyze_shapes(df_fc, "福彩")
    if not df_tc.empty: df_tc = analyze_shapes(df_tc, "体彩")

    merged_df = pd.merge(
        df_fc, df_tc, on="开奖日期", suffixes=('_福彩', '_体彩'), how="outer"
    ).sort_values(by="开奖日期", ascending=True).reset_index(drop=True)

    merged_df['开奖期号'] = merged_df['开奖期号_福彩'].fillna(merged_df['开奖期号_体彩'])
    merged_df = merged_df.rename(columns={"开奖号码_福彩": "3D", "开奖号码_体彩": "排列三"})
    merged_df = merged_df.fillna("").drop(columns=['开奖期号_福彩', '开奖期号_体彩'], errors='ignore')

    merged_df = compute_sequential_omission(
        merged_df, "福彩大中小", "体彩大中小", "福彩大中小遗漏", "体彩大中小遗漏"
    )

    # --- 并行执行三种杀号策略 ---
    all_dzx_combinations = generate_dzx_combinations()
    summary_data = {}

    merged_df, final_killed_set_8 = analyze_kill_strategy(merged_df, 4, "杀8期策略")
    summary_data['杀8期'] = all_dzx_combinations - final_killed_set_8

    merged_df, final_killed_set_10 = analyze_kill_strategy(merged_df, 5, "杀10期策略")
    summary_data['杀10期'] = all_dzx_combinations - final_killed_set_10

    merged_df, final_killed_set_12 = analyze_kill_strategy(merged_df, 6, "杀12期策略")
    summary_data['杀12期'] = all_dzx_combinations - final_killed_set_12

    # 筛选2005年及以后的数据
    merged_df['开奖日期_dt'] = pd.to_datetime(merged_df['开奖日期'], errors='coerce')
    merged_df = merged_df[merged_df['开奖日期_dt'] >= '2005-01-01'].copy()
    merged_df['开奖日期'] = merged_df['开奖日期_dt'].dt.strftime('%Y-%m-%d')
    merged_df = merged_df.drop(columns=['开奖日期_dt'])

    for col in final_columns:
        if col not in merged_df.columns:
            merged_df[col] = ""

    merged_df = merged_df[final_columns]

    with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
        merged_df.to_excel(writer, index=False, sheet_name='大中小分析')
        add_excel_formatting(writer, merged_df, summary_data)

    print(f"\n分析完成！已增加多种杀号玩法分析及策略小结，文件已保存至：{output_file}")