In [4]:
import os
import re
import json
import pandas as pd

def extract_text_from_sheet(sheet_df):
    text_list = []
    for value in sheet_df.values.flatten():
        if pd.notna(value) and isinstance(value, str):
            value = value.replace('\uff08', '(').replace('\uff09', ')').replace('\uff1a', ':')
            value = re.sub(r'\(.*?\)', '', value)
            text_list.extend([text.strip() for text in value.split('\n') if text.strip()])
    return text_list

def process_excel_to_json(file_path, output_folder):
    xl = pd.ExcelFile(file_path)
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    specific_output_folder = os.path.join(output_folder, base_name)
    os.makedirs(specific_output_folder, exist_ok=True)
    
    all_text_data = {}
    for sheet_name in xl.sheet_names:
        if "Programming Details" in sheet_name:
            df = xl.parse(sheet_name)
            all_text_data["programming details"] = extract_text_from_sheet(df)
    
    if not all_text_data:
        return None
    
    json_output_path = os.path.join(specific_output_folder, 'input_data.json')
    with open(json_output_path, 'w') as json_file:
        json.dump(all_text_data, json_file, indent=4)
    
    return json_output_path

DevicesInSceneControl = {
    "Dimmer Type": [
        "KBSKTDIM", "D300IB", "D300IB2", "DH10VIB", 
        "DM300BH", "D0-10IB", "DDAL"
    ],
    "Relay Type": [
        "KBSKTREL", "S2400IB2", "RM1440BH", "KBSKTR", "Z2"
    ],
    "Curtain Type": [
        "C300IBH"
    ],
    "Fan Type": [
        "FC150A2"
    ],
    "RGB Type": [
        "KB8RGBG", "KB36RGBS", "KB9TWG", "KB12RGBD", 
        "KB12RGBG"
    ],
    "PowerPoint Type": {
        "Single-Way": [
            "H1PPWVBX"
        ],
        "Two-Way": [
            "K2PPHB", "H2PPHB", "H2PPWHB"
        ]
    }
}
 
# 判断场景设备输出格式
device_name_to_type = {} # key:设备名称; value: 设备类型

def reset_device_name_to_type():
    global device_name_to_type
    device_name_to_type = {}

def process_devices(split_data, output_folder):
    devices_content = split_data.get("devices", [])
    devices_data = []
    current_shortname = None # 当前型号

    global device_name_to_type

    for line in devices_content:
        line = line.strip() # 并去除行首和行尾的空白字符

        if line.startswith("NAME:"):
            current_shortname = line.replace("NAME:", "").strip()
            continue
        
        if line.startswith("QTY:"):
            continue

        device_type = None
        # dtype == key, models == value
        for dtype, models in DevicesInSceneControl.items():
            # models == dict? for powerpoint device
            if isinstance(models, dict):
                for sub_type, sub_models in models.items():
                    for model in sub_models:
                        if model in current_shortname or current_shortname in model:
                            device_type = f"{dtype} ({sub_type})"
                            break
                    if device_type:
                        break
            # models == list
            else:
                # 在型号列表中寻找匹配项
                for model in models:
                    if model in current_shortname or current_shortname in model:
                        device_type = dtype
                        break
            # 当前的dtype属于DevicesInSceneControl里的 ? break!
            if device_type:
                break

        if current_shortname:
            device_info = {
                "appearanceShortname": current_shortname,
                "deviceName": line
            }
            # 存入device_name_to_type
            if device_type:
                device_info["deviceType"] = device_type
                device_name_to_type[line] = device_type
            devices_data.append(device_info)

    devices_output_path = os.path.join(output_folder, "devices.json")
    with open(devices_output_path, 'w') as file:
        json.dump({"devices": devices_data}, file, indent=4)

    device_name_to_type_output_path = os.path.join(output_folder, "device_name_to_type.json")
    with open(device_name_to_type_output_path, 'w') as file:
        json.dump(device_name_to_type, file, indent=4)

