In [2]:
from pydantic import (
    BaseModel,
    NegativeInt,
    PositiveInt,
    conint,
    conlist,
    constr,
    ValidationError,
    Field,
    condecimal,
    validator,
    root_validator,
)
from typing import Optional
from decimal import Decimal
from datetime import datetime
from ami import constant, func, lp_config

In [3]:
class header_map(BaseModel):
    rec_no: int
    version: int
    read: int
    kwh: condecimal(max_digits=14, decimal_places=4)
    time: int

    @validator("rec_no", "version", "read")
    def neg_num(cls, v):
        if v < 0:
            raise ValueError("W20006")  # 小於0
        
    @validator("time")
    def int_db_limit(cls, v):
        if v > 2147483647:
            raise ValueError("W20006")

    @validator("rec_no")
    def rt_count_zero(cls, v):
        if v == 0:
            raise ValueError("W23001")


In [4]:
try:
    L = {"rec_no": 0, "version": 123, "read": 2023, "kwh": 123456789012345, "time": 2147483648}
    header_map(**L)

except ValidationError as e:
    for v in e.errors():
        # print(e.errors())
        try:
            type_cd = constant.warn_cd[v["type"]]["code"]
            print(type_cd)
            column = v["loc"][0]
            o_val = L[column]
            new_val = constant.warn_cd[v["type"]]["func"](o_val)
        except Exception:
            type_cd = v["msg"]
            column = v["loc"][0]
            o_val = L[column]
            try:
                # 格式修訂
                new_val = constant.warn_cd[v["msg"]]["func"](o_val)
                L[column] = new_val
            except Exception:
                warn_log = lp_config.WarnLog(
                    type_cd=type_cd,
                    col_nm=column,
                    log_data_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                )

            warn_log = lp_config.WarnLog(
                type_cd=type_cd,
                col_nm=column,
                log_data_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            )
        print(warn_log)
    print(L)


file_type=None raw_gzfile=None raw_file=None rec_time=None file_path=None file_seqno=None source=None read_group=None meter_id=None read_time=None type_cd='ensure that there are no more than 14 digits in total' col_nm='kwh' rt_count=None log_data_time='2023-06-08 15:44:19'
file_type=None raw_gzfile=None raw_file=None rec_time=None file_path=None file_seqno=None source=None read_group=None meter_id=None read_time=None type_cd='W20006' col_nm='time' rt_count=None log_data_time='2023-06-08 15:44:19'
{'rec_no': 0, 'version': 123, 'read': 2023, 'kwh': 123456789012345, 'time': 2147483648}


In [14]:
import math

def truncate_decimal(input_value):
    int_part = int(input_value)
    int_len = len(str(int_part))
    int_fix = str(int_part)[-5:]

    dec_len = len(str(input_value)[int_len+1:])
    dec_fix = str(input_value)[int_len+1:][:4]
    print(dec_fix)
    input_fix = float(int_fix+'.'+dec_fix)

    if int_len > 5 or dec_len > 4:
        return input_fix
    else:
        return input_value

truncate_decimal(456.1234546546)


1234


456.1234

In [25]:
from datetime import datetime

