# 采样时间安排测试


In [None]:
from datetime import time
import random

import pandas as pd
from nptyping import DataFrame
from occupational_health_module.occupational_health import OccupationalHealthItemInfo
from occupational_health_module.other_infos import templates_info

In [None]:
company_name: str = 'MSCN'
project_name: str = '23ZXP0000'

file_path: str = r'./templates/项目信息试验模板2.xlsx'
point_info_df: DataFrame = pd.read_excel(file_path, sheet_name='定点') # type: ignore
personnel_info_df: DataFrame = pd.read_excel(file_path, sheet_name='个体') # type: ignore

In [None]:
# point_info_df['采样日程'].str.split('|')

In [None]:
new_project = OccupationalHealthItemInfo(company_name, project_name, point_info_df, personnel_info_df)

In [None]:
test_raw_point_df = (
    new_project
    .output_deleterious_substance_info_dict
    ['1']['定点']
)

test_raw_point_df['采样批次'] = (
    test_raw_point_df['采样数量/天']
    .apply(lambda x: list(range(1, x + 1)))
)

available_cols: list[str] = [
    '采样点编号',
    '单元',
    '检测地点',
    '工种',
    '日接触时间',
    '检测因素',
    '收集方式',
    '定点采样时间',
    '采样数量/天',
    '采样日程',
    '采样批次',
]

test_point_df = (
    test_raw_point_df.explode('采样批次')
    .sort_values(['采样点编号'])
    .reset_index(drop=True)
    [available_cols]
)

test_point_df.head()

In [None]:
instruments_raw = {
    '收集方式': ['大气', '大气', '大气'],
    '代号': ['Q1', 'Q2', 'Q3'],
    '端口数': ['2', '2', '2'],
    '启动时间': [
        time(8, 0, 0),
        time(8, 0, 0),
        time(8, 0, 0),
        ],
}

instrument_df: DataFrame = pd.DataFrame(data=instruments_raw).set_index('代号')
instrument_df['端口'] = instrument_df['端口数'].apply(lambda x: list(range(1, int(x) + 1)))
# instrument_df = instrument_df.explode('端口')

instrument_df.head()

In [None]:
def initialize_instruments(instrument_df: DataFrame):
    instrument_df = instrument_df.assign(是否完成=False, 上一个采样点=0)

    return instrument_df

initialize_instruments(instrument_df)

In [None]:
class SampleScheduleManage():
    '''采样时间安排'''
    def __init__(self, raw_work_df: DataFrame, instruments: dict) -> None:
        self.instruments: dict = instruments # 仪器字典
        self.work_df: DataFrame = self.initialize_work_df(raw_work_df)
        pass

    def initialize_work_df(self, raw_work_df: DataFrame) -> DataFrame:
        new_false_col = pd.Series([False] * len(raw_work_df), name='是否完成')
        raw_work_df = raw_work_df.assign(是否完成=new_false_col)
        raw_work_df['采样批次'] = (
            raw_work_df['采样数量/天']
            .apply(lambda x: list(range(1, x + 1)))
        )

        available_cols: list[str] = [
            '采样点编号',
            '单元',
            '检测地点',
            '工种',
            '日接触时间',
            '检测因素',
            '收集方式',
            '定点采样时间',
            '采样数量/天',
            '采样日程',
            '采样批次',
        ]

        work_df: DataFrame = (
            raw_work_df.explode('采样批次')
            .sort_values(['采样点编号'])
            .reset_index(drop=True)
            [available_cols]
        )
        return work_df

In [None]:
class GasSampler():
    """
    Gas Sampler
    """
    def __init__(
            self,
            sample_type: str,
            ports: int,
            code_name: str,
            startup_time: time
        ) -> None:
        self.code_name: str = code_name
        self.name: str = '大气采样器'
        self.sample_type: str = sample_type
        self.ports: list[int] = list(range(1, ports + 1))
        self.startup_time: time = startup_time
        self.last_point: int = 0
        self.last_sample_time: int = 0
        self.is_sample: bool = True
    
    def judge_is_sample(self, df: DataFrame) -> None:
        '''是否可以采样'''
        df_sample_count: int = (
            df
            .query(f'收集方式 == {self.sample_type} and 是否完成 == False')
            .shape[0]
        )
        if df_sample_count > 0:
            pass
        else:
            self.is_sample = False
    
    def select_sample_point(self, df: DataFrame):
        '''选择采样点'''
        # 所有采样点
        sample_points: list[int] = df['采样点编号'].drop_duplicates().tolist()
        # [ ] 判断上一个采样点是否有符合要求的采样点
        # is_continue_sample: bool = (
        #     self.is_sample
        #     and

        # )
        if self.is_sample:
            if self.last_point == 0:
                new_point: int = random.choice(sample_points)
                self.last_point = new_point
                pass
            pass
    
    def do_sample(self, df: DataFrame):
        '''采样'''
        pass


In [None]:
# Q1 = GasSampler('Q1', startup_time)

In [None]:
times_1_df: DataFrame = test_point_df[test_point_df['采样批次'] == 1]#.reset_index(drop=True)
new_false_col = pd.Series([False] * len(times_1_df), name='是否完成')
times_1_df = times_1_df.assign(是否完成=new_false_col)

times_1_df.head()

In [None]:
times_1_df.query('收集方式 == "大气"')['采样点编号'].drop_duplicates().tolist()

In [None]:
sample_points: list[int] = times_1_df['采样点编号'].drop_duplicates().tolist()

sample_points

In [None]:
current_point: int = random.choice(sample_points)
current_point

In [None]:
for n in sample_points:
    # 当前点位的df
    current_df: DataFrame = times_1_df.query(f'采样点编号 == {n}')
    # current_df.head()
    # current_df['收集方式'].value_counts()#['大气']
    # 当前点位的大气检测因素的数量
    current_count = current_df['收集方式'].value_counts()
    # print(current_count)
    if '大气' in current_count.index:
        print(n, current_count['大气'])
    else:
        print(n, 0)