def process_groups(split_data, output_folder):
    groups_content = split_data.get("groups", [])
    groups_data = []
    current_group = None

    for line in groups_content:
        line = line.strip()

        if line.startswith("NAME:"):
            current_group = line.replace("NAME:", "").strip()
            continue
        
        if line.startswith("DEVICE CONTROL:"):
            continue

        if current_group:
            groups_data.append({
                "groupName": current_group,
                "devices": line
            })

    groups_output_path = os.path.join(output_folder, "groups.json")
    with open(groups_output_path, 'w') as file:
        json.dump({"groups": groups_data}, file, indent=4)

scene_output_templates = {
    "Relay Type": lambda name, status: {
        "name": name,
        "status": status,
        "statusConditions": {}
    },
    "Curtain Type": lambda name, status: {
        "name": name,
        "status": status,
        "statusConditions": {
            "position": 100 if status == "OPEN" else 0
        }
    },
    "Dimmer Type": lambda name, status, level=100: {
        "name": name,
        "status": status,
        "statusConditions": {
            "level": level 
        }
    },
    "Fan Type": lambda name, status, relay_status, speed: {
        "name": name,
        "status": status,
        "statusConditions": {
            "relay": relay_status,
            "speed": speed
        }
    },
    "PowerPoint Type": {
        "Two-Way": lambda name, left_power, right_power: {
            "name": name,
            "statusConditions": {
                "leftPowerOnOff": left_power,
                "rightPowerOnOff": right_power
            }
        },
        "Single-Way": lambda name, power: {
            "name": name,
            "statusConditions": {
                "rightPowerOnOff": power
            }
        }
    }
}

def handle_fan_type(parts):
    device_name = parts[0]
    status = parts[1]
    relay_status = parts[3]
    speed = int(parts[5])
    return [scene_output_templates["Fan Type"](device_name, status, relay_status, speed)]

def handle_dimmer_type(parts):
    contents = []

    device_entries = " ".join(parts).split(",")

    last_entry_parts = device_entries[-1].strip().split()
    status = last_entry_parts[1] if len(last_entry_parts) > 1 else "OFF"
    level = 100
    
    if status.startswith("ON") and len(last_entry_parts) > 2:
        try:
            level_part = last_entry_parts[2].replace("+", "").replace("%", "").strip()
            level = int(level_part)
        except ValueError:
            level = 100
    elif status == "OFF":
        level = 0

    for entry in device_entries[:-1]:
        device_name = entry.strip()
        contents.append({
            "name": device_name,
            "status": status,
            "statusConditions": {
                "level": level
            }
        })

    device_name = last_entry_parts[0]
    contents.append({
        "name": device_name,
        "status": status,
        "statusConditions": {
            "level": level
        }
    })

    return contents

def handle_relay_type(parts):
    status = parts[-1].strip(',').upper() if parts[-1] in ["ON", "OFF"] else parts[-2].strip(',').upper()
    device_names = " ".join(parts[:-1] if parts[-1] in ["ON", "OFF"] else parts[:-2]).split(",")
    contents = []
    for device_name in device_names:
        clean_device_name = device_name.strip().strip(',').upper()
        if clean_device_name in device_name_to_type:
            contents.append(scene_output_templates["Relay Type"](clean_device_name, status))
        else:
            raise ValueError(f"无法确定设备类型：{clean_device_name}")
    return contents

def handle_curtain_type(parts):
    status = parts[-1].strip(',').upper() if parts[-1] in ["OPEN", "CLOSE"] else parts[-2].strip(',').upper()
    device_names = " ".join(parts[:-1] if parts[-1] in ["OPEN", "CLOSE"] else parts[:-2]).split(",")
    contents = []
    for device_name in device_names:
        clean_device_name = device_name.strip().strip(',').upper()
        if clean_device_name in device_name_to_type:
            contents.append(scene_output_templates["Curtain Type"](clean_device_name, status))
        else:
            raise ValueError(f"无法确定设备类型：{clean_device_name}")
    return contents

