In [None]:
import os
work_dir = os.path.abspath('..')
print(work_dir)
os.chdir(work_dir)
from aiphy import Theorist
from aiphy.experiment.basic import motion0_3d_config, motion_3d_config, collision_y_config, collision_nh_config
from aiphy.experiment.ground_laboratory import gravity_config
from aiphy.experiment.cosmos import celestial_2_xy_config, celestial_3_config
from aiphy.core import Exp
import aiphy.core as core
from aiphy.parsing import *
from aiphy.dataplot import plot_data, plot_normaldata
import sympy as sp
# from aiphy.experiment.oscillation_2spring import oscillation_2spring_config
import random

EXAMPLE_PATH = "data/test_cases/example_1"

INIT_TIME_LIMIT = 5
MUL = 3
MAX_ACTIONS = 7
BIAS = 5
ID = 5

FILE_NAME = f"example_1-{INIT_TIME_LIMIT}m{MUL}a{MAX_ACTIONS}p{BIAS}".replace(".", "-")

# Iterately create the path
FILE_PATH_PARENT = EXAMPLE_PATH + "/" + FILE_NAME
if ID is None:
    FILE_PATH = FILE_PATH_PARENT + '/' + FILE_NAME
else:
    FILE_PATH = FILE_PATH_PARENT + '/' + FILE_NAME + f"-{ID}"

theorist = Theorist.read_from_file(FILE_PATH)


In [None]:
import sympy as sp
import numpy as np
import re
from aiphy.core import sentence, Concept
exp_name = "motion_3d"
spm = theorist.specific[exp_name]
knowledge = theorist.knowledge
general = theorist.general
invalid_exps = lambda x: set(knowledge.fetch_exps) - set(general.general_laws[x].valid_experiments)
calc = lambda x: theorist.calc_expr(Exp(x), exp_name)
spexp = lambda x: spm._sympy_of_raw_defi(spm.expand_exp(Exp(x)))

### compensate concepts and gcs

In [None]:
exprs = ['D[A[1]]/D[t[0]] * E[2]', 'D[B[1]]/D[t[0]] * F[2]']
pats = {"$1$": {'A', 'B', 'C'}, "$2$": {'E', 'F', 'G'}}
extract_expression_pattern(exprs, pats, 2)

In [None]:
symmetry_collections = {"$1$": {"posx": "posx", "posy": "posy", "posz": "posz"},
                        "$2$": {"C_01": "posx", "C_04": "posy", "C_27": "posz"},
                        "$3$": {"C_136": "posx", "C_99": "posy", "C_28": "posz"}}

In [None]:
def check_single_relevant_direction(expr: Exp, symmetry_collections: dict[str, dict[str, str]]) -> dict[str: set[str]]:
    # symmetry_collections: {pattern: {atom: direction}}
    all_atom_names: set[str] = {atm.name for atm in expr.all_atoms}
    relevant_directions = {}  # {direction: set[pattern]}
    for atom in all_atom_names - {"t"} - {intr for intr in knowledge.fetch_intrinsic_concepts.keys()
        if "pos" not in str(spm._sympy_of_raw_defi(spm.expand_exp(Exp(str(knowledge.fetch_intrinsic_by_name(intr)).split("|- ")[1][:-1]))))}:
        flag = False
        for patt, equivs in symmetry_collections.items():
            if atom in equivs:
                if equivs[atom] not in relevant_directions:
                    relevant_directions[equivs[atom]] = set()
                relevant_directions[equivs[atom]].add(patt)
                flag = True
                break
        if not flag:
            relevant_directions = {}
            break
        if len(relevant_directions) > 1:
            break
    return relevant_directions

In [None]:
{intr for intr in knowledge.fetch_intrinsic_concepts.keys()
 if "pos" not in str(spm._sympy_of_raw_defi(spm.expand_exp(Exp(str(knowledge.fetch_intrinsic_by_name(intr)).split("|- ")[1][:-1]))))}

