In [30]:
import os

from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import pandas as pd
pd.set_option('mode.chained_assignment', None)

import numpy as np

colnames = ['Unnamed:0', '章碼', '章名', '6碼', '6碼名稱', '7碼', '7碼名稱', '8碼', '8碼名稱', '9碼',
       '9碼名稱', '10碼', '10碼名稱', '備註']

def get_field_code(df):
    field = df[['章碼', '章名']]
    code = '{:05}'.format(int(field['章碼'][0]))
    name = field['章名'][0]
    return code, name

def get_culumns(df):
    column = []
    for i in range(6, 11):
        field = df[f'{i}碼名稱']
        column.append(field[0])
    return column

def parse_code(code):
    if isinstance(code, np.floating) or isinstance(code, float):
        return str(int(code))
    return str(code)

def append_items(df, code, lis):
    for i in range(6, 11):
        field = df.iloc[1:][[f'{i}碼',f'{i}碼名稱']]
        field = field[pd.notna(field[f'{i}碼'])]
        field[f'{i}碼名稱'] = field[f'{i}碼名稱'].fillna('')
        for j, row in field.iterrows():
            lis.append([code, j, i, parse_code(row[f'{i}碼']), row[f'{i}碼名稱']])



def get_group(df, th):
    field = df[[f'{th}碼', f'{th}碼名稱']]
    group_info = []
    continue_lis = []
    for i in range(len(field)):
        if i in continue_lis:
            continue
        if pd.notna(field.iloc[i][f'{th}碼']):
            start_idx = i
            group = []
            idx = start_idx
            contain_zero = False
            while idx <len(field) and (pd.notna(field.iloc[idx][f'{th}碼'])):
                # print(type(field.iloc[idx][f'{th}碼']))

                if isinstance(field.iloc[idx][f'{th}碼'], np.floating):
                    code = str(int(field.iloc[idx][f'{th}碼']))
                else:
                    code = str(field.iloc[idx][f'{th}碼'])

                if code == '0' and not contain_zero:
                    contain_zero = True
                elif code == '0' and  contain_zero:
                    break

                group.append({
                    'code': code,
                    'name': field.iloc[idx][f'{th}碼名稱'],
                    'idx': idx,
                    'digit': th,
                })
                continue_lis.append(idx)
                idx += 1
            group_info.append({
                'start_idx': start_idx,
                'group': group
            })

    if len(group_info) == 0:
      return group_info

    for i, group in enumerate(group_info[:-1]):
        group_info[i]['end_idx'] = group_info[i+1]['start_idx']-1
    group_info[-1]['end_idx'] = 999

    return group_info

def cross_map(chapter, ml, mr):
    links = []
    for il in ml:
        for ir in mr:
            links.append([chapter, il['idx'], il['digit'], il['code'], il['name'], ir['idx'], ir['digit'], ir['code'], ir['name']])
    return links

def get_link(df, code, col1, col2, exam=False):
    if exam:
        print('========================')
    info_l = get_group(df, col1)
    info_r = get_group(df, col2)
    while len(info_r)==0 and col2 <= 9:
      col2 += 1
      info_r = get_group(df, col2)

    if len(info_l)==0 or col2 >= 10:
      return []

    gp_idx = 0
    links = []

    for g in info_l:
        if exam:
            print(f'''left : {g['start_idx']} ~ {g['end_idx']}''')
        map_left = g['group']
        map_idx = g['end_idx']

        map_right = info_r[gp_idx]['group']
        tmp_idx = gp_idx
        while tmp_idx+1 < len(info_r) and info_r[tmp_idx+1]['end_idx'] <= map_idx:
            map_right +=  info_r[tmp_idx+1]['group']
            tmp_idx += 1

        if exam:
            print(f'''right : {info_r[gp_idx]['start_idx']} ~ {info_r[tmp_idx]['end_idx']}''')

        new_links = cross_map(code, map_left, map_right)
        if exam:
            print(len(new_links))
        links += new_links
        if map_idx == info_r[tmp_idx]['end_idx']:
            gp_idx = tmp_idx+1
    # print(len(links))
    return links