def handle_powerpoint_type(parts, device_type):
    contents = []
    
    if "Two-Way" in device_type:
        if len(parts) == 3:
            device_name = parts[0]
            left_power = parts[1]
            right_power = parts[2]
            contents.append(scene_output_templates["PowerPoint Type"]["Two-Way"](device_name, left_power, right_power))
        else:
            raise ValueError(f"Two-Way PowerPoint '{parts[0]}' input format is incorrect. Expected format: '<device_name> <left_power> <right_power>'. Got: {parts}")
        
    elif "Single-Way" in device_type:
        if len(parts) == 2:
            device_name = parts[0]
            power = parts[1]
            contents.append(scene_output_templates["PowerPoint Type"]["Single-Way"](device_name, power))
        else:
            raise ValueError(f"Single-Way PowerPoint '{parts[0]}' input format is incorrect. Expected format: '<device_name> <power_state>'. Got: {parts}")
    
    return contents

def determine_device_type(device_name):
    original_device_name = device_name.strip().strip(',')
    clean_device_name = original_device_name
    
    if not clean_device_name:
        print(f"Error: Detected empty or invalid device name: '{original_device_name}'")
        raise ValueError("设备名称不能为空。")

    device_type = device_name_to_type.get(clean_device_name)
    
    if device_type:
        print(f"Device name '{original_device_name}' identified as '{device_type}'")
        return device_type
    else:
        print(f"Unknown device name: '{original_device_name}' (Processed as: '{clean_device_name}')")
        print("Current device_name_to_type mapping:", device_name_to_type)
        raise ValueError(f"无法确定设备类型：'{original_device_name}'")

def parse_scene_content(scene_name, content_lines):
    contents = []
    
    for line in content_lines:
        parts = line.split()
        if len(parts) < 2:
            print(f"Skipping line due to insufficient parts: {line}")
            continue

        print(f"Processing line: {line}")

        if "RELAY" in line and "SPEED" in line:
            contents.extend(handle_fan_type(parts))
        else:
            try:
                device_type = determine_device_type(parts[0])
            except ValueError as e:
                print(f"Error in determining device type for line: {line}")
                continue
            
            if device_type == "Relay Type":
                contents.extend(handle_relay_type(parts))
            elif device_type == "Curtain Type":
                contents.extend(handle_curtain_type(parts))
            elif device_type == "Dimmer Type":
                contents.extend(handle_dimmer_type(parts))
            elif "PowerPoint Type" in device_type:
                if "Two-Way" in device_type:
                    contents.extend(handle_powerpoint_type(parts, "Two-Way PowerPoint Type"))
                elif "Single-Way" in device_type:
                    contents.extend(handle_powerpoint_type(parts, "Single-Way PowerPoint Type"))
            else:
                print(f"Unknown device type encountered: {device_type} for line: {line}")
    
    return contents

def process_scenes(split_data, output_folder):
    scenes_content = split_data.get("scenes", [])
    scenes_data = {}
    current_scene = None

    for i, line in enumerate(scenes_content):
        line = line.strip()
        
        if line.startswith("CONTROL CONTENT:"):
            continue
        
        if line.startswith("NAME:"):
            if current_scene and current_scene in scenes_data:
                scenes_data[current_scene] = scenes_data[current_scene]

            current_scene = line.replace("NAME:", "").strip()
            if current_scene not in scenes_data:
                scenes_data[current_scene] = []
        elif current_scene:
            try:
                scenes_data[current_scene].extend(parse_scene_content(current_scene, [line]))
            except ValueError as e:
                print(f"Skipping line due to error: {e}")

    if current_scene and current_scene in scenes_data:
        scenes_data[current_scene] = scenes_data[current_scene]

    scenes_output = [{"sceneName": scene_name, "contents": contents} for scene_name, contents in scenes_data.items()]

    scenes_output_path = os.path.join(output_folder, "scenes.json")
    with open(scenes_output_path, 'w') as file:
        json.dump({"scenes": scenes_output}, file, indent=4)

