## 一周内周KDJ金叉，期间日KDJ金叉且无死叉

## 一周内周KDJ金叉，当日日KDJ死叉

In [None]:
import akshare as ak
import pandas as pd
from pandas import DataFrame
from datetime import datetime, timedelta
import time
import matplotlib.pyplot as plt
import shutil
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import os
from pathlib import Path

current_date = datetime.now() + timedelta(days=1)
one_week_ago = current_date - timedelta(days=7)  # 计算一周之前的日期

# 计算KDJ指标
def calculate_kdj(df: DataFrame, n=9):
    """计算 KDJ 指标"""
    low_min = df["low"].rolling(window=n).min()
    high_max = df["high"].rolling(window=n).max()
    rsv = 100 * (df["close"] - low_min) / (high_max - low_min)
    df["K"] = rsv.ewm(com=2).mean()  # K线
    df["D"] = df["K"].ewm(com=2).mean()  # D线
    df["J"] = 3 * df["K"] - 2 * df["D"]  # J线
    return df

# 按周重新采样，重新计算周K线数据的KDJ
def calculate_weekly_kdj(df: DataFrame):
    """计算周K线的KDJ指标"""
    df["date"] = pd.to_datetime(df["date"])  # 确保 date 列是 datetime 格式
    df = df.set_index("date")  # 把 date 设为索引

    weekly_data = (
        df.resample("W-FRI")  # 取当周五的数据
        .agg({"open": "first", "high": "max", "low": "min", "close": "last"})
        .dropna()
    )

    # 获取当前日期
    today = pd.Timestamp.today().normalize()  # 只取日期部分，去掉时间
    last_weekly_index = weekly_data.index[-1]  # 取最后一个周五索引

    # 如果今天不是周五，修改最后一行索引为今天
    if today != last_weekly_index:
        weekly_data = weekly_data.rename(index={last_weekly_index: today})

    return calculate_kdj(weekly_data)


def find_kdj_golden_cross(df: DataFrame):
    """查找 KDJ 金叉和死叉，并判断当日股票涨跌"""
    df["Golden_Cross"] = (df["K"] > df["D"]) & (df["K"].shift(1) <= df["D"].shift(1))
    df["Dead_Cross"] = (df["K"] < df["D"]) & (df["K"].shift(1) >= df["D"].shift(1))

    # 判断股票涨跌：今日收盘价高于昨日收盘价则为上涨（Up），否则为下跌（Down）
    df["Price_Change"] = df["close"].diff()  # 计算收盘价变化
    df["Trend"] = df["Price_Change"].apply(lambda x: "Up" if x > 0 else "Down")

    return df


# 查找最近金叉时段 且 必须是1最近一周
def get_golden_to_dead_cross_periods(df: DataFrame):
    periods = []
    start_date = None  # 用于记录金叉的开始日期

    for i in range(len(df)):
        if df["Golden_Cross"].iloc[i]:  # 如果是金叉
            if start_date is None:  # 开始记录金叉日期
                start_date = df.index[i]
        elif df["Dead_Cross"].iloc[i]:  # 如果是死叉
            if start_date is not None:  # 如果已记录金叉
                end_date = df.index[i]
                # 不再保存金叉-死叉日期段
                # periods.append((start_date, end_date))  # 保存当前金叉-死叉日期段
                start_date = None  # 重置金叉开始日期

    # 如果循环结束时还有未匹配的金叉（没有死叉）
    if start_date is not None:
        # 检查是否小于7天
        is_within_seven_days = ((df.index[-1] - start_date)).days <= 7

        if is_within_seven_days:
            periods.append((start_date, df.index[-1]))  # 配对到最后一行的日期

    return periods  # 返回金叉日期段


def filter_golden_cross_without_dead_cross(data: DataFrame, periods: list):

    # 提取金叉日期
    golden_cross_dates = data[data["Golden_Cross"]]

    # 检查周 KDJ 的金叉段内，无死叉条件
    filtered_dates = []

    # 查找最近日线kdj金叉日期
    last_golden_cross_date = golden_cross_dates.iloc[-1]["date"]

    # 判断和今日时间差
    is_recent = (current_date - last_golden_cross_date).days < 2

    if periods and is_recent:    
        filtered_dates.append(last_golden_cross_date)

    # 判断今日股票涨跌的
    if data["Trend"].iloc[-1] == "Down":
        return []

    return filtered_dates