In [None]:
concept_to_gen = {}
for name in knowledge.fetch_concepts.keys():
    concept = knowledge.fetch_concept_by_name(name)
    if concept is None:
        continue
    res = check_single_relevant_direction(concept.exp, symmetry_collections)
    if len(res) == 1:
        concept_to_gen[name] = res
concept_to_gen

In [None]:
symmetry_collections_inversed = {patt: {direction: atom for atom, direction in equivs.items()} for patt, equivs in symmetry_collections.items()}
print(symmetry_collections_inversed)
for name, patt_dict in concept_to_gen.items():
    exist_concept = knowledge.fetch_concept_by_name(name)
    patts: set[str] = list(patt_dict.values())[0]
    dir2patt: dict[str, set[str]] = {direction: patts for direction in {"posx", "posy", "posz"} - set(patt_dict.keys())}
    template = extract_expression_pattern([str(exist_concept.exp)],
                                        {patt: set(direc.keys()) for patt, direc in symmetry_collections.items()})[0]
    if template is None:
        continue
    print('-'*10)
    print(name, ":")
    print(template)
    print(exist_concept.exp)
    for direction, set_patts in dir2patt.items():
        concept_exp_str: str = template
        for patt in set_patts:
            concept_exp_str = concept_exp_str.replace(patt, symmetry_collections_inversed[patt][direction])
        concept_exp: Exp = Exp(concept_exp_str)
        concept = knowledge.generalize_to_normal_concept(exp_name, concept_exp)
        if exist_concept.is_sum:
            concept = Concept.Mksum(list(exist_concept.objtype_id_map.keys())[0], concept)
        concept_name = theorist.register_concept(concept, spm)
        if concept_name is not None:
            print(concept_name)
            print(knowledge.fetch_concept_by_name(concept_name))

In [None]:
symmetry_collections = {"$1$": {"posx": "posx", "posy": "posy", "posz": "posz"},
                        "$2$": {"C_01": "posx", "C_04": "posy", "C_27": "posz"},
                        "$3$": {"C_136": "posx", "C_99": "posy", "C_28": "posz"},
                        "$4$": {"C_142": "posx", "C_100": "posy", "C_1814": "posz"}}
symmetry_collections_inversed = {patt: {direction: atom for atom, direction in equivs.items()} for patt, equivs in symmetry_collections.items()}

In [None]:
gc_list = list(general.general_conclusions.values())
for gc in gc_list:
    gc_exp_str = str(gc.relevant_concept.exp) if gc.indication == "ForAll" else str(gc.relevant_concept).split("|- ")[1]
    patt_matched = extract_expression_pattern(expand_expression(gc_exp_str),
                                              {patt: set(direc.keys()) for patt, direc in symmetry_collections.items()},
                                              minimal_terms=2)
    if patt_matched[0] is not None:
        print(gc)
        print(patt_matched)
        if len(patt_matched[1]) > 1:
            raise ValueError("More than one pattern matched")
        if gc.indication == "Sum":
            pattern = patt_matched[0]
            exist_set = list(patt_matched[1].values())[0]
            if len(exist_set) != 2:
                continue
            concept_name_to_append = list(set(symmetry_collections[pattern].keys()) - exist_set)[0]
            print("concept_name_to_append:", concept_name_to_append)
            concept_to_append = knowledge.gen_atom_concept(concept_name_to_append)
            if concept_to_append is None:
                continue
            new_concept = concept_to_append + gc.relevant_concept
            print(new_concept)
            general.create_general_law_from_old(exp_name, gc,
                                                       new_concept,
                                                       reset_father=True)

In [None]:
general.general_conclusions

### other test

In [None]:
theorist.theoretical_analysis(exp_name, ver='ver3',
                              name_list=['C_61', 'cx'],
                              max_actions=7)

In [None]:
import re
import matplotlib.pyplot as plt
import chardet

# 检测文件编码
def detect_encoding(file_path: str) -> str:
    with open(file_path, 'rb') as file:
        raw_data = file.read()
    result = chardet.detect(raw_data)
    return result['encoding']