def process_remote_controls(split_data, output_folder):
    remote_controls_content = split_data.get("remoteControls", [])
    remote_controls_data = []
    current_remote = None
    current_links = []

    for line in remote_controls_content:
        line = line.strip()

        if line.startswith("TOTAL"):
            continue

        if line.startswith("NAME:"):
            if current_remote:
                remote_controls_data.append({
                    "remoteName": current_remote,
                    "links": current_links
                })
            current_remote = line.replace("NAME:", "").strip()
            current_links = []
        
        elif line.startswith("LINK:"):
            continue

        else:
            parts = line.split(":")
            if len(parts) < 2:
                continue

            link_index = int(parts[0].strip()) - 1
            link_description = parts[1].strip()

            action = "NORMAL"
            if " - " in link_description:
                link_description, action = link_description.rsplit(" - ", 1)
                action = action.strip().upper()

            if link_description.startswith("SCENE"):
                link_type = 2
                link_name = link_description.replace("SCENE", "").strip()
            elif link_description.startswith("GROUP"):
                link_type = 1
                link_name = link_description.replace("GROUP", "").strip()
            elif link_description.startswith("DEVICE"):
                link_type = 0
                link_name = link_description.replace("DEVICE", "").strip()
            else:
                continue

            current_links.append({
                "linkIndex": link_index,
                "linkType": link_type,
                "linkName": link_name,
                "action": action
            })

    if current_remote:
        remote_controls_data.append({
            "remoteName": current_remote,
            "links": current_links
        })

    remote_controls_output_path = os.path.join(output_folder, "remoteControls.json")
    with open(remote_controls_output_path, 'w') as file:
        json.dump({"remoteControls": remote_controls_data}, file, indent=4)

def split_json_file(input_file_path, output_folder):
    with open(input_file_path, 'r') as file:
        data = json.load(file)
    content = data.get("programming details", [])
    split_keywords = {
        "devices": "KASTA DEVICE",
        "groups": "KASTA GROUP",
        "scenes": "KASTA SCENE",
        "remoteControls": "REMOTE CONTROL LINK"
    }
    split_data = {
        "devices": [],
        "groups": [],
        "scenes": [],
        "remoteControls": []
    }
    current_key = None
    for line in content:
        if line in split_keywords.values():
            current_key = next(key for key, value in split_keywords.items() if value == line)
            continue
        if current_key:
            split_data[current_key].append(line)
    os.makedirs(output_folder, exist_ok=True)
    
    process_devices(split_data, output_folder)
    process_groups(split_data, output_folder)
    process_scenes(split_data, output_folder)
    process_remote_controls(split_data, output_folder)
    
    device_name_to_type_output_path = os.path.join(output_folder, "device_name_to_type.json")
    with open(device_name_to_type_output_path, 'w') as file:
        json.dump(device_name_to_type, file, indent=4)

def test_process_excel(input_folder, output_folder):
    reset_device_name_to_type()
    print("Initialized device_name_to_type for the test.")

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.xlsx'):
            file_path = os.path.join(input_folder, file_name)
            result = process_excel_to_json(file_path, output_folder)
            if result:
                print(f"Processed {file_name} into {result}")
                split_json_file(result, os.path.dirname(result))
            else:
                print(f"No matching worksheets found in {file_name}")

    reset_device_name_to_type()
    print("Cleared device_name_to_type after the test.")

input_folder = 'test_input'
output_folder = 'test_output'

test_process_excel(input_folder, output_folder)

Initialized device_name_to_type for the test.
Processed testing2.xlsx into test_output\testing2\input_data.json
Processing line: d1 ON
Device name 'd1' identified as 'Dimmer Type'
Processing line: d2 OFF
Device name 'd2' identified as 'Dimmer Type'
Processing line: d3, d4 ON
Device name 'd3' identified as 'Dimmer Type'
Processing line: d4, d5 OFF
Device name 'd4' identified as 'Dimmer Type'
Processing line: d6 ON +60%
Device name 'd6' identified as 'Dimmer Type'
Processing line: d7, d8 ON +70%
Device name 'd7' identified as 'Dimmer Type'
Processing line: d9, d10, d11 OFF
Device name 'd9' identified as 'Dimmer Type'
Processing line: d12, d13, d14, d15 ON +20%
Device name 'd12' identified as 'Dimmer Type'
Processing line: r1 ON
Device name 'r1' identified as 'Relay Type'
Skipping line due to error: 无法确定设备类型：R1
Processing line: r2 OFF
Device name 'r2' identified as 'Relay Type'
Skipping line due to error: 无法确定设备类型：R2
Processing line: r3, r4, r5 ON
Device name 'r3' identified as 'Relay Typ

PermissionError: [Errno 13] Permission denied: 'test_input\\~$testing2.xlsx'