def get_head_link(df, code, name, exam=False):

    info_r = get_group(df, 6)
    map_right = []
    for g in info_r:
        map_right +=  g['group']
    map_left = [{
        'idx': 0,
        'digit': 5,
        'name': name,
        'code': code,
    }]
    links = cross_map(code, map_left, map_right)
    if exam:
        print(len(links))
    return links

def map_link(infile, outfile, exam=False, return_df=False):

    df = pd.read_excel(infile, skiprows=[0], names=colnames)
    field = df[['章碼', '章名']]
    code = '{:05}'.format(int(field['章碼'][0]))
    name = field['章名'][0]

    links = []
    links += get_head_link(df, code, name, exam=exam)
    links += get_link(df, code, 6, 7, exam=exam)
    links += get_link(df, code, 7, 8, exam=exam)
    links += get_link(df, code, 8, 9, exam=exam)

    df_out = pd.DataFrame(links, columns =['Chapter', 'Row', 'Digit', 'Code', 'Name', 'ChildRow', 'ChildDigit', 'ChildCode', 'ChildName'])
    if return_df:
      return df_out
    df_out.to_csv(outfile, index=False, encoding='utf-8-sig')

def parse_element(infile, outfile, return_df=False):
    df = pd.read_excel(infile, skiprows=[0], names=colnames)
    df_trans = pd.DataFrame(columns=['code', 'name'])

    element_data = []
    code, name = get_field_code(df)
    element_data.append([code, 0, 5, 'FieldCode', name])

    cols = get_culumns(df)
    for i, col in enumerate(cols):
        element_data.append([code, 0, i + 6, 'Column', col])

    append_items(df, code, element_data)
    df_out = pd.DataFrame(element_data, columns =['Chapter', 'Row', 'Digit', 'Code', 'Name'])

    if return_df:
        return df_out

    if outfile.endswith('csv'):
        df_out.to_csv(outfile, index=False, encoding='utf-8-sig')
    elif outfile.endswith('xlsx'):
        df_out.to_excel(outfile, index=False)


In [4]:
import os
all_xlsx = []
for subdir, dirs, files in os.walk('/content/drive/MyDrive/軟體小組/資料庫/工程會編碼/rawPCCES'):
    for file in files:
        filepath = subdir + os.sep + file
        if filepath.endswith(".xls") or filepath.endswith(".xlsx"):
            all_xlsx.append(filepath)

print(f"get total {len(all_xlsx)} file")



get total 854 file


In [12]:
import pandas as pd
import numpy as np

# Suppress chained assignment warnings
pd.set_option('mode.chained_assignment', None)

# Column names for data processing
COLUMN_NAMES = ['Unnamed:0', '章碼', '章名', '6碼', '6碼名稱', '7碼', '7碼名稱', '8碼', '8碼名稱', '9碼',
                '9碼名稱', '10碼', '10碼名稱', '備註']

def extract_field_info(df):
    """Extracts the chapter code and name from the dataframe."""
    code = f"{int(df['章碼'][0]):05}"
    name = df['章名'][0]
    return code, name

def extract_column_headers(df):
    """Extracts column headers for codes from 6碼名稱 to 10碼名稱."""
    return [df[f'{i}碼名稱'][0] for i in range(6, 11)]

def parse_code(value):
    """Converts a numerical value to string format, ensuring integer representation if applicable."""
    if isinstance(value, (np.floating, float)):
        return str(int(value))
    return str(value)

def append_items(df, code, records):
    """Appends item information to the records list."""
    for i in range(6, 11):
        field_data = df.iloc[1:][[f'{i}碼', f'{i}碼名稱']].dropna(subset=[f'{i}碼'])
        field_data[f'{i}碼名稱'] = field_data[f'{i}碼名稱'].fillna('')

        for index, row in field_data.iterrows():
            records.append([code, index, i, parse_code(row[f'{i}碼']), row[f'{i}碼名稱']])