# 查找符合的股票
def get_recent_golden_cross_dates(stock_code: str):
    data = download_with_retry(stock_code)

    if data.empty:  # 检查是否下载失败
        print(f"Failed to download data for {stock_code}. Skipping...")
        return []

    # 换成ak了，不需要去掉第二层的ticker
    # data.columns = data.columns.droplevel(1)  # 去掉第二层的 ticker，变为单层索引

    data = calculate_kdj(data)  # 计算日 KDJ
    weekly_data = calculate_weekly_kdj(data)  # 计算周 KDJ
    weekly_data = find_kdj_golden_cross(weekly_data)  # 查找周 K线金叉
    data = find_kdj_golden_cross(data)  # 查找日线 KDJ 金叉和死叉
    periods = get_golden_to_dead_cross_periods(
        weekly_data
    )  # 提取周 KDJ 金叉到死叉的日期段

    # 获取符合条件的金叉日期
    filtered_golden_cross_dates = filter_golden_cross_without_dead_cross(data, periods)

    # 将 filtered_golden_cross_dates 转换为 pandas 的 DatetimeIndex 以便筛选
    filtered_golden_cross_dates = pd.to_datetime(filtered_golden_cross_dates)

    # 筛选出最近一周内的日期
    recent_dates = [
        date.strftime("%Y-%m-%d")
        for date in filtered_golden_cross_dates
        if one_week_ago <= date <= current_date
    ]

    return recent_dates


# 查找周kdj金叉 + 当日kdj死叉
def filter_golden_cross_with_dead_cross(data: DataFrame, periods: list):
    filtered_dates = []
    if periods:
        # 检查今日是否出现死叉
        today_dead_cross = data["Dead_Cross"].iloc[-1]
        if today_dead_cross:
            filtered_dates.append(current_date)

    return filtered_dates


# 查找符合的股票
def get_recent_death_cross_dates(stock_code: str):
    data = download_with_retry(stock_code)

    if data.empty:  # 检查是否下载失败
        print(f"Failed to download data for {stock_code}. Skipping...")
        return []

    # 换成ak了，不需要去掉第二层的ticker
    # data.columns = data.columns.droplevel(1)  # 去掉第二层的 ticker，变为单层索引

    data = calculate_kdj(data)  # 计算日 KDJ
    weekly_data = calculate_weekly_kdj(data)  # 计算周 KDJ
    weekly_data = find_kdj_golden_cross(weekly_data)  # 查找周 K线金叉
    data = find_kdj_golden_cross(data)  # 查找日线 KDJ 金叉和死叉
    periods = get_golden_to_dead_cross_periods(
        weekly_data
    )  # 提取周 KDJ 金叉到死叉的日期段

    # 获取符合条件的金叉日期
    filtered_golden_cross_dates = filter_golden_cross_with_dead_cross(data, periods)

    # 将 filtered_golden_cross_dates 转换为 pandas 的 DatetimeIndex 以便筛选
    filtered_golden_cross_dates = pd.to_datetime(filtered_golden_cross_dates)

    # 筛选出最近一周内的日期
    recent_dates = [
        date.strftime("%Y-%m-%d")
        for date in filtered_golden_cross_dates
        if one_week_ago <= date <= current_date
    ]

    return recent_dates


# 生成股票代码 名称 词典
def generate_stock_dict(filename: str):
    stock_dict = {}

    with open(filename, "r", encoding="utf-8") as file:
        # 读取文件内容
        content = file.read()

        # 按照股票名和股票代码的格式分割
        stock_info = content.split(")")  # 按照')'分割

        for info in stock_info:
            if "(" in info:  # 如果包含'('，说明这是一个有效的股票信息
                code_name = info.split("(")  # 通过'('分割代码和名称
                if len(code_name) == 2:
                    code = code_name[1]
                    name = code_name[0].strip()
                    stock_dict[code] = name

    return stock_dict

# 带有重试机制的 yfinance 数据下载函数。
def download_with_retry(stock_code: str, max_retries=2, retry_delay=3):
    end_date = current_date
    for attempt in range(max_retries):
        try:
            df = ak.stock_zh_a_cdr_daily(
                symbol=stock_code,
                start_date="2024-01-01",
                end_date=end_date.strftime("%Y-%m-%d")
            )
            if not df.empty:  # 检查数据是否下载成功
                return df
            else:
                raise ValueError("Empty DataFrame returned.")
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {stock_code}: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)  # 等待后重试
                end_date = end_date - timedelta(1)
            else:
                print(f"Max retries reached for {stock_code}. Skipping...")
    return pd.DataFrame()  # 如果失败，返回空 DataFrame