# 读取文件并解析数据
def parse_log_file(file_path: str):
    epochs = []
    losses = []

    # 检测文件编码
    encoding = detect_encoding(file_path)
    print(f"检测到的文件编码: {encoding}")

    # 使用检测到的编码打开文件
    with open(file_path, 'r', encoding=encoding) as file:
        for line in file:
            # 使用正则表达式提取 epoch 和 loss
            match = re.match(r"epoch\s*=\s*(\d+),\s*loss\s*=\s*([\d.e+-]+)", line)
            if match:
                epoch = int(match.group(1))
                loss = float(match.group(2))
                epochs.append(epoch)
                losses.append(loss)

    return epochs, losses

# 绘制 loss 关于 epoch 的变化图（对数坐标）
def plot_loss_vs_epoch(epochs, losses, log_scale: bool = True):
    plt.figure(figsize=(10, 6))
    plt.plot(epochs, losses, marker='o', linestyle='-', color='b', label='Loss')

    # 设置对数坐标
    if log_scale:
        plt.yscale('log')  # Y 轴使用对数坐标

    plt.xlabel('Epoch')
    plt.ylabel('Loss (log scale)' if log_scale else 'Loss')
    plt.title('Loss vs Epoch (Log Scale)' if log_scale else 'Loss vs Epoch')
    plt.grid(True, which="both", ls="--")  # 添加网格线
    plt.legend()
    plt.show()

# 主程序
file_path = "temp/nn_test_concept_grp_epoch_loss.txt"  # 替换为你的文件路径
epochs, losses = parse_log_file(file_path)
plot_loss_vs_epoch(epochs, losses, log_scale=True)  # 设置为 True 使用对数坐标

### plot concepts-epoch

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
work_dir = os.path.abspath('..')
os.chdir(work_dir)

# 读取CSV文件
df = pd.read_csv('data/concepts-epoch.csv', header=None)

# 初始化存储结果的列表
concepts = []
means = []
mins = []
maxs = []

# 逐行处理数据
for index, row in df.iterrows():
    # 第一列是概念名（带双引号），需要去掉双引号
    concept = row[0].strip('"')
    
    # 后面的列是epoch数据，忽略最后一个空值（因为每行末尾有逗号）
    epochs = row[1:-1].astype(int)
    
    # 计算平均值、标准差、最小值和最大值
    mean = np.mean(epochs)
    std = np.std(epochs)
    min_epoch = mean - std
    max_epoch = mean + std
    
    # 存储结果
    concepts.append(concept)
    means.append(mean)
    mins.append(min_epoch)
    maxs.append(max_epoch)

# 将结果存储在DataFrame中
results = pd.DataFrame({
    'Concept': concepts,
    'Mean': means,
    'Min': mins,
    'Max': maxs
})

# 按照平均值排序
results = results.sort_values(by='Mean')

# 绘制统计图
plt.figure(figsize=(10, len(results) * 0.5))

# 绘制每个概念的横线和error bar
for i, (concept, mean, min_epoch, max_epoch) in enumerate(zip(results['Concept'], results['Mean'], results['Min'], results['Max'])):
    # 绘制横线
    plt.hlines(y=i, xmin=min_epoch, xmax=max_epoch, colors='blue', lw=2)
    
    # 绘制error bar的短竖线
    plt.vlines(x=min_epoch, ymin=i-0.1, ymax=i+0.1, colors='black', lw=2)  # 最小值竖线
    plt.vlines(x=max_epoch, ymin=i-0.1, ymax=i+0.1, colors='black', lw=2)  # 最大值竖线
    
    # 标注平均值
    plt.plot(mean, i, 'ro', markersize=8)

# 设置纵轴标签为数学符号形式
plt.yticks(range(len(results)), [f'${concept}$' for concept in results['Concept']])

# 设置横轴标签
plt.xlabel('Epoch')
plt.ylabel('Concepts')

# 设置标题
plt.title('Epoch Statistics for Important Concepts')

# 显示图形
plt.tight_layout()
plt.show()