### テストコード②

ここからは7/30以降の内容

### pklを読み込み、2種類のdfを出力する

In [None]:
import os

import numpy as np
import pandas as pd


def read_df():
    # 定数としてファイル名を宣言
    MONTHLY_TOTAL_FILE = "月次合計.pkl"
    ABSENT_RECORD_FILE = "欠勤履歴.pkl"
    PROJECT_RECORD_FILE = "プロジェクト.pkl"
    REGULATION_OVERTIME_FILE = "安衛法.pkl"
    HOLIDAY_RECORD_FILE = "有休取得実績.pkl"
    VACATION_RECORD_FILE = "休暇取得.pkl"

    # 保存先フォルダの指定
    SAVE_FOLDER = "data"
    # pickleファイルからデータフレームを読み込むコード（コメントアウト）
    df_hours_loaded = pd.read_pickle(os.path.join(SAVE_FOLDER, MONTHLY_TOTAL_FILE))
    df_absent_loaded = pd.read_pickle(os.path.join(SAVE_FOLDER, ABSENT_RECORD_FILE))
    df_wk_loaded = pd.read_pickle(os.path.join(SAVE_FOLDER, PROJECT_RECORD_FILE))
    df_reg_loaded = pd.read_pickle(os.path.join(SAVE_FOLDER, REGULATION_OVERTIME_FILE))
    df_holiday_loaded = pd.read_pickle(os.path.join(SAVE_FOLDER, HOLIDAY_RECORD_FILE))
    df_vacation_loaded = pd.read_pickle(os.path.join(SAVE_FOLDER, VACATION_RECORD_FILE))

    # データの表示
    display(df_hours_loaded.head())
    display(df_absent_loaded.head())
    display(df_wk_loaded.head())
    display(df_reg_loaded.head())
    display(df_holiday_loaded.head())
    display(df_vacation_loaded.head())

    # 最初の2行をドロップする関数
    # 最初の一行を落とすように変更
    def drop_initial_rows(df):
        return df.drop([0]).reset_index(drop=True)

    # 月を1つ戻す関数
    def adjust_month(year_month):
        year, month = map(int, year_month.split("-"))
        if month == 1:
            year -= 1
            month = 12
        else:
            month -= 1
        return f"{year}-{month}"

    # データフレームの調整
    df_hours = drop_initial_rows(df_hours_loaded)
    df_reg = drop_initial_rows(df_reg_loaded)
    df_holiday = drop_initial_rows(df_holiday_loaded)

    df_hours = df_hours.dropna(thresh=len(df_hours.columns) - 3)

    for df in [df_hours, df_reg, df_holiday]:
        df["年月"] = df["年月"].apply(adjust_month)

    # 「月」の列を形成する関数
    def assign_month(df):
        df["月"] = df["年月"].apply(lambda x: int(x.split("-")[1]))
        return df

    # 月の列を作成
    df_hours = assign_month(df_hours)
    columns_to_remove = ["私用外出回数", "宿直回数", "出向回数", "夜勤回数", "テレワーク回数"]
    df_hours = df_hours.drop(columns=columns_to_remove)

    # df_wkのプロジェクト名称をスラッシュごとに分割する
    split_columns = df_wk_loaded["プロジェクト名称"].str.split("/", expand=True)
    new_column_names = [f"業務内容_{i+1}" for i in range(split_columns.shape[1])]
    df_wk = df_wk_loaded.drop(columns=["プロジェクト名称", "プロジェクトコード"])
    split_columns.columns = new_column_names
    df_wk = pd.concat([df_wk, split_columns], axis=1)
    columns_order = ["日付", "就労所属名称", "社員番号", "社員名称"] + new_column_names + ["工数(ｈ)"]
    df_wk = df_wk[columns_order]

    # 工数合計時間を「(時間):(分)」形式に変換する関数
    def format_time(hours):
        total_minutes = int(round(hours * 60))
        hours = total_minutes // 60
        minutes = total_minutes % 60
        return f"{hours}:{minutes:02d}"

    # 分をHH:MMに変換
    def convert_minutes_to_time(minutes):
        hours = minutes // 60
        remaining_minutes = minutes % 60

        return f"{hours}:{remaining_minutes:02d}"

    # 時間を分に変換する関数
    def convert_time_to_minutes(time_str):
        if pd.isna(time_str) or time_str == "対象外":
            return 0
        hours, minutes = map(int, time_str.split(":"))
        return hours * 60 + minutes

    # 工数を集計する
    df_wk_sum = df_wk.copy()  # 工数の合計時間を集計するためのdfを作成
    df_wk_sum["工数(ｈ)"] = pd.to_numeric(df_wk_sum["工数(ｈ)"], errors="coerce")

    # 一部の条件を除き、工数時間を15分単位に丸め込む
    # 日付ごとの工数集計
    df_wk_sum = (
        df_wk_sum.groupby(["日付", "就労所属名称", "社員番号", "社員名称"])["工数(ｈ)"].sum().reset_index()
    )
    # 工数集計時間の表記を適切な形に修正
    df_wk_sum["工数(ｈ)"] = df_wk_sum["工数(ｈ)"].apply(format_time)

    # 稼働日の判定
    def determine_data():
        # 全体の人数を月ごとにカウント
        monthly_employee_counts = df_hours.groupby("年月").size()

        # 日付ごとに出勤した人数をカウント
        daily_work_counts = df_wk_sum.groupby("日付").size()

        # 日付ごとに稼働日かどうかを判定
        df_wk_sum["稼働日"] = df_wk_sum["日付"].map(
            lambda date: daily_work_counts[date]
            >= 0.6 * monthly_employee_counts[df_hours.loc[0, "年月"]]
        )
        return df_wk_sum

    # 稼働日の列を追加
    df_wk_sum = determine_data()
    df_wk_sum_det = determine_data()  # 日ごとの稼働日
    # df_wk_sum.to_excel("df_wk_sum.xlsx", index=False)

    # 丸め込んだ工数をもとに集計を行う
    df_wk_sum["工数(ｈ)"] = df_wk_sum["工数(ｈ)"].apply(convert_time_to_minutes)
    df_wk_sum["月"] = pd.to_datetime(df_wk_sum["日付"]).dt.month
    df_wk_sum = (
        df_wk_sum.groupby(["就労所属名称", "社員番号", "社員名称", "月"])["工数(ｈ)"].sum().reset_index()
    )
    # 工数は分単位に変換したので、HH:MM形式に変換する
    df_wk_sum["工数(ｈ)"] = df_wk_sum["工数(ｈ)"].apply(convert_minutes_to_time)

    # 工数集計時間の表記を適切な形に修正
    df_wk_sum["月の工数合計時間\n（労務時間）"] = df_wk_sum["工数(ｈ)"]
    df_wk_sum = df_wk_sum[["就労所属名称", "社員番号", "社員名称", "月", "月の工数合計時間\n（労務時間）"]]

    # 追加
    # 休暇日数、欠勤日数を集計
    df_vacation_loaded["日付"] = pd.to_datetime(df_vacation_loaded["日付"])
    df_absent_loaded["日付"] = pd.to_datetime(df_absent_loaded["日付"])
    df_vacation_loaded["月"] = df_vacation_loaded["日付"].dt.to_period("M")
    df_absent_loaded["月"] = df_absent_loaded["日付"].dt.to_period("M")

    df_vacation_loaded["利用日数"] = pd.to_numeric(
        df_vacation_loaded["利用日数"], errors="coerce"
    )
    df_absent_loaded["欠勤日数"] = pd.to_numeric(df_absent_loaded["欠勤日数"], errors="coerce")

    combined_df = pd.concat(
        [
            df_vacation_loaded[["社員番号", "社員名称", "月", "利用日数"]],
            df_absent_loaded[["社員番号", "社員名称", "月", "欠勤日数"]].rename(
                columns={"欠勤日数": "利用日数"}
            ),
        ]
    )

    monthly_summary = (
        combined_df.groupby(["社員番号", "社員名称", "月"])["利用日数"].sum().reset_index()
    )
    df_hours["月"] = df_hours["月"].apply(
        lambda x: pd.Period(year=2024, month=x, freq="M")
    )
    df_hours = df_hours.merge(
        monthly_summary,
        how="left",
        left_on=["社員番号", "社員名称", "月"],
        right_on=["社員番号", "社員名称", "月"],
    )
    df_hours.rename(columns={"利用日数": "月の休暇日数"}, inplace=True)
    df_hours["月の休暇日数"].fillna(0, inplace=True)
    df_hours["月"] = df_hours["月"].astype(str).str.split("-").str[1].astype(int)

    # HH:MM形式に統一する関数（0は0：00に）
    def convert_to_hours_minutes(value):
        if isinstance(value, str) and ":" in value:
            return value
        try:
            value = float(value)
        except ValueError:
            return "0:00"
        return format_time(value)

    # 集計時間をマージ
    df_hours = pd.merge(
        df_hours, df_wk_sum, on=["就労所属名称", "社員番号", "社員名称", "月"], how="left"
    )
    df_hours["月の工数合計時間\n（労務時間）"].fillna(0, inplace=True)

    df_hours["月の工数合計時間\n（労務時間）"] = df_hours["月の工数合計時間\n（労務時間）"].apply(
        convert_to_hours_minutes
    )

    # df_hours["月の休暇日数（時間）"] = df_hours["月の休暇日数"].apply(convert_days_to_hours_minutes)
    # 日数（休暇日数）をHH:MM形式に変換する関数
    def convert_days_to_hours_minutes(days):
        hours_per_day = 8
        total_minutes = int(days * hours_per_day * 60)
        hours = total_minutes // 60
        minutes = total_minutes % 60
        return f"{hours:02}:{minutes:02}"

    df_hours["月の休暇日数（時間）"] = df_hours["月の休暇日数"].apply(convert_days_to_hours_minutes)
    df_hours = df_hours.dropna(thresh=len(df_hours.columns) - 3)

    # 安衛法のデータをマージ
    df_hours = df_hours.merge(
        df_reg[["社員名称", "年月", "総労働時間", "月の法定限度時間", "安衛法超過時間"]],
        on=["社員名称", "年月"],
        how="left",
    )

    # 列の並び替え
    new_order = [
        "年月",
        "月",
        "就労所属名称",
        "社員番号",
        "社員名称",
        "所定労働時間",
        "フレックス\n総労働時間",
        "残業時間",
        "フレックス\n超過時間",
        "所定休日出勤時間",
        "法定休日出勤時間",
        "深夜時間",
        "減時間",
        "フレックス\n不足時間",
        "遅刻回数",
        "早退回数",
        "月の休暇日数",
        "月の休暇日数（時間）",
        "月の法定限度時間",
        "安衛法超過時間",
        "月の工数合計時間\n（労務時間）",
        "総労働時間",
    ]
    df_hours = df_hours[new_order]

    # 指定した月の稼働日数を数える関数
    def count_working_days(df_wk_sum, year_month, employee_name):
        df_wk_sum["月"] = pd.to_datetime(df_wk_sum["日付"]).dt.month
        # 対象月のフィルタリング
        df_filtered = df_wk_sum[
            (df_wk_sum["月"] == year_month) & (df_wk_sum["社員名称"] == employee_name)
        ]
        # 稼働日（TRUE）の数を数える
        working_days_count = df_filtered["稼働日"].sum()
        return working_days_count

    # フレックス制を含めて、その月の残業時間を算出する関数
    def get_overtime(row):
        #  フレックス制ではない場合
        if row["所定労働時間"] != "対象外":
            total_working_minutes = convert_time_to_minutes(row["総労働時間"])
            scheduled_working_minutes = convert_time_to_minutes(row["所定労働時間"])
            fixed_holiday_work_minutes = convert_time_to_minutes(row["所定休日出勤時間"])
            legal_holiday_work_minutes = convert_time_to_minutes(row["法定休日出勤時間"])

            overtime_minutes = (
                total_working_minutes
                - scheduled_working_minutes
                - fixed_holiday_work_minutes
                - legal_holiday_work_minutes
            )
            return convert_minutes_to_time(overtime_minutes)

        #  フレックス制の場合
        else:
            year_month = row["月"]  # 対象の月
            employee_name = row["社員名称"]  # 対象の社員名称
            # 対象の社員について、その日の稼働日数を計算する
            working_days = count_working_days(df_wk_sum_det, year_month, employee_name)
            # 稼働日数に応じて、その月の所定労働時間を計算する
            monthly_scheduled_minutes = working_days * 8 * 60

            total_working_minutes = convert_time_to_minutes(row["月の工数合計時間\n（労務時間）"])
            # 労務時間から、現時点での所定労働時間を引いて、超過している時間を算出する
            overtime_minutes = total_working_minutes - monthly_scheduled_minutes
            # マイナスの時間を0に丸め込む
            overtime_minutes = max(overtime_minutes, 0)

            return convert_minutes_to_time(overtime_minutes)

    # フレックス制の時間/稼働日を考慮して、その月の残業時間を算出する
    df_hours["フレックス制を含むその月の残業時間"] = df_hours.apply(get_overtime, axis=1)

    # 不要な列を削除
    df_hours = df_hours.drop(columns=["月の休暇日数", "月の休暇日数（時間）"])

    return df_wk, df_hours