# 输出最终 Excel 文件
def output_excel(all_cross_dates: dict):
    """
    all_cross_dates: dict, 包含所有股票的交叉日期
    """
    filename = "stock_name.txt"  # 股票名称文件
    stock_dict = generate_stock_dict(filename)  # 读取股票代码-名称映射

    # 统计股票数量
    num_golden = 0
    num_death = 0
    golden_data = []
    death_data = []

    for stock_code, cross_dates in all_cross_dates.items():

        golden_dates = cross_dates.get("Golden_Cross", [])
        death_dates = cross_dates.get("Death_Cross", [])

        if not golden_dates and not death_dates:
            continue

        cur_stock_code = str(stock_code[-6:])  # 取出六位股票代码
        stock_name = stock_dict.get(cur_stock_code, "未知")  # 获取股票名称

        # 只输出最近一周内的金叉或死叉
        if golden_dates:
            cross_type = "金叉"
            # 记录数据
            golden_data.append(
                {
                    "股票代码": cur_stock_code,
                    "股票名称": stock_name,
                    f"一周内{cross_type}日期": golden_dates[
                        0
                    ],  # 记录最近一周的交叉日期
                }
            )
            num_golden += 1  # 统计金叉

        if death_dates:
            cross_type = "死叉"
            # 记录数据
            death_data.append(
                {
                    "股票代码": cur_stock_code,
                    "股票名称": stock_name,
                    f"一周内{cross_type}日期": death_dates[0],  # 记录最近一周的交叉日期
                }
            )
            num_death += 1  # 统计死叉

    # 创建 DataFrame
    df = pd.DataFrame(golden_data)

    # 添加汇总信息
    golden_row = pd.DataFrame(
        {
            "股票代码": [f"总计金叉股票个数:{num_golden}"],
            "股票名称": [""],
            f"一周内{cross_type}日期": [""],
        }
    )

    df = pd.concat([golden_row, df], ignore_index=True)

    # 锁定输出地址
    output_file = "./自选golden_output/golden_cross_summary.xlsx"
    # 导出 Excel
    df.to_excel(output_file, index=False)

    # 创建 DataFrame
    df = pd.DataFrame(death_data)

    death_row = pd.DataFrame(
        {
            "股票代码": [f"总计死叉股票个数:{num_death}"],
            "股票名称": [""],
            f"一周内{cross_type}日期": [""],
        }
    )

    df = pd.concat([death_row, df], ignore_index=True)

    # 锁定输出地址
    output_file = "./自选death_output/death_cross_summary.xlsx"
    # 导出 Excel
    df.to_excel(output_file, index=False)

    print(f"Excel 文件已生成")


def process_stock(stock_code: str):
    """处理单个股票，获取金叉和死叉日期"""
    time.sleep(random.uniform(0.1, 0.2))  # 随机延时
    try:
        golden_cross_dates = get_recent_golden_cross_dates(stock_code)
        death_cross_dates = get_recent_death_cross_dates(stock_code)
        return stock_code, golden_cross_dates, death_cross_dates
    except Exception as e:
        print(f"Error processing {stock_code}: {e}")
        return stock_code, [], []


def process_stocks(stock_list: list):
    """多线程处理股票列表，获取所有股票的金叉和死叉日期"""
    all_cross_dates = {}  # 存储所有股票的交叉日期
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_stock = {
            executor.submit(process_stock, stock): stock for stock in stock_list
        }

        for future in as_completed(future_to_stock):
            stock_code, golden_dates, death_dates = future.result()
            all_cross_dates[stock_code] = {
                "Golden_Cross": golden_dates,
                "Death_Cross": death_dates,
            }

    return all_cross_dates


def clean_folder(folder_path: str):
    """删除指定文件夹中的所有文件"""
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Error deleting {file_path}: {e}")
# 生成股票代码 名称 词典
def generate_stock_dict(filename: str):
    stock_dict = {}

    with open(filename, "r", encoding="utf-8") as file:
        # 读取文件内容
        content = file.read()

        # 按照股票名和股票代码的格式分割
        stock_info = content.split(")")  # 按照')'分割

        for info in stock_info:
            if "(" in info:  # 如果包含'('，说明这是一个有效的股票信息
                code_name = info.split("(")  # 通过'('分割代码和名称
                if len(code_name) == 2:
                    code = code_name[1]
                    name = code_name[0].strip()
                    stock_dict[code] = name

    return stock_dict