date_str = "2022-11-18 07:00:00"
timestamp = int(datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S").timestamp())

print(timestamp)


1668726000


In [30]:
import uuid

def generate_uuid_from_string(input_string):
    namespace = uuid.NAMESPACE_DNS  # 選擇名稱空間
    return str(uuid.uuid5(namespace, input_string))

# 將指定的字串轉換為 UUID
uuid_value = generate_uuid_from_string("WT20069447_1111111111")
print(uuid_value)


00e7c09e-9fee-5c52-b62a-c60330b52452


In [62]:
import csv

csv_string = '''
FAN_ID,SOURCE,CUST_ID,METER_ID,READ_TIME,REC_NO,DEL_KWH,REC_KWH,DEL_KVARH_LAG,DEL_KVARH_LEAD,REC_KVARH_LAG,REC_KVARH_LEAD,INTERVAL,NOTE
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T00:15:00,,0.0006,0,0.0020,0,0,0,1,1
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T00:30:00,,0.0006,0,0.0019,0,0,0,1,1
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T00:45:00,,0.0006,0,0.0020,0,0,0,1,1
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T01:00:00,,0.0007,0,0.0019,0,0,0,1,1
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T01:15:00,,0.0006,0,0.0020,0,0,0,1,1
'''

reader = csv.DictReader(csv_string.splitlines())
data_list = list(reader)

# 將每個字典的值轉為空字串（如果值為 None，則轉換為空字串）
data_dict = {k: "" if v is None else v for k, v in data_list[0].items()}

print(data_dict)


{None: ['FAN_ID', 'SOURCE', 'CUST_ID', 'METER_ID', 'READ_TIME', 'REC_NO', 'DEL_KWH', 'REC_KWH', 'DEL_KVARH_LAG', 'DEL_KVARH_LEAD', 'REC_KVARH_LAG', 'REC_KVARH_LEAD', 'INTERVAL', 'NOTE']}


In [41]:
import csv
from io import StringIO
import itertools

csv_string = '''
FAN_ID,SOURCE,CUST_ID,METER_ID,READ_TIME,REC_NO,DEL_KWH,REC_KWH,DEL_KVARH_LAG,DEL_KVARH_LEAD,REC_KVARH_LAG,REC_KVARH_LEAD,INTERVAL,NOTE
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T00:15:00,,0.0006,0,0.0020,0,0,0,1
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T00:30:00,,0.0006,0,0.0019,0,0,0
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T00:45:00,,0.0006,0,0,0,0,1,1
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T01:00:00,,0.0007,0,0.0019,0,0,0,1,1
,HES-TMAP20210525,08093497036,WB15001459,2022-03-01T01:15:00,,0.0006,0,0.0020,0,0,0,1,1
'''.strip() # 加上 .strip() 去除起始和結束的換行字符



expected_fields = ["FAN_ID","SOURCE","CUST_ID","METER_ID","READ_TIME","REC_NO","DEL_KWH","REC_KWH","DEL_KVARH_LAG","DEL_KVARH_LEAD","REC_KVARH_LAG","REC_KVARH_LEAD","INTERVAL","NOTE"]

f = StringIO(csv_string)

header_reader = csv.reader(f)


# row = next(header_reader)
# for row in header_reader:
#     print(len(row)==len(expected_fields))  

# # 檢查欄位是否一致
# if row == expected_fields:
#     print("欄位一致")
# else:
#     print("欄位不一致")

f.seek(0)  # 將檔案指標歸零

reader = csv.DictReader(f)
data_list = list(reader)
# print(data_list)

def check_dict_for_none_values(data):
    return any(value is None for value in data.values())

print(len(data_list))
for i, dict in enumerate(data_list):
    # if any(value is None for value in dict.values()):
    print(dict)


5
{'FAN_ID': '', 'SOURCE': 'HES-TMAP20210525', 'CUST_ID': '08093497036', 'METER_ID': 'WB15001459', 'READ_TIME': '2022-03-01T00:15:00', 'REC_NO': '', 'DEL_KWH': '0.0006', 'REC_KWH': '0', 'DEL_KVARH_LAG': '0.0020', 'DEL_KVARH_LEAD': '0', 'REC_KVARH_LAG': '0', 'REC_KVARH_LEAD': '0', 'INTERVAL': '1', 'NOTE': None}
{'FAN_ID': '', 'SOURCE': 'HES-TMAP20210525', 'CUST_ID': '08093497036', 'METER_ID': 'WB15001459', 'READ_TIME': '2022-03-01T00:30:00', 'REC_NO': '', 'DEL_KWH': '0.0006', 'REC_KWH': '0', 'DEL_KVARH_LAG': '0.0019', 'DEL_KVARH_LEAD': '0', 'REC_KVARH_LAG': '0', 'REC_KVARH_LEAD': '0', 'INTERVAL': None, 'NOTE': None}
{'FAN_ID': '', 'SOURCE': 'HES-TMAP20210525', 'CUST_ID': '08093497036', 'METER_ID': 'WB15001459', 'READ_TIME': '2022-03-01T00:45:00', 'REC_NO': '', 'DEL_KWH': '0.0006', 'REC_KWH': '0', 'DEL_KVARH_LAG': '0', 'DEL_KVARH_LEAD': '0', 'REC_KVARH_LAG': '0', 'REC_KVARH_LEAD': '1', 'INTERVAL': '1', 'NOTE': None}
{'FAN_ID': '', 'SOURCE': 'HES-TMAP20210525', 'CUST_ID': '08093497036', '

In [10]:
def count_filled_fields(data):
    fields_to_check = ['DEL_KWH', 'REC_KWH', 'DEL_KVARH_LAG', 'DEL_KVARH_LEAD', 'REC_KVARH_LAG', 'REC_KVARH_LEAD']
    return sum(1 for field in fields_to_check if field in data and data[field])


data = {
    'DEL_KWH': 123,
    'DEL_KVARH_LAG': 456,
    'DEL_KVARH_LEAD': 789,
    'OTHER_FIELD': 'abc',
    'test': 1,
    "try": 2

}

print(len(data))

result = count_filled_fields(data)
print(result)  # 輸出 2



6
3


In [21]:
def check_dict_for_none_values(data):
    return any(value is None for value in data.values())

data1 = {'a': 1, 'b': 2, 'c': None}
result1 = check_dict_for_none_values(data1)
print(result1)  # 輸出 True

data2 = {'x': 'abc', 'y': 123, 'z': 'xyz'}
result2 = check_dict_for_none_values(data2)
print(result2)  # 輸出 False


True
False


In [39]:
dict1 = {"key1": "value1", "key2": "value2"}
dict2 = {"key2": "new_value", "key3": "value3"}
dict1["key3"] = "value3"

# dict1.update(dict2)
print(dict1)

print(hash("WT20069447")%4)

{'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}
3


20230526 17:02:35


TypeError: 'str' object is not callable