In [None]:
df_wk, df_hours = read_df()

In [None]:
df_hours

In [None]:
df_hours.to_excel("df_hours.xlsx", index=False)

### df_hoursから、工数合計と総労働時間がズレているデータで、2種類のdfに分割する

In [None]:
def separate_df(df_hours):
    # HH:MMを分に変換する関数
    def convert_time_to_minutes(time_str):
        if pd.isna(time_str) or time_str == "対象外":
            return 0
        hours, minutes = map(int, time_str.split(":"))
        return hours * 60 + minutes

    # 比較のため、列のデータを分に変換する
    df_hours["月の工数合計時間\n（労務時間）_分"] = df_hours["月の工数合計時間\n（労務時間）"].apply(
        convert_time_to_minutes
    )
    df_hours["総労働時間_分"] = df_hours["総労働時間"].apply(convert_time_to_minutes)

    # 値が違うデータを別のdfに格納
    df_diff = df_hours[df_hours["月の工数合計時間\n（労務時間）_分"] != df_hours["総労働時間_分"]]

    # 減時間に値があるかどうかで分割する
    df1 = df_diff[(df_diff["減時間"] != "0:00") & (df_diff["減時間"] != "対象外")].reset_index(
        drop=True
    )
    df2 = df_diff[(df_diff["減時間"] == "0:00") | (df_diff["減時間"] == "対象外")].reset_index(
        drop=True
    )

    # 不要な列を削除
    df1 = df1.drop(columns=["月の工数合計時間\n（労務時間）_分", "総労働時間_分"])
    df2 = df2.drop(columns=["月の工数合計時間\n（労務時間）_分", "総労働時間_分"])

    return df1, df2