# 带有重试机制的 yfinance 数据下载函数。
def download_with_retry(stock_code: str, max_retries=2, retry_delay=3):
    end_date = current_date
    for attempt in range(max_retries):
        try:
            df = ak.stock_zh_a_cdr_daily(
                symbol=stock_code,
                start_date="2010-01-01",
                end_date=end_date.strftime("%Y-%m-%d"),
            )
            if not df.empty:  # 检查数据是否下载成功
                return df
            else:
                raise ValueError("Empty DataFrame returned.")
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {stock_code}: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)  # 等待后重试
                end_date = end_date - timedelta(1)
            else:
                print(f"Max retries reached for {stock_code}. Skipping...")
    return pd.DataFrame()  # 如果失败，返回空 DataFrame

def clean_folder(folder_path: str):
    """删除指定文件夹中的所有文件"""
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Error deleting {file_path}: {e}")


def mk_pic(output_file: str):
    # 读取 Excel
    file_path = output_file
    df = pd.read_excel(file_path, usecols=[0])  # 只读取第一列

    directory = os.path.dirname(output_file)  # 提取目录部分

    # 解决中文乱码
    plt.rcParams["font.sans-serif"] = ["SimHei"]
    plt.rcParams["axes.unicode_minus"] = False

    # 设置每张图最多显示的行数
    rows_per_image = 50

    # 按每 50 行划分数据并生成图片
    for i in range(0, len(df), rows_per_image):
        subset = df.iloc[i : i + rows_per_image]  # 取出 50 行数据
        fig, ax = plt.subplots(figsize=(5, len(subset) * 0.4))  # 调整图片大小
        ax.axis("tight")
        ax.axis("off")
        ax.table(
            cellText=subset.values,
            colLabels=subset.columns,
            cellLoc="center",
            loc="center",
        )

        # 生成文件名，如 output_0_0.png, output_0_1.png
        output_path = f"{directory}\output_{i // rows_per_image}.png"
        plt.savefig(output_path, bbox_inches="tight", dpi=300)

        # 关闭图像，释放内存
        plt.close(fig)

    print("所有图片已生成！")

In [None]:
# 读取原始文件并处理
input_file = "自选stock_code.txt"
output_file = "自选stock_code1.txt"

with open(input_file, "r") as infile, open(output_file, "w") as outfile:
    for line in infile:
        stock_code = line.strip()
        if stock_code.startswith("6"):
            processed_code = f"sh{stock_code}"
        else:
            processed_code = f"sz{stock_code}"
        outfile.write(processed_code + "\n")

print(f"Processed stock codes saved to {output_file}")

Processed stock codes saved to 自选stock_code1.txt


: 

In [None]:
file_name = "自选stock_code1.txt"
with open(file_name, "r") as file:
    stock_list = [line.strip() for line in file if line.strip()]

# 限制股票数量
stock_list = stock_list[:150]

all_cross_dates = process_stocks(stock_list)

In [None]:
filename = "stock_name.txt"  # 替换新文件路径
stock_dict = generate_stock_dict(filename)

# 统计股票数量
num_golden = 0
num_death = 0

for stock_code, cross_dates in all_cross_dates.items():
    golden_text, death_text = "无", "无"

    golden_dates = cross_dates.get("Golden_Cross", [])
    death_dates = cross_dates.get("Death_Cross", [])

    if not golden_dates and not death_dates:
        continue

    cur_stock_code = str(stock_code[-6:])  # 取出六位股票代码
    stock_name = stock_dict.get(cur_stock_code, "未知")  # 获取股票名称

    # 只输出最近一周内的金叉或死叉
    if golden_dates:
        golden_text = f"一周内金叉日期: {golden_dates[0]}"
        num_golden += 1  # 统计金叉

    if death_dates:
        death_text = f"一周内死叉日期: {death_dates[0]}"
        num_death += 1  # 统计死叉

    print(
        f"股票代码: {cur_stock_code}, 股票名称: {stock_name}, {golden_text}, {death_text}"
    )

print(f"总计金叉股票个数：{num_golden}")
print(f"总计死叉股票个数：{num_death}")

股票代码: 600067, 股票名称: 冠城新材, 无, 一周内死叉日期: 2025-03-03
股票代码: 600162, 股票名称: 香江控股, 无, 一周内死叉日期: 2025-03-03
总计金叉股票个数：0
总计死叉股票个数：2


In [None]:
try:
    folder = Path("自选golden_output")
    folder.mkdir()
    folder = Path("自选death_output")
    folder.mkdir()
except Exception as e:
    pass

# 清理两个文件夹
clean_folder("./自选death_output")
clean_folder("./自选golden_output")

In [18]:
output_excel(all_cross_dates)

Excel 文件已生成


In [None]:
output_file = "./自选golden_output/golden_cross_summary.xlsx"
mk_pic(output_file)
output_file = "./自选death_output/death_cross_summary.xlsx"
mk_pic(output_file)

所有图片已生成！
所有图片已生成！