def extract_groups(df, digit):
    """Extracts groups of codes based on the given digit level."""
    field = df[[f'{digit}碼', f'{digit}碼名稱']]
    groups, skip_indices = [], set()

    for i, row in field.iterrows():
        if i in skip_indices or pd.isna(row[f'{digit}碼']):
            continue

        group, idx, has_zero = [], i, False
        while idx < len(field) and pd.notna(field.iloc[idx][f'{digit}碼']):
            code = parse_code(field.iloc[idx][f'{digit}碼'])

            if code == '0':
                if has_zero:
                    break
                has_zero = True

            group.append({'code': code, 'name': field.iloc[idx][f'{digit}碼名稱'], 'idx': idx, 'digit': digit})
            skip_indices.add(idx)
            idx += 1

        groups.append({'start_idx': i, 'group': group})

    for i in range(len(groups) - 1):
        groups[i]['end_idx'] = groups[i + 1]['start_idx'] - 1
    if groups:
        groups[-1]['end_idx'] = len(field) - 1

    return groups

def cross_map(chapter, left_group, right_group):
    """Creates mappings between left and right code groups."""
    return [
        [chapter, l['idx'], l['digit'], l['code'], l['name'], r['idx'], r['digit'], r['code'], r['name']]
        for l in left_group for r in right_group
    ]

def generate_links(df, code, left_digit, right_digit, verbose=False):
    """Generates linked mapping between two digit levels."""
    left_groups, right_groups = extract_groups(df, left_digit), extract_groups(df, right_digit)
    while not right_groups and right_digit <= 9:
        right_digit += 1
        right_groups = extract_groups(df, right_digit)

    if not left_groups or right_digit >= 10:
        return []

    links, right_idx = [], 0
    for left_group in left_groups:
        map_left = left_group['group']
        map_right = right_groups[right_idx]['group']
        tmp_idx = right_idx

        while tmp_idx + 1 < len(right_groups) and right_groups[tmp_idx + 1]['end_idx'] <= left_group['end_idx']:
            map_right += right_groups[tmp_idx + 1]['group']
            tmp_idx += 1

        links.extend(cross_map(code, map_left, map_right))
        if left_group['end_idx'] == right_groups[tmp_idx]['end_idx']:
            right_idx = tmp_idx + 1

    return links

def generate_head_links(df, code, name):
    """Generates top-level links for the field code."""
    right_groups = extract_groups(df, 6)
    map_right = [item for group in right_groups for item in group['group']]
    map_left = [{'idx': 0, 'digit': 5, 'name': name, 'code': code}]
    return cross_map(code, map_left, map_right)

def process_mapping(input_file, output_file, verbose=False, return_df=False):
    """Processes the mapping of codes from an input file and writes to an output file."""
    df = pd.read_excel(input_file, skiprows=[0], names=COLUMN_NAMES)
    code, name = extract_field_info(df)

    links = generate_head_links(df, code, name)
    for left, right in [(6, 7), (7, 8), (8, 9)]:
        links += generate_links(df, code, left, right, verbose)

    df_out = pd.DataFrame(links, columns=['Chapter', 'Row', 'Digit', 'Code', 'Name', 'ChildRow', 'ChildDigit', 'ChildCode', 'ChildName'])
    if return_df:
        return df_out
    df_out.to_csv(output_file, index=False, encoding='utf-8-sig')

def process_elements(input_file, output_file, return_df=False):
    """Processes elements from the input file and writes to the output file."""
    df = pd.read_excel(input_file, skiprows=[0], names=COLUMN_NAMES)
    code, name = extract_field_info(df)
    elements = [[code, 0, 5, 'FieldCode', name]]

    for i, col in enumerate(extract_column_headers(df), start=6):
        elements.append([code, 0, i, 'Column', col])
    print(elements)
    assert False

    append_items(df, code, elements)
    df_out = pd.DataFrame(elements, columns=['Chapter', 'Row', 'Digit', 'Code', 'Name'])

    if return_df:
        return df_out

    if output_file.endswith('csv'):
        df_out.to_csv(output_file, index=False, encoding='utf-8-sig')
    elif output_file.endswith('xlsx'):
        df_out.to_excel(output_file, index=False)


In [13]:
test_file = '/content/drive/MyDrive/軟體小組/資料庫/工程會編碼/rawPCCES/Code02-table/02983_codev20.xls'
process_elements(test_file, '', return_df=True)

[['02983', 0, 5, 'FieldCode', '機坪道肩之底層'], ['02983', 0, 6, 'Column', '材料'], ['02983', 0, 7, 'Column', '總厚度'], ['02983', 0, 8, 'Column', nan], ['02983', 0, 9, 'Column', nan], ['02983', 0, 10, 'Column', '估價用單位']]