### コードを実行

In [None]:
df1, df2 = separate_df(df_hours)

In [None]:
df1

In [None]:
df2

### 現状の業務負荷、個人と課の算出、36協定基準で

In [None]:
def calc_hardwk(df_hours):
    # 工数合計時間を「(時間):(分)」形式に変換する関数
    def format_time(hours):
        total_minutes = int(round(hours * 60))
        hours = total_minutes // 60
        minutes = total_minutes % 60
        return f"{hours}:{minutes:02d}"

    # 分をHH:MMに変換
    def convert_minutes_to_time(minutes):
        hours = minutes // 60
        remaining_minutes = minutes % 60

        return f"{hours}:{remaining_minutes:02d}"

    # 時間を分に変換する関数
    def convert_time_to_minutes(time_str):
        if pd.isna(time_str) or time_str == "対象外":
            return 0
        hours, minutes = map(int, time_str.split(":"))
        return hours * 60 + minutes

    # 「月」の列を形成する関数
    def assign_month(df):
        df["月"] = df["年月"].apply(lambda x: int(x.split("-")[1]))
        return df

    # df_memberの作成
    # 安衛法の情報と、超過回数の記録

    # "安衛法超過時間"が0:00以外の行をフィルタリング
    # df_filtered = df_hours[df_hours["安衛法超過時間"] != "0:00"].copy()

    # 最新の年月を取得
    latest_month = df_hours["月"].max()

    # 最新の月にフィルタリング
    df_filtered = df_hours[df_hours["月"] == latest_month].copy()

    # 安衛法超過時間を分に変換し、45時間（2700分）以上の行のみフィルタリング
    df_filtered["安衛法超過時間_分"] = df_filtered["安衛法超過時間"].apply(convert_time_to_minutes)
    df_filtered_with_overtime = df_filtered[df_filtered["安衛法超過時間_分"] >= 2700].copy()

    # 超過回数をカウントし、45時間以上の場合は1回追加
    df_filtered["安衛法超過時間_分"] = df_filtered["安衛法超過時間"].apply(convert_time_to_minutes)
    df_filtered["超過回数"] = df_filtered["安衛法超過時間_分"].apply(
        lambda x: 1 if x > 45 * 60 else 0
    )
    df_filtered["超過回数"] = df_filtered.groupby(["社員番号"])["超過回数"].cumsum()

    # 月ごとに集計し、必要な行情報と累積回数を取得
    df_member = df_filtered.groupby(["就労所属名称", "社員番号", "社員名称"], as_index=False).agg(
        {
            "総労働時間": "first",
            "月の法定限度時間": "first",
            "安衛法超過時間": "first",
            "超過回数": "last",  # 累積回数を取得
        }
    )

    # df_secの作成

    # "フレックス制を含むその月の残業時間"を分に変換して、5時間以下の人数をカウント
    df_hours["フレックス残業時間_分"] = df_hours["フレックス制を含むその月の残業時間"].apply(
        convert_time_to_minutes
    )
    df_hours["残業時間が5時間以下の人数"] = df_hours["フレックス残業時間_分"] <= 5 * 60

    # df_secを作成し、課ごとに残業時間を集計
    # df_hours["残業時間_分"] = df_hours["残業時間"].apply(convert_time_to_minutes)
    df_sec = (
        df_hours[df_hours["月"] == latest_month]
        .groupby(["年月", "就労所属名称"], as_index=False)
        .agg(
            # df_sec = df_hours.groupby(["年月", "就労所属名称"], as_index=False).agg(
            {
                "フレックス残業時間_分": "sum",
                "社員番号": "count",
                "残業時間が5時間以下の人数": "sum",
            }  # 組織人数をカウント
        )
    )

    # 平均残業時間を計算
    df_sec["平均残業時間_分"] = (df_sec["フレックス残業時間_分"] / df_sec["社員番号"]).astype(int)

    # 組織人数の列名を変更
    df_sec.rename(columns={"社員番号": "組織人数"}, inplace=True)

    # 分をHH:MM表記に変換
    df_sec["課の合計残業時間"] = df_sec["フレックス残業時間_分"].apply(convert_minutes_to_time)
    df_sec["平均残業時間"] = df_sec["平均残業時間_分"].apply(convert_minutes_to_time)

    # 不要な列を削除
    df_sec = df_sec.drop(columns=["年月", "フレックス残業時間_分", "平均残業時間_分"])

    return df_member, df_sec

In [None]:
df_member, df_sec = calc_hardwk(df_hours)

In [None]:
df_member

In [None]:
df_sec

​    df_member,df_sec=calc_hardwk(df_work) # 現状の業務負荷、個人と課の算出、36協定基準で



EVA_LIMIT_OVERWORK=35

今日の日付から、　limit_overwork=   int（今日の日/今月の最終日　＊EVA_LIMIT_OVERWORK）

今月の超過者のdf

このdfに36協定の対象期間情報、超過回数も入れる





上記の超過者dfいる組織（課）の全体の負荷　残業時間、組織人数、課内5ｈ残業以下の人の人数、平均残業時間、





dt = datetime.now()
dt_day, dt_month, dt_year, dt_week = dt.day, dt.month, dt.year, dt.weekday()

# 基準となる時間

# 現在時間

print((now_time := datetime.now().time()))

    def get_last_day_of_month(year, month):
        import calendar
        # 月の最終日を取得
        last_day = calendar.monthrange(year, month)[1]
        return last_day