AssertionError: 

In [None]:
df_element = []
df_link = []
success_file = []
for f in all_xlsx:
  try:
    df_element.append(parse_element(f, '', return_df=True))
    df_link.append(map_link(f, '', exam=False, return_df=True))
    success_file.append(f)
  except KeyError:
    print("KEY ERROR : ", f)
  except:
    print("OTHER ERROR : ", f)

   Unnamed:0      章碼       章名   6碼    6碼名稱   7碼   7碼名稱   8碼  8碼名稱   9碼  9碼名稱  \
0        NaN  2983.0  機坪道肩之底層  NaN      材料  NaN    總厚度  NaN   NaN  NaN   NaN   
1        NaN     NaN      NaN  0.0     NaN  0.0    NaN  0.0   NaN  0.0   NaN   
2        NaN     NaN      NaN  1.0  碎石級配粒料  1.0  200mm  NaN   NaN  NaN   NaN   
3        NaN     NaN      NaN  NaN     NaN  2.0  250mm  NaN   NaN  NaN   NaN   
4        NaN     NaN      NaN  NaN     NaN  3.0  300mm  NaN   NaN  NaN   NaN   

   10碼  10碼名稱        備註  
0  NaN  估價用單位       NaN  
1  1.0      M  0:不分類時使用  
2  2.0     M2       NaN  
3  3.0     M3       NaN  
4  4.0      式       NaN  
OTHER ERROR :  /content/drive/MyDrive/軟體小組/資料庫/工程會編碼/rawPCCES/Code02-table/02983_codev20.xls
  Unnamed:0      章碼            章名   6碼 6碼名稱   7碼    7碼名稱   8碼  8碼名稱   9碼  \
0         M  2054.0  借土區及採石場之材料生產  NaN  大類別  NaN      土壤  NaN   NaN  NaN   
1       NaN     NaN           NaN  0.0  NaN  0.0     NaN  0.0   NaN  0.0   
2       NaN     NaN           NaN  1.0  借土

In [24]:
element_path = "/content/drive/MyDrive/軟體小組/資料庫/工程會編碼/element.xlsx"
df = pd.read_excel(element_path)
df = df[df['Rank'] == 5]
df = df[df['Code'] != '0']
df = df.drop(columns=['Rank'])

df.head()
df.to_csv("/content/drive/MyDrive/軟體小組/資料庫/工程會編碼/unit.csv", index=False, encoding="utf-8")
df.to_excel("/content/drive/MyDrive/軟體小組/資料庫/工程會編碼/unit.xlsx", index=False)



In [33]:
df_path = "/content/drive/MyDrive/軟體小組/資料庫/工程會編碼/unit.xlsx"
df = pd.read_excel(df_path)
# 生成唯一键（拼接 Chapter 和 Code）
df["UniqueKey"] = df["Chapter"].astype(str) + "_" + df["Code"].astype(str)

# 找出重复的唯一键
duplicates = df[df.duplicated("UniqueKey", keep=False)]

for duplicate in duplicates.iterrows():
    print(duplicate)


(1288, Chapter             15511
Code                    1
Name         熱水供應量<50公升/時
UniqueKey         15511_1
Name: 1288, dtype: object)
(1289, Chapter                     15511
Code                            2
Name         50公升/時≦熱水供應量<100公升/時
UniqueKey                 15511_2
Name: 1289, dtype: object)
(1290, Chapter                      15511
Code                             3
Name         100公升/時≦熱水供應量<130公升/時
UniqueKey                  15511_3
Name: 1290, dtype: object)
(1291, Chapter                      15511
Code                             4
Name         130公升/時≦熱水供應量<160公升/時
UniqueKey                  15511_4
Name: 1291, dtype: object)
(1292, Chapter                      15511
Code                             5
Name         160公升/時≦熱水供應量<200公升/時
UniqueKey                  15511_5
Name: 1292, dtype: object)
(1293, Chapter                      15511
Code                             6
Name         200公升/時≦熱水供應量<280公升/時
UniqueKey                  15511_6
Name: 1293, dtype: obje