In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
from rdkit.Chem.EnumerateStereoisomers import EnumerateStereoisomers, StereoEnumerationOptions
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 10,  # 减少增强次数，提高质量
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2,
                          verbose: bool = True) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def debug_print(msg: str):
        if verbose:
            print(msg)
    
    def sanitize_smiles(smiles: str) -> Optional[str]:
        """净化并验证SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except Exception as e:
            debug_print(f"SMILES净化失败: {smiles}, 错误: {str(e)}")
        return None
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        debug_print(f"\n尝试应用{strategy}策略到SMILES: {smiles}")
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                debug_print(f"无法从SMILES创建分子: {smiles}")
                return []
            
            augmented = set()
            
            if strategy == 'enumeration':
                # SMILES枚举
                for _ in range(n_augmentations):
                    try:
                        enum_smiles = Chem.MolToSmiles(mol, 
                                                     canonical=False, 
                                                     doRandom=True,
                                                     isomericSmiles=True)
                        sanitized = sanitize_smiles(enum_smiles)
                        if sanitized and sanitized != smiles:
                            augmented.add(sanitized)
                    except Exception as e:
                        debug_print(f"枚举失败: {str(e)}")
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    sanitized = sanitize_smiles(kek_smiles)
                    if sanitized and sanitized != smiles:
                        augmented.add(sanitized)
                    
                    # 添加带有显式芳香性的版本
                    aromatic_smiles = Chem.MolToSmiles(mol, kekuleSmiles=False)
                    sanitized = sanitize_smiles(aromatic_smiles)
                    if sanitized and sanitized != smiles:
                        augmented.add(sanitized)
                except Exception as e:
                    debug_print(f"Kekulization失败: {str(e)}")
            
            elif strategy == 'stereo_enum':
                try:
                    # 使用RDKit的立体异构体枚举器
                    opts = StereoEnumerationOptions(unique=True)
                    isomers = tuple(EnumerateStereoisomers(mol, options=opts))
                    for isomer in isomers:
                        iso_smiles = Chem.MolToSmiles(isomer, isomericSmiles=True)
                        sanitized = sanitize_smiles(iso_smiles)
                        if sanitized and sanitized != smiles:
                            augmented.add(sanitized)
                except Exception as e:
                    debug_print(f"立体异构体枚举失败: {str(e)}")
            
            elif strategy == 'fragment':
                try:
                    # BRICS分解和重组
                    fragments = list(BRICS.BRICSDecompose(mol))
                    debug_print(f"BRICS分解得到{len(fragments)}个片段")
                    if len(fragments) > 1:
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            try:
                                n_frags = random.randint(2, min(len(fragments), 3))
                                selected_frags = random.sample(fragments, n_frags)
                                new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                                if new_mol:
                                    new_smiles = Chem.MolToSmiles(new_mol)
                                    sanitized = sanitize_smiles(new_smiles)
                                    if sanitized and sanitized != smiles:
                                        augmented.add(sanitized)
                            except Exception as e:
                                debug_print(f"片段重组失败: {str(e)}")
                except Exception as e:
                    debug_print(f"BRICS分解失败: {str(e)}")
            
            elif strategy == 'rotate':
                try:
                    for i in range(mol.GetNumAtoms()):
                        try:
                            rot_smiles = Chem.MolToSmiles(mol, 
                                                         doRandom=True,
                                                         canonical=False,
                                                         rootedAtAtom=i)
                            sanitized = sanitize_smiles(rot_smiles)
                            if sanitized and sanitized != smiles:
                                augmented.add(sanitized)
                        except Exception as e:
                            debug_print(f"原子{i}旋转失败: {str(e)}")
                except Exception as e:
                    debug_print(f"分子旋转失败: {str(e)}")
            
            result = list(augmented)
            debug_print(f"{strategy}策略生成了{len(result)}个新SMILES")
            return result
            
        except Exception as e:
            debug_print(f"策略{strategy}整体失败: {str(e)}")
            return []
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        debug_print(f"\n处理SMILES {idx+1}/{len(df)}: {original_smiles}")
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

# 让我们看看test数据的内容
print("原始test数据的前几行:")
print(test.head())
print("\n开始数据增强...")
test = augment_smiles_dataset(test, verbose=True)


In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 100,
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    
    参数:
    - df: 输入的DataFrame
    - smiles_column: SMILES列的名称
    - augmentation_strategies: 增强策略列表
    - n_augmentations: 每个策略的增强次数
    - preserve_original: 是否保留原始SMILES
    - random_seed: 随机种子
    - max_fragment_combinations: 片段重组的最大组合数
    
    返回:
    - 增强后的DataFrame
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def sanitize_smiles(smiles: str) -> str:
        """净化SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except:
            pass
        return smiles
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = set()  # 使用集合避免重复
            
            if strategy == 'enumeration':
                # 增强的SMILES枚举
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.add(sanitize_smiles(enum_smiles))
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.add(sanitize_smiles(kek_smiles))
                    
                    # 添加不同的Kekulization形式
                    for _ in range(min(5, n_augmentations)):
                        new_mol = Chem.MolFromSmiles(kek_smiles)
                        if new_mol:
                            Chem.Kekulize(new_mol)
                            new_smiles = Chem.MolToSmiles(new_mol, kekuleSmiles=True, doRandom=True)
                            augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # 立体化学枚举
                try:
                    # 移除立体化学
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.add(sanitize_smiles(no_stereo))
                    
                    # 添加随机立体中心
                    for _ in range(n_augmentations // 2):
                        new_mol = Chem.MolFromSmiles(no_stereo)
                        if new_mol:
                            Chem.AssignStereochemistry(new_mol, force=True, cleanIt=True)
                            stereo_smiles = Chem.MolToSmiles(new_mol, isomericSmiles=True)
                            augmented.add(sanitize_smiles(stereo_smiles))
                except:
                    pass
            
            elif strategy == 'fragment':
                # BRICS分解和重组
                try:
                    fragments = list(BRICS.BRICSDecompose(mol))
                    if len(fragments) > 1:
                        # 随机组合片段
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            n_frags = random.randint(2, min(len(fragments), 3))
                            selected_frags = random.sample(fragments, n_frags)
                            new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                            if new_mol:
                                new_smiles = Chem.MolToSmiles(new_mol)
                                augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'rotate':
                # 原子顺序旋转
                try:
                    for _ in range(n_augmentations // 2):
                        rot_smiles = Chem.MolToSmiles(mol, 
                                                     doRandom=True,
                                                     canonical=False,
                                                     rootedAtAtom=random.randint(0, mol.GetNumAtoms()-1))
                        augmented.add(sanitize_smiles(rot_smiles))
                except:
                    pass
            
            return list(augmented)
            
        except Exception as e:
            print(f"警告: {strategy}处理{smiles}时出错: {e}")
            return [smiles]
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

test = augment_smiles_dataset(test)


In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 100,
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    
    参数:
    - df: 输入的DataFrame
    - smiles_column: SMILES列的名称
    - augmentation_strategies: 增强策略列表
    - n_augmentations: 每个策略的增强次数
    - preserve_original: 是否保留原始SMILES
    - random_seed: 随机种子
    - max_fragment_combinations: 片段重组的最大组合数
    
    返回:
    - 增强后的DataFrame
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def sanitize_smiles(smiles: str) -> str:
        """净化SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except:
            pass
        return smiles
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = set()  # 使用集合避免重复
            
            if strategy == 'enumeration':
                # 增强的SMILES枚举
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.add(sanitize_smiles(enum_smiles))
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.add(sanitize_smiles(kek_smiles))
                    
                    # 添加不同的Kekulization形式
                    for _ in range(min(5, n_augmentations)):
                        new_mol = Chem.MolFromSmiles(kek_smiles)
                        if new_mol:
                            Chem.Kekulize(new_mol)
                            new_smiles = Chem.MolToSmiles(new_mol, kekuleSmiles=True, doRandom=True)
                            augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # 立体化学枚举
                try:
                    # 移除立体化学
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.add(sanitize_smiles(no_stereo))
                    
                    # 添加随机立体中心
                    for _ in range(n_augmentations // 2):
                        new_mol = Chem.MolFromSmiles(no_stereo)
                        if new_mol:
                            Chem.AssignStereochemistry(new_mol, force=True, cleanIt=True)
                            stereo_smiles = Chem.MolToSmiles(new_mol, isomericSmiles=True)
                            augmented.add(sanitize_smiles(stereo_smiles))
                except:
                    pass
            
            elif strategy == 'fragment':
                # BRICS分解和重组
                try:
                    fragments = list(BRICS.BRICSDecompose(mol))
                    if len(fragments) > 1:
                        # 随机组合片段
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            n_frags = random.randint(2, min(len(fragments), 3))
                            selected_frags = random.sample(fragments, n_frags)
                            new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                            if new_mol:
                                new_smiles = Chem.MolToSmiles(new_mol)
                                augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'rotate':
                # 原子顺序旋转
                try:
                    for _ in range(n_augmentations // 2):
                        rot_smiles = Chem.MolToSmiles(mol, 
                                                     doRandom=True,
                                                     canonical=False,
                                                     rootedAtAtom=random.randint(0, mol.GetNumAtoms()-1))
                        augmented.add(sanitize_smiles(rot_smiles))
                except:
                    pass
            
            return list(augmented)
            
        except Exception as e:
            print(f"警告: {strategy}处理{smiles}时出错: {e}")
            return [smiles]
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

test = augment_smiles_dataset(test)


In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 100,
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    
    参数:
    - df: 输入的DataFrame
    - smiles_column: SMILES列的名称
    - augmentation_strategies: 增强策略列表
    - n_augmentations: 每个策略的增强次数
    - preserve_original: 是否保留原始SMILES
    - random_seed: 随机种子
    - max_fragment_combinations: 片段重组的最大组合数
    
    返回:
    - 增强后的DataFrame
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def sanitize_smiles(smiles: str) -> str:
        """净化SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except:
            pass
        return smiles
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = set()  # 使用集合避免重复
            
            if strategy == 'enumeration':
                # 增强的SMILES枚举
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.add(sanitize_smiles(enum_smiles))
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.add(sanitize_smiles(kek_smiles))
                    
                    # 添加不同的Kekulization形式
                    for _ in range(min(5, n_augmentations)):
                        new_mol = Chem.MolFromSmiles(kek_smiles)
                        if new_mol:
                            Chem.Kekulize(new_mol)
                            new_smiles = Chem.MolToSmiles(new_mol, kekuleSmiles=True, doRandom=True)
                            augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # 立体化学枚举
                try:
                    # 移除立体化学
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.add(sanitize_smiles(no_stereo))
                    
                    # 添加随机立体中心
                    for _ in range(n_augmentations // 2):
                        new_mol = Chem.MolFromSmiles(no_stereo)
                        if new_mol:
                            Chem.AssignStereochemistry(new_mol, force=True, cleanIt=True)
                            stereo_smiles = Chem.MolToSmiles(new_mol, isomericSmiles=True)
                            augmented.add(sanitize_smiles(stereo_smiles))
                except:
                    pass
            
            elif strategy == 'fragment':
                # BRICS分解和重组
                try:
                    fragments = list(BRICS.BRICSDecompose(mol))
                    if len(fragments) > 1:
                        # 随机组合片段
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            n_frags = random.randint(2, min(len(fragments), 3))
                            selected_frags = random.sample(fragments, n_frags)
                            new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                            if new_mol:
                                new_smiles = Chem.MolToSmiles(new_mol)
                                augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'rotate':
                # 原子顺序旋转
                try:
                    for _ in range(n_augmentations // 2):
                        rot_smiles = Chem.MolToSmiles(mol, 
                                                     doRandom=True,
                                                     canonical=False,
                                                     rootedAtAtom=random.randint(0, mol.GetNumAtoms()-1))
                        augmented.add(sanitize_smiles(rot_smiles))
                except:
                    pass
            
            return list(augmented)
            
        except Exception as e:
            print(f"警告: {strategy}处理{smiles}时出错: {e}")
            return [smiles]
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

test = augment_smiles_dataset(test)


In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 100,
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    
    参数:
    - df: 输入的DataFrame
    - smiles_column: SMILES列的名称
    - augmentation_strategies: 增强策略列表
    - n_augmentations: 每个策略的增强次数
    - preserve_original: 是否保留原始SMILES
    - random_seed: 随机种子
    - max_fragment_combinations: 片段重组的最大组合数
    
    返回:
    - 增强后的DataFrame
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def sanitize_smiles(smiles: str) -> str:
        """净化SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except:
            pass
        return smiles
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = set()  # 使用集合避免重复
            
            if strategy == 'enumeration':
                # 增强的SMILES枚举
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.add(sanitize_smiles(enum_smiles))
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.add(sanitize_smiles(kek_smiles))
                    
                    # 添加不同的Kekulization形式
                    for _ in range(min(5, n_augmentations)):
                        new_mol = Chem.MolFromSmiles(kek_smiles)
                        if new_mol:
                            Chem.Kekulize(new_mol)
                            new_smiles = Chem.MolToSmiles(new_mol, kekuleSmiles=True, doRandom=True)
                            augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # 立体化学枚举
                try:
                    # 移除立体化学
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.add(sanitize_smiles(no_stereo))
                    
                    # 添加随机立体中心
                    for _ in range(n_augmentations // 2):
                        new_mol = Chem.MolFromSmiles(no_stereo)
                        if new_mol:
                            Chem.AssignStereochemistry(new_mol, force=True, cleanIt=True)
                            stereo_smiles = Chem.MolToSmiles(new_mol, isomericSmiles=True)
                            augmented.add(sanitize_smiles(stereo_smiles))
                except:
                    pass
            
            elif strategy == 'fragment':
                # BRICS分解和重组
                try:
                    fragments = list(BRICS.BRICSDecompose(mol))
                    if len(fragments) > 1:
                        # 随机组合片段
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            n_frags = random.randint(2, min(len(fragments), 3))
                            selected_frags = random.sample(fragments, n_frags)
                            new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                            if new_mol:
                                new_smiles = Chem.MolToSmiles(new_mol)
                                augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'rotate':
                # 原子顺序旋转
                try:
                    for _ in range(n_augmentations // 2):
                        rot_smiles = Chem.MolToSmiles(mol, 
                                                     doRandom=True,
                                                     canonical=False,
                                                     rootedAtAtom=random.randint(0, mol.GetNumAtoms()-1))
                        augmented.add(sanitize_smiles(rot_smiles))
                except:
                    pass
            
            return list(augmented)
            
        except Exception as e:
            print(f"警告: {strategy}处理{smiles}时出错: {e}")
            return [smiles]
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

test = augment_smiles_dataset(test)


In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 100,
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    
    参数:
    - df: 输入的DataFrame
    - smiles_column: SMILES列的名称
    - augmentation_strategies: 增强策略列表
    - n_augmentations: 每个策略的增强次数
    - preserve_original: 是否保留原始SMILES
    - random_seed: 随机种子
    - max_fragment_combinations: 片段重组的最大组合数
    
    返回:
    - 增强后的DataFrame
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def sanitize_smiles(smiles: str) -> str:
        """净化SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except:
            pass
        return smiles
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = set()  # 使用集合避免重复
            
            if strategy == 'enumeration':
                # 增强的SMILES枚举
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.add(sanitize_smiles(enum_smiles))
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.add(sanitize_smiles(kek_smiles))
                    
                    # 添加不同的Kekulization形式
                    for _ in range(min(5, n_augmentations)):
                        new_mol = Chem.MolFromSmiles(kek_smiles)
                        if new_mol:
                            Chem.Kekulize(new_mol)
                            new_smiles = Chem.MolToSmiles(new_mol, kekuleSmiles=True, doRandom=True)
                            augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # 立体化学枚举
                try:
                    # 移除立体化学
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.add(sanitize_smiles(no_stereo))
                    
                    # 添加随机立体中心
                    for _ in range(n_augmentations // 2):
                        new_mol = Chem.MolFromSmiles(no_stereo)
                        if new_mol:
                            Chem.AssignStereochemistry(new_mol, force=True, cleanIt=True)
                            stereo_smiles = Chem.MolToSmiles(new_mol, isomericSmiles=True)
                            augmented.add(sanitize_smiles(stereo_smiles))
                except:
                    pass
            
            elif strategy == 'fragment':
                # BRICS分解和重组
                try:
                    fragments = list(BRICS.BRICSDecompose(mol))
                    if len(fragments) > 1:
                        # 随机组合片段
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            n_frags = random.randint(2, min(len(fragments), 3))
                            selected_frags = random.sample(fragments, n_frags)
                            new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                            if new_mol:
                                new_smiles = Chem.MolToSmiles(new_mol)
                                augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'rotate':
                # 原子顺序旋转
                try:
                    for _ in range(n_augmentations // 2):
                        rot_smiles = Chem.MolToSmiles(mol, 
                                                     doRandom=True,
                                                     canonical=False,
                                                     rootedAtAtom=random.randint(0, mol.GetNumAtoms()-1))
                        augmented.add(sanitize_smiles(rot_smiles))
                except:
                    pass
            
            return list(augmented)
            
        except Exception as e:
            print(f"警告: {strategy}处理{smiles}时出错: {e}")
            return [smiles]
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

test = augment_smiles_dataset(test)


In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 100,
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    
    参数:
    - df: 输入的DataFrame
    - smiles_column: SMILES列的名称
    - augmentation_strategies: 增强策略列表
    - n_augmentations: 每个策略的增强次数
    - preserve_original: 是否保留原始SMILES
    - random_seed: 随机种子
    - max_fragment_combinations: 片段重组的最大组合数
    
    返回:
    - 增强后的DataFrame
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def sanitize_smiles(smiles: str) -> str:
        """净化SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except:
            pass
        return smiles
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = set()  # 使用集合避免重复
            
            if strategy == 'enumeration':
                # 增强的SMILES枚举
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.add(sanitize_smiles(enum_smiles))
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.add(sanitize_smiles(kek_smiles))
                    
                    # 添加不同的Kekulization形式
                    for _ in range(min(5, n_augmentations)):
                        new_mol = Chem.MolFromSmiles(kek_smiles)
                        if new_mol:
                            Chem.Kekulize(new_mol)
                            new_smiles = Chem.MolToSmiles(new_mol, kekuleSmiles=True, doRandom=True)
                            augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # 立体化学枚举
                try:
                    # 移除立体化学
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.add(sanitize_smiles(no_stereo))
                    
                    # 添加随机立体中心
                    for _ in range(n_augmentations // 2):
                        new_mol = Chem.MolFromSmiles(no_stereo)
                        if new_mol:
                            Chem.AssignStereochemistry(new_mol, force=True, cleanIt=True)
                            stereo_smiles = Chem.MolToSmiles(new_mol, isomericSmiles=True)
                            augmented.add(sanitize_smiles(stereo_smiles))
                except:
                    pass
            
            elif strategy == 'fragment':
                # BRICS分解和重组
                try:
                    fragments = list(BRICS.BRICSDecompose(mol))
                    if len(fragments) > 1:
                        # 随机组合片段
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            n_frags = random.randint(2, min(len(fragments), 3))
                            selected_frags = random.sample(fragments, n_frags)
                            new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                            if new_mol:
                                new_smiles = Chem.MolToSmiles(new_mol)
                                augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'rotate':
                # 原子顺序旋转
                try:
                    for _ in range(n_augmentations // 2):
                        rot_smiles = Chem.MolToSmiles(mol, 
                                                     doRandom=True,
                                                     canonical=False,
                                                     rootedAtAtom=random.randint(0, mol.GetNumAtoms()-1))
                        augmented.add(sanitize_smiles(rot_smiles))
                except:
                    pass
            
            return list(augmented)
            
        except Exception as e:
            print(f"警告: {strategy}处理{smiles}时出错: {e}")
            return [smiles]
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

test = augment_smiles_dataset(test)


In [None]:
!pip install /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Processing /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl
rdkit is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [None]:
import torch
import pandas as pd
import joblib
from transformers import PreTrainedModel, AutoConfig, BertModel, BertTokenizerFast, BertConfig, AutoModel, AutoTokenizer
from sklearn.metrics import mean_absolute_error
from torch import nn
from transformers.activations import ACT2FN
from tqdm import tqdm
import numpy as np

class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        pooler_size = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.dense = nn.Linear(pooler_size, pooler_size)
        
        dropout_prob = getattr(config, 'pooler_dropout', config.hidden_dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.activation = getattr(config, 'pooler_hidden_act', config.hidden_act)
        self.config = config

    def forward(self, hidden_states):
        context_token = hidden_states[:, 0] # CLS token
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = ACT2FN[self.activation](pooled_output)
        return pooled_output

class CustomModel(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.backbone = AutoModel.from_config(config)
        
        self.pooler = ContextPooler(config)

        pooler_output_dim = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.output = torch.nn.Linear(pooler_output_dim, 1) # Still predicting one label at a time. Kinda stupid

    def forward(
        self,
        input_ids,
        scaler,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        labels=None,
    ):
        outputs = self.backbone(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
        )

        pooled_output = self.pooler(outputs.last_hidden_state)
        
        # Final regression output
        regression_output = self.output(pooled_output)

        loss = None
        true_loss = None
        if labels is not None:
            loss_fn = torch.nn.MSELoss()

            unscaled_labels = scaler.inverse_transform(labels.cpu().numpy())
            unscaled_outputs = scaler.inverse_transform(regression_output.cpu().detach().numpy())
            
            loss = loss_fn(regression_output, labels)
            true_loss = mean_absolute_error(unscaled_outputs, unscaled_labels)

        return {
            "loss": loss,
            "logits": regression_output,
            "true_loss": true_loss
        }

In [None]:
BATCH_SIZE = 16

def tokenize_smiles(seq):
    seq = [tokenizer.cls_token + smiles for smiles in seq] # If we pass a string, tokenizer will smartly think we want to create a sequence for each symbol
    tokenized = tokenizer(seq, padding='max_length', truncation=True, max_length=512, return_tensors='pt')
    return tokenized

def load_model(path):
    config = AutoConfig.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')
    model = CustomModel(config).cuda()
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint)
    return model


def make_predictions(model, scaler, smiles_seq):
    aggregated_preds = []
    for smiles in smiles_seq:
        smiles = [smiles]
        smiles_tokenized = tokenize_smiles(smiles)

        input_ids = smiles_tokenized['input_ids'].cuda()
        attention_mask = smiles_tokenized['attention_mask'].cuda()
        with torch.no_grad():
            preds = model(input_ids=input_ids, scaler=scaler, attention_mask=attention_mask)['logits'].cpu().numpy()
        
        true_preds = scaler.inverse_transform(preds).flatten()
        aggregated_preds.append(true_preds.tolist())
    return np.array(aggregated_preds)


test = pd.read_csv('/kaggle/input/neurips-open-polymer-prediction-2025/test.csv')
test_copy = test.copy()

smiles_test = test['SMILES'].to_list()

targets = ['Tg', 'FFV', 'Tc', 'Density', 'Rg']

scalers = joblib.load('/kaggle/input/smiles-bert-models/target_scalers.pkl')
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')

NameError: name 'test' is not defined

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
import random
from typing import Optional, List, Union

def augment_smiles_dataset(df: pd.DataFrame,
                               smiles_column: str = 'SMILES',
                               augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum'],
                               n_augmentations: int = 100,
                               preserve_original: bool = True,
                               random_seed: Optional[int] = None) -> pd.DataFrame:
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = []
            
            if strategy == 'enumeration':
                # Standard SMILES enumeration
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.append(enum_smiles)
            
            elif strategy == 'kekulize':
                # Kekulization variants
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.append(kek_smiles)
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # Stereochemistry enumeration
                for _ in range(n_augmentations // 2):
                    # Remove stereochemistry
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.append(no_stereo)
            
            return list(set(augmented))  # Remove duplicates
            
        except Exception as e:
            print(f"Error in {strategy} for {smiles}: {e}")
            return [smiles]
    
    augmented_rows = []
    
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    print(f"Original size: {len(df)}, Augmented size: {len(augmented_df)}")
    print(f"Augmentation factor: {len(augmented_df) / len(df):.2f}x")
    
    return augmented_df

test = augment_smiles_dataset(test)

In [None]:
preds_mapping = {}

for i in tqdm(range(len(targets))):
    target = targets[i]
    scaler = scalers[i]

    model_path = f'/kaggle/input/smiles-bert-models/trained_smiles_model_{target}_target.pth' # Very sophisticated staff
    model = load_model(model_path)
    true_preds = []

    for i, data in test.groupby('id'):
        test_smiles = data['SMILES'].to_list()
        augmented_preds = make_predictions(model, scaler, test_smiles)
    
        average_pred = np.median(augmented_preds)
    
        true_preds.append(float(average_pred.flatten()[0]))

    preds_mapping[target] = true_preds

In [None]:
submission = pd.DataFrame(preds_mapping)
submission['id'] = test_copy['id']
submission.to_csv('submission.csv', index=False)

Training notebook: [here](https://www.kaggle.com/code/defdet/polymer-bert-train?scriptVersionId=246123151)

In [None]:
!pip install /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Processing /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl
rdkit is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [None]:
import torch
import pandas as pd
import joblib
from transformers import PreTrainedModel, AutoConfig, BertModel, BertTokenizerFast, BertConfig, AutoModel, AutoTokenizer
from sklearn.metrics import mean_absolute_error
from torch import nn
from transformers.activations import ACT2FN
from tqdm import tqdm
import numpy as np

class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        pooler_size = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.dense = nn.Linear(pooler_size, pooler_size)
        
        dropout_prob = getattr(config, 'pooler_dropout', config.hidden_dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.activation = getattr(config, 'pooler_hidden_act', config.hidden_act)
        self.config = config

    def forward(self, hidden_states):
        context_token = hidden_states[:, 0] # CLS token
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = ACT2FN[self.activation](pooled_output)
        return pooled_output

class CustomModel(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.backbone = AutoModel.from_config(config)
        
        self.pooler = ContextPooler(config)

        pooler_output_dim = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.output = torch.nn.Linear(pooler_output_dim, 1) # Still predicting one label at a time. Kinda stupid

    def forward(
        self,
        input_ids,
        scaler,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        labels=None,
    ):
        outputs = self.backbone(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
        )

        pooled_output = self.pooler(outputs.last_hidden_state)
        
        # Final regression output
        regression_output = self.output(pooled_output)

        loss = None
        true_loss = None
        if labels is not None:
            loss_fn = torch.nn.MSELoss()

            unscaled_labels = scaler.inverse_transform(labels.cpu().numpy())
            unscaled_outputs = scaler.inverse_transform(regression_output.cpu().detach().numpy())
            
            loss = loss_fn(regression_output, labels)
            true_loss = mean_absolute_error(unscaled_outputs, unscaled_labels)

        return {
            "loss": loss,
            "logits": regression_output,
            "true_loss": true_loss
        }

In [None]:
BATCH_SIZE = 16

def tokenize_smiles(seq):
    seq = [tokenizer.cls_token + smiles for smiles in seq] # If we pass a string, tokenizer will smartly think we want to create a sequence for each symbol
    tokenized = tokenizer(seq, padding='max_length', truncation=True, max_length=512, return_tensors='pt')
    return tokenized

def load_model(path):
    config = AutoConfig.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')
    model = CustomModel(config).cuda()
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint)
    return model


def make_predictions(model, scaler, smiles_seq):
    aggregated_preds = []
    for smiles in smiles_seq:
        smiles = [smiles]
        smiles_tokenized = tokenize_smiles(smiles)

        input_ids = smiles_tokenized['input_ids'].cuda()
        attention_mask = smiles_tokenized['attention_mask'].cuda()
        with torch.no_grad():
            preds = model(input_ids=input_ids, scaler=scaler, attention_mask=attention_mask)['logits'].cpu().numpy()
        
        true_preds = scaler.inverse_transform(preds).flatten()
        aggregated_preds.append(true_preds.tolist())
    return np.array(aggregated_preds)


test = pd.read_csv('/kaggle/input/neurips-open-polymer-prediction-2025/test.csv')
test_copy = test.copy()

smiles_test = test['SMILES'].to_list()

targets = ['Tg', 'FFV', 'Tc', 'Density', 'Rg']

scalers = joblib.load('/kaggle/input/smiles-bert-models/target_scalers.pkl')
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')

NameError: name 'test' is not defined

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
import random
from typing import Optional, List, Union

def augment_smiles_dataset(df: pd.DataFrame,
                               smiles_column: str = 'SMILES',
                               augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum'],
                               n_augmentations: int = 100,
                               preserve_original: bool = True,
                               random_seed: Optional[int] = None) -> pd.DataFrame:
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = []
            
            if strategy == 'enumeration':
                # Standard SMILES enumeration
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.append(enum_smiles)
            
            elif strategy == 'kekulize':
                # Kekulization variants
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.append(kek_smiles)
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # Stereochemistry enumeration
                for _ in range(n_augmentations // 2):
                    # Remove stereochemistry
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.append(no_stereo)
            
            return list(set(augmented))  # Remove duplicates
            
        except Exception as e:
            print(f"Error in {strategy} for {smiles}: {e}")
            return [smiles]
    
    augmented_rows = []
    
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    print(f"Original size: {len(df)}, Augmented size: {len(augmented_df)}")
    print(f"Augmentation factor: {len(augmented_df) / len(df):.2f}x")
    
    return augmented_df

test = augment_smiles_dataset(test)

In [None]:
preds_mapping = {}

for i in tqdm(range(len(targets))):
    target = targets[i]
    scaler = scalers[i]

    model_path = f'/kaggle/input/smiles-bert-models/trained_smiles_model_{target}_target.pth' # Very sophisticated staff
    model = load_model(model_path)
    true_preds = []

    for i, data in test.groupby('id'):
        test_smiles = data['SMILES'].to_list()
        augmented_preds = make_predictions(model, scaler, test_smiles)
    
        average_pred = np.median(augmented_preds)
    
        true_preds.append(float(average_pred.flatten()[0]))

    preds_mapping[target] = true_preds

In [None]:
submission = pd.DataFrame(preds_mapping)
submission['id'] = test_copy['id']
submission.to_csv('submission.csv', index=False)

Training notebook: [here](https://www.kaggle.com/code/defdet/polymer-bert-train?scriptVersionId=246123151)

In [None]:
!pip install /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Processing /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl
rdkit is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [None]:
import torch
import pandas as pd
import joblib
from transformers import PreTrainedModel, AutoConfig, BertModel, BertTokenizerFast, BertConfig, AutoModel, AutoTokenizer
from sklearn.metrics import mean_absolute_error
from torch import nn
from transformers.activations import ACT2FN
from tqdm import tqdm
import numpy as np

class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        pooler_size = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.dense = nn.Linear(pooler_size, pooler_size)
        
        dropout_prob = getattr(config, 'pooler_dropout', config.hidden_dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.activation = getattr(config, 'pooler_hidden_act', config.hidden_act)
        self.config = config

    def forward(self, hidden_states):
        context_token = hidden_states[:, 0] # CLS token
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = ACT2FN[self.activation](pooled_output)
        return pooled_output

class CustomModel(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.backbone = AutoModel.from_config(config)
        
        self.pooler = ContextPooler(config)

        pooler_output_dim = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.output = torch.nn.Linear(pooler_output_dim, 1) # Still predicting one label at a time. Kinda stupid

    def forward(
        self,
        input_ids,
        scaler,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        labels=None,
    ):
        outputs = self.backbone(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
        )

        pooled_output = self.pooler(outputs.last_hidden_state)
        
        # Final regression output
        regression_output = self.output(pooled_output)

        loss = None
        true_loss = None
        if labels is not None:
            loss_fn = torch.nn.MSELoss()

            unscaled_labels = scaler.inverse_transform(labels.cpu().numpy())
            unscaled_outputs = scaler.inverse_transform(regression_output.cpu().detach().numpy())
            
            loss = loss_fn(regression_output, labels)
            true_loss = mean_absolute_error(unscaled_outputs, unscaled_labels)

        return {
            "loss": loss,
            "logits": regression_output,
            "true_loss": true_loss
        }

In [None]:
BATCH_SIZE = 16

def tokenize_smiles(seq):
    seq = [tokenizer.cls_token + smiles for smiles in seq] # If we pass a string, tokenizer will smartly think we want to create a sequence for each symbol
    tokenized = tokenizer(seq, padding='max_length', truncation=True, max_length=512, return_tensors='pt')
    return tokenized

def load_model(path):
    config = AutoConfig.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')
    model = CustomModel(config).cuda()
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint)
    return model


def make_predictions(model, scaler, smiles_seq):
    aggregated_preds = []
    for smiles in smiles_seq:
        smiles = [smiles]
        smiles_tokenized = tokenize_smiles(smiles)

        input_ids = smiles_tokenized['input_ids'].cuda()
        attention_mask = smiles_tokenized['attention_mask'].cuda()
        with torch.no_grad():
            preds = model(input_ids=input_ids, scaler=scaler, attention_mask=attention_mask)['logits'].cpu().numpy()
        
        true_preds = scaler.inverse_transform(preds).flatten()
        aggregated_preds.append(true_preds.tolist())
    return np.array(aggregated_preds)


test = pd.read_csv('/kaggle/input/neurips-open-polymer-prediction-2025/test.csv')
test_copy = test.copy()

smiles_test = test['SMILES'].to_list()

targets = ['Tg', 'FFV', 'Tc', 'Density', 'Rg']

scalers = joblib.load('/kaggle/input/smiles-bert-models/target_scalers.pkl')
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')

NameError: name 'test' is not defined

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
import random
from typing import Optional, List, Union

def augment_smiles_dataset(df: pd.DataFrame,
                               smiles_column: str = 'SMILES',
                               augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum'],
                               n_augmentations: int = 100,
                               preserve_original: bool = True,
                               random_seed: Optional[int] = None) -> pd.DataFrame:
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = []
            
            if strategy == 'enumeration':
                # Standard SMILES enumeration
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.append(enum_smiles)
            
            elif strategy == 'kekulize':
                # Kekulization variants
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.append(kek_smiles)
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # Stereochemistry enumeration
                for _ in range(n_augmentations // 2):
                    # Remove stereochemistry
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.append(no_stereo)
            
            return list(set(augmented))  # Remove duplicates
            
        except Exception as e:
            print(f"Error in {strategy} for {smiles}: {e}")
            return [smiles]
    
    augmented_rows = []
    
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    print(f"Original size: {len(df)}, Augmented size: {len(augmented_df)}")
    print(f"Augmentation factor: {len(augmented_df) / len(df):.2f}x")
    
    return augmented_df

test = augment_smiles_dataset(test)

In [None]:
preds_mapping = {}

for i in tqdm(range(len(targets))):
    target = targets[i]
    scaler = scalers[i]

    model_path = f'/kaggle/input/smiles-bert-models/trained_smiles_model_{target}_target.pth' # Very sophisticated staff
    model = load_model(model_path)
    true_preds = []

    for i, data in test.groupby('id'):
        test_smiles = data['SMILES'].to_list()
        augmented_preds = make_predictions(model, scaler, test_smiles)
    
        average_pred = np.median(augmented_preds)
    
        true_preds.append(float(average_pred.flatten()[0]))

    preds_mapping[target] = true_preds

In [None]:
submission = pd.DataFrame(preds_mapping)
submission['id'] = test_copy['id']
submission.to_csv('submission.csv', index=False)

Training notebook: [here](https://www.kaggle.com/code/defdet/polymer-bert-train?scriptVersionId=246123151)

In [None]:
!pip install /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Processing /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl
rdkit is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [None]:
import torch
import pandas as pd
import joblib
from transformers import PreTrainedModel, AutoConfig, BertModel, BertTokenizerFast, BertConfig, AutoModel, AutoTokenizer
from sklearn.metrics import mean_absolute_error
from torch import nn
from transformers.activations import ACT2FN
from tqdm import tqdm
import numpy as np

class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        pooler_size = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.dense = nn.Linear(pooler_size, pooler_size)
        
        dropout_prob = getattr(config, 'pooler_dropout', config.hidden_dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.activation = getattr(config, 'pooler_hidden_act', config.hidden_act)
        self.config = config

    def forward(self, hidden_states):
        context_token = hidden_states[:, 0] # CLS token
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = ACT2FN[self.activation](pooled_output)
        return pooled_output

class CustomModel(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.backbone = AutoModel.from_config(config)
        
        self.pooler = ContextPooler(config)

        pooler_output_dim = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.output = torch.nn.Linear(pooler_output_dim, 1) # Still predicting one label at a time. Kinda stupid

    def forward(
        self,
        input_ids,
        scaler,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        labels=None,
    ):
        outputs = self.backbone(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
        )

        pooled_output = self.pooler(outputs.last_hidden_state)
        
        # Final regression output
        regression_output = self.output(pooled_output)

        loss = None
        true_loss = None
        if labels is not None:
            loss_fn = torch.nn.MSELoss()

            unscaled_labels = scaler.inverse_transform(labels.cpu().numpy())
            unscaled_outputs = scaler.inverse_transform(regression_output.cpu().detach().numpy())
            
            loss = loss_fn(regression_output, labels)
            true_loss = mean_absolute_error(unscaled_outputs, unscaled_labels)

        return {
            "loss": loss,
            "logits": regression_output,
            "true_loss": true_loss
        }

In [None]:
BATCH_SIZE = 16

def tokenize_smiles(seq):
    seq = [tokenizer.cls_token + smiles for smiles in seq] # If we pass a string, tokenizer will smartly think we want to create a sequence for each symbol
    tokenized = tokenizer(seq, padding='max_length', truncation=True, max_length=512, return_tensors='pt')
    return tokenized

def load_model(path):
    config = AutoConfig.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')
    model = CustomModel(config).cuda()
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint)
    return model


def make_predictions(model, scaler, smiles_seq):
    aggregated_preds = []
    for smiles in smiles_seq:
        smiles = [smiles]
        smiles_tokenized = tokenize_smiles(smiles)

        input_ids = smiles_tokenized['input_ids'].cuda()
        attention_mask = smiles_tokenized['attention_mask'].cuda()
        with torch.no_grad():
            preds = model(input_ids=input_ids, scaler=scaler, attention_mask=attention_mask)['logits'].cpu().numpy()
        
        true_preds = scaler.inverse_transform(preds).flatten()
        aggregated_preds.append(true_preds.tolist())
    return np.array(aggregated_preds)


test = pd.read_csv('/kaggle/input/neurips-open-polymer-prediction-2025/test.csv')
test_copy = test.copy()

smiles_test = test['SMILES'].to_list()

targets = ['Tg', 'FFV', 'Tc', 'Density', 'Rg']

scalers = joblib.load('/kaggle/input/smiles-bert-models/target_scalers.pkl')
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')

NameError: name 'test' is not defined

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
import random
from typing import Optional, List, Union

def augment_smiles_dataset(df: pd.DataFrame,
                               smiles_column: str = 'SMILES',
                               augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum'],
                               n_augmentations: int = 100,
                               preserve_original: bool = True,
                               random_seed: Optional[int] = None) -> pd.DataFrame:
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = []
            
            if strategy == 'enumeration':
                # Standard SMILES enumeration
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.append(enum_smiles)
            
            elif strategy == 'kekulize':
                # Kekulization variants
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.append(kek_smiles)
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # Stereochemistry enumeration
                for _ in range(n_augmentations // 2):
                    # Remove stereochemistry
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.append(no_stereo)
            
            return list(set(augmented))  # Remove duplicates
            
        except Exception as e:
            print(f"Error in {strategy} for {smiles}: {e}")
            return [smiles]
    
    augmented_rows = []
    
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    print(f"Original size: {len(df)}, Augmented size: {len(augmented_df)}")
    print(f"Augmentation factor: {len(augmented_df) / len(df):.2f}x")
    
    return augmented_df

test = augment_smiles_dataset(test)

In [None]:
preds_mapping = {}

for i in tqdm(range(len(targets))):
    target = targets[i]
    scaler = scalers[i]

    model_path = f'/kaggle/input/smiles-bert-models/trained_smiles_model_{target}_target.pth' # Very sophisticated staff
    model = load_model(model_path)
    true_preds = []

    for i, data in test.groupby('id'):
        test_smiles = data['SMILES'].to_list()
        augmented_preds = make_predictions(model, scaler, test_smiles)
    
        average_pred = np.median(augmented_preds)
    
        true_preds.append(float(average_pred.flatten()[0]))

    preds_mapping[target] = true_preds

In [None]:
submission = pd.DataFrame(preds_mapping)
submission['id'] = test_copy['id']
submission.to_csv('submission.csv', index=False)

Training notebook: [here](https://www.kaggle.com/code/defdet/polymer-bert-train?scriptVersionId=246123151)

In [None]:
!pip install /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Processing /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl
rdkit is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [None]:
import torch
import pandas as pd
import joblib
from transformers import PreTrainedModel, AutoConfig, BertModel, BertTokenizerFast, BertConfig, AutoModel, AutoTokenizer
from sklearn.metrics import mean_absolute_error
from torch import nn
from transformers.activations import ACT2FN
from tqdm import tqdm
import numpy as np

class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        pooler_size = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.dense = nn.Linear(pooler_size, pooler_size)
        
        dropout_prob = getattr(config, 'pooler_dropout', config.hidden_dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.activation = getattr(config, 'pooler_hidden_act', config.hidden_act)
        self.config = config

    def forward(self, hidden_states):
        context_token = hidden_states[:, 0] # CLS token
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = ACT2FN[self.activation](pooled_output)
        return pooled_output

class CustomModel(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.backbone = AutoModel.from_config(config)
        
        self.pooler = ContextPooler(config)

        pooler_output_dim = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.output = torch.nn.Linear(pooler_output_dim, 1) # Still predicting one label at a time. Kinda stupid

    def forward(
        self,
        input_ids,
        scaler,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        labels=None,
    ):
        outputs = self.backbone(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
        )

        pooled_output = self.pooler(outputs.last_hidden_state)
        
        # Final regression output
        regression_output = self.output(pooled_output)

        loss = None
        true_loss = None
        if labels is not None:
            loss_fn = torch.nn.MSELoss()

            unscaled_labels = scaler.inverse_transform(labels.cpu().numpy())
            unscaled_outputs = scaler.inverse_transform(regression_output.cpu().detach().numpy())
            
            loss = loss_fn(regression_output, labels)
            true_loss = mean_absolute_error(unscaled_outputs, unscaled_labels)

        return {
            "loss": loss,
            "logits": regression_output,
            "true_loss": true_loss
        }

In [None]:
BATCH_SIZE = 16

def tokenize_smiles(seq):
    seq = [tokenizer.cls_token + smiles for smiles in seq] # If we pass a string, tokenizer will smartly think we want to create a sequence for each symbol
    tokenized = tokenizer(seq, padding='max_length', truncation=True, max_length=512, return_tensors='pt')
    return tokenized

def load_model(path):
    config = AutoConfig.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')
    model = CustomModel(config).cuda()
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint)
    return model


def make_predictions(model, scaler, smiles_seq):
    aggregated_preds = []
    for smiles in smiles_seq:
        smiles = [smiles]
        smiles_tokenized = tokenize_smiles(smiles)

        input_ids = smiles_tokenized['input_ids'].cuda()
        attention_mask = smiles_tokenized['attention_mask'].cuda()
        with torch.no_grad():
            preds = model(input_ids=input_ids, scaler=scaler, attention_mask=attention_mask)['logits'].cpu().numpy()
        
        true_preds = scaler.inverse_transform(preds).flatten()
        aggregated_preds.append(true_preds.tolist())
    return np.array(aggregated_preds)


test = pd.read_csv('/kaggle/input/neurips-open-polymer-prediction-2025/test.csv')
test_copy = test.copy()

smiles_test = test['SMILES'].to_list()

targets = ['Tg', 'FFV', 'Tc', 'Density', 'Rg']

scalers = joblib.load('/kaggle/input/smiles-bert-models/target_scalers.pkl')
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')

NameError: name 'test' is not defined

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
import random
from typing import Optional, List, Union

def augment_smiles_dataset(df: pd.DataFrame,
                               smiles_column: str = 'SMILES',
                               augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum'],
                               n_augmentations: int = 100,
                               preserve_original: bool = True,
                               random_seed: Optional[int] = None) -> pd.DataFrame:
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = []
            
            if strategy == 'enumeration':
                # Standard SMILES enumeration
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.append(enum_smiles)
            
            elif strategy == 'kekulize':
                # Kekulization variants
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.append(kek_smiles)
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # Stereochemistry enumeration
                for _ in range(n_augmentations // 2):
                    # Remove stereochemistry
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.append(no_stereo)
            
            return list(set(augmented))  # Remove duplicates
            
        except Exception as e:
            print(f"Error in {strategy} for {smiles}: {e}")
            return [smiles]
    
    augmented_rows = []
    
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    print(f"Original size: {len(df)}, Augmented size: {len(augmented_df)}")
    print(f"Augmentation factor: {len(augmented_df) / len(df):.2f}x")
    
    return augmented_df

test = augment_smiles_dataset(test)

In [None]:
preds_mapping = {}

for i in tqdm(range(len(targets))):
    target = targets[i]
    scaler = scalers[i]

    model_path = f'/kaggle/input/smiles-bert-models/trained_smiles_model_{target}_target.pth' # Very sophisticated staff
    model = load_model(model_path)
    true_preds = []

    for i, data in test.groupby('id'):
        test_smiles = data['SMILES'].to_list()
        augmented_preds = make_predictions(model, scaler, test_smiles)
    
        average_pred = np.median(augmented_preds)
    
        true_preds.append(float(average_pred.flatten()[0]))

    preds_mapping[target] = true_preds

In [None]:
submission = pd.DataFrame(preds_mapping)
submission['id'] = test_copy['id']
submission.to_csv('submission.csv', index=False)

Training notebook: [here](https://www.kaggle.com/code/defdet/polymer-bert-train?scriptVersionId=246123151)

In [5]:
!pip install /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Processing /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl
rdkit is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [6]:
import torch
import pandas as pd
import joblib
from transformers import PreTrainedModel, AutoConfig, BertModel, BertTokenizerFast, BertConfig, AutoModel, AutoTokenizer
from sklearn.metrics import mean_absolute_error
from torch import nn
from transformers.activations import ACT2FN
from tqdm import tqdm
import numpy as np

class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        pooler_size = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.dense = nn.Linear(pooler_size, pooler_size)
        
        dropout_prob = getattr(config, 'pooler_dropout', config.hidden_dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.activation = getattr(config, 'pooler_hidden_act', config.hidden_act)
        self.config = config

    def forward(self, hidden_states):
        context_token = hidden_states[:, 0] # CLS token
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = ACT2FN[self.activation](pooled_output)
        return pooled_output

class CustomModel(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.backbone = AutoModel.from_config(config)
        
        self.pooler = ContextPooler(config)

        pooler_output_dim = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.output = torch.nn.Linear(pooler_output_dim, 1) # Still predicting one label at a time. Kinda stupid

    def forward(
        self,
        input_ids,
        scaler,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        labels=None,
    ):
        outputs = self.backbone(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
        )

        pooled_output = self.pooler(outputs.last_hidden_state)
        
        # Final regression output
        regression_output = self.output(pooled_output)

        loss = None
        true_loss = None
        if labels is not None:
            loss_fn = torch.nn.MSELoss()

            unscaled_labels = scaler.inverse_transform(labels.cpu().numpy())
            unscaled_outputs = scaler.inverse_transform(regression_output.cpu().detach().numpy())
            
            loss = loss_fn(regression_output, labels)
            true_loss = mean_absolute_error(unscaled_outputs, unscaled_labels)

        return {
            "loss": loss,
            "logits": regression_output,
            "true_loss": true_loss
        }

In [7]:
BATCH_SIZE = 16

def tokenize_smiles(seq):
    seq = [tokenizer.cls_token + smiles for smiles in seq] # If we pass a string, tokenizer will smartly think we want to create a sequence for each symbol
    tokenized = tokenizer(seq, padding='max_length', truncation=True, max_length=512, return_tensors='pt')
    return tokenized

def load_model(path):
    config = AutoConfig.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')
    model = CustomModel(config).cuda()
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint)
    return model


def make_predictions(model, scaler, smiles_seq):
    aggregated_preds = []
    for smiles in smiles_seq:
        smiles = [smiles]
        smiles_tokenized = tokenize_smiles(smiles)

        input_ids = smiles_tokenized['input_ids'].cuda()
        attention_mask = smiles_tokenized['attention_mask'].cuda()
        with torch.no_grad():
            preds = model(input_ids=input_ids, scaler=scaler, attention_mask=attention_mask)['logits'].cpu().numpy()
        
        true_preds = scaler.inverse_transform(preds).flatten()
        aggregated_preds.append(true_preds.tolist())
    return np.array(aggregated_preds)


test = pd.read_csv('/kaggle/input/neurips-open-polymer-prediction-2025/test.csv')
test_copy = test.copy()

smiles_test = test['SMILES'].to_list()

targets = ['Tg', 'FFV', 'Tc', 'Density', 'Rg']

scalers = joblib.load('/kaggle/input/smiles-bert-models/target_scalers.pkl')
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')

NameError: name 'test' is not defined

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
import random
from typing import Optional, List, Union

def augment_smiles_dataset(df: pd.DataFrame,
                               smiles_column: str = 'SMILES',
                               augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum'],
                               n_augmentations: int = 100,
                               preserve_original: bool = True,
                               random_seed: Optional[int] = None) -> pd.DataFrame:
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = []
            
            if strategy == 'enumeration':
                # Standard SMILES enumeration
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.append(enum_smiles)
            
            elif strategy == 'kekulize':
                # Kekulization variants
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.append(kek_smiles)
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # Stereochemistry enumeration
                for _ in range(n_augmentations // 2):
                    # Remove stereochemistry
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.append(no_stereo)
            
            return list(set(augmented))  # Remove duplicates
            
        except Exception as e:
            print(f"Error in {strategy} for {smiles}: {e}")
            return [smiles]
    
    augmented_rows = []
    
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    print(f"Original size: {len(df)}, Augmented size: {len(augmented_df)}")
    print(f"Augmentation factor: {len(augmented_df) / len(df):.2f}x")
    
    return augmented_df

test = augment_smiles_dataset(test)

In [None]:
preds_mapping = {}

for i in tqdm(range(len(targets))):
    target = targets[i]
    scaler = scalers[i]

    model_path = f'/kaggle/input/smiles-bert-models/trained_smiles_model_{target}_target.pth' # Very sophisticated staff
    model = load_model(model_path)
    true_preds = []

    for i, data in test.groupby('id'):
        test_smiles = data['SMILES'].to_list()
        augmented_preds = make_predictions(model, scaler, test_smiles)
    
        average_pred = np.median(augmented_preds)
    
        true_preds.append(float(average_pred.flatten()[0]))

    preds_mapping[target] = true_preds

In [None]:
submission = pd.DataFrame(preds_mapping)
submission['id'] = test_copy['id']
submission.to_csv('submission.csv', index=False)

增强SMILES: 100%|██████████| 3/3 [00:00<00:00,  5.36it/s]

增强结果统计:
原始数据集大小: 3
增强后数据集大小: 3
总体增强倍数: 1.00x

各策略贡献:
- enumeration: 0条记录 (0.00x)
- kekulize: 0条记录 (0.00x)
- stereo_enum: 0条记录 (0.00x)
- fragment: 0条记录 (0.00x)
- rotate: 0条记录 (0.00x)
- original: 3条记录 (1.00x)

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import BRICS
import random
from typing import Optional, List, Union, Dict, Any
from tqdm import tqdm

def augment_smiles_dataset(df: pd.DataFrame,
                          smiles_column: str = 'SMILES',
                          augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum', 'fragment', 'rotate'],
                          n_augmentations: int = 100,
                          preserve_original: bool = True,
                          random_seed: Optional[int] = None,
                          max_fragment_combinations: int = 2) -> pd.DataFrame:
    """
    增强的SMILES数据集生成器
    
    参数:
    - df: 输入的DataFrame
    - smiles_column: SMILES列的名称
    - augmentation_strategies: 增强策略列表
    - n_augmentations: 每个策略的增强次数
    - preserve_original: 是否保留原始SMILES
    - random_seed: 随机种子
    - max_fragment_combinations: 片段重组的最大组合数
    
    返回:
    - 增强后的DataFrame
    """
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def sanitize_smiles(smiles: str) -> str:
        """净化SMILES字符串"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                return Chem.MolToSmiles(mol, canonical=True)
        except:
            pass
        return smiles
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = set()  # 使用集合避免重复
            
            if strategy == 'enumeration':
                # 增强的SMILES枚举
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.add(sanitize_smiles(enum_smiles))
            
            elif strategy == 'kekulize':
                # Kekulization变体
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.add(sanitize_smiles(kek_smiles))
                    
                    # 添加不同的Kekulization形式
                    for _ in range(min(5, n_augmentations)):
                        new_mol = Chem.MolFromSmiles(kek_smiles)
                        if new_mol:
                            Chem.Kekulize(new_mol)
                            new_smiles = Chem.MolToSmiles(new_mol, kekuleSmiles=True, doRandom=True)
                            augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # 立体化学枚举
                try:
                    # 移除立体化学
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.add(sanitize_smiles(no_stereo))
                    
                    # 添加随机立体中心
                    for _ in range(n_augmentations // 2):
                        new_mol = Chem.MolFromSmiles(no_stereo)
                        if new_mol:
                            Chem.AssignStereochemistry(new_mol, force=True, cleanIt=True)
                            stereo_smiles = Chem.MolToSmiles(new_mol, isomericSmiles=True)
                            augmented.add(sanitize_smiles(stereo_smiles))
                except:
                    pass
            
            elif strategy == 'fragment':
                # BRICS分解和重组
                try:
                    fragments = list(BRICS.BRICSDecompose(mol))
                    if len(fragments) > 1:
                        # 随机组合片段
                        for _ in range(min(max_fragment_combinations, n_augmentations)):
                            n_frags = random.randint(2, min(len(fragments), 3))
                            selected_frags = random.sample(fragments, n_frags)
                            new_mol = BRICS.BRICSBuild([Chem.MolFromSmiles(f) for f in selected_frags])
                            if new_mol:
                                new_smiles = Chem.MolToSmiles(new_mol)
                                augmented.add(sanitize_smiles(new_smiles))
                except:
                    pass
            
            elif strategy == 'rotate':
                # 原子顺序旋转
                try:
                    for _ in range(n_augmentations // 2):
                        rot_smiles = Chem.MolToSmiles(mol, 
                                                     doRandom=True,
                                                     canonical=False,
                                                     rootedAtAtom=random.randint(0, mol.GetNumAtoms()-1))
                        augmented.add(sanitize_smiles(rot_smiles))
                except:
                    pass
            
            return list(augmented)
            
        except Exception as e:
            print(f"警告: {strategy}处理{smiles}时出错: {e}")
            return [smiles]
    
    augmented_rows = []
    stats: Dict[str, int] = {strategy: 0 for strategy in augmentation_strategies}
    stats['original'] = 0
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="增强SMILES"):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
            stats['original'] += 1
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
                    stats[strategy] += 1
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    # 打印详细统计信息
    print("\n增强结果统计:")
    print(f"原始数据集大小: {len(df)}")
    print(f"增强后数据集大小: {len(augmented_df)}")
    print(f"总体增强倍数: {len(augmented_df) / len(df):.2f}x")
    print("\n各策略贡献:")
    for strategy, count in stats.items():
        print(f"- {strategy}: {count}条记录 ({count/len(df):.2f}x)")
    
    return augmented_df

test = augment_smiles_dataset(test)


In [None]:
!pip install /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Processing /kaggle/input/rdkit-2025-3-3-cp311/rdkit-2025.3.3-cp311-cp311-manylinux_2_28_x86_64.whl
rdkit is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.


In [None]:
import torch
import pandas as pd
import joblib
from transformers import PreTrainedModel, AutoConfig, BertModel, BertTokenizerFast, BertConfig, AutoModel, AutoTokenizer
from sklearn.metrics import mean_absolute_error
from torch import nn
from transformers.activations import ACT2FN
from tqdm import tqdm
import numpy as np

class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        pooler_size = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.dense = nn.Linear(pooler_size, pooler_size)
        
        dropout_prob = getattr(config, 'pooler_dropout', config.hidden_dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.activation = getattr(config, 'pooler_hidden_act', config.hidden_act)
        self.config = config

    def forward(self, hidden_states):
        context_token = hidden_states[:, 0] # CLS token
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = ACT2FN[self.activation](pooled_output)
        return pooled_output

class CustomModel(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.backbone = AutoModel.from_config(config)
        
        self.pooler = ContextPooler(config)

        pooler_output_dim = getattr(config, 'pooler_hidden_size', config.hidden_size)
        self.output = torch.nn.Linear(pooler_output_dim, 1) # Still predicting one label at a time. Kinda stupid

    def forward(
        self,
        input_ids,
        scaler,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        labels=None,
    ):
        outputs = self.backbone(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
        )

        pooled_output = self.pooler(outputs.last_hidden_state)
        
        # Final regression output
        regression_output = self.output(pooled_output)

        loss = None
        true_loss = None
        if labels is not None:
            loss_fn = torch.nn.MSELoss()

            unscaled_labels = scaler.inverse_transform(labels.cpu().numpy())
            unscaled_outputs = scaler.inverse_transform(regression_output.cpu().detach().numpy())
            
            loss = loss_fn(regression_output, labels)
            true_loss = mean_absolute_error(unscaled_outputs, unscaled_labels)

        return {
            "loss": loss,
            "logits": regression_output,
            "true_loss": true_loss
        }

In [None]:
BATCH_SIZE = 16

def tokenize_smiles(seq):
    seq = [tokenizer.cls_token + smiles for smiles in seq] # If we pass a string, tokenizer will smartly think we want to create a sequence for each symbol
    tokenized = tokenizer(seq, padding='max_length', truncation=True, max_length=512, return_tensors='pt')
    return tokenized

def load_model(path):
    config = AutoConfig.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')
    model = CustomModel(config).cuda()
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint)
    return model


def make_predictions(model, scaler, smiles_seq):
    aggregated_preds = []
    for smiles in smiles_seq:
        smiles = [smiles]
        smiles_tokenized = tokenize_smiles(smiles)

        input_ids = smiles_tokenized['input_ids'].cuda()
        attention_mask = smiles_tokenized['attention_mask'].cuda()
        with torch.no_grad():
            preds = model(input_ids=input_ids, scaler=scaler, attention_mask=attention_mask)['logits'].cpu().numpy()
        
        true_preds = scaler.inverse_transform(preds).flatten()
        aggregated_preds.append(true_preds.tolist())
    return np.array(aggregated_preds)


test = pd.read_csv('/kaggle/input/neurips-open-polymer-prediction-2025/test.csv')
test_copy = test.copy()

smiles_test = test['SMILES'].to_list()

targets = ['Tg', 'FFV', 'Tc', 'Density', 'Rg']

scalers = joblib.load('/kaggle/input/smiles-bert-models/target_scalers.pkl')
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/smiles-deberta77m-tokenizer')

NameError: name 'test' is not defined

In [None]:
import pandas as pd
import numpy as np
from rdkit import Chem
import random
from typing import Optional, List, Union

def augment_smiles_dataset(df: pd.DataFrame,
                               smiles_column: str = 'SMILES',
                               augmentation_strategies: List[str] = ['enumeration', 'kekulize', 'stereo_enum'],
                               n_augmentations: int = 100,
                               preserve_original: bool = True,
                               random_seed: Optional[int] = None) -> pd.DataFrame:
    if random_seed is not None:
        random.seed(random_seed)
        np.random.seed(random_seed)
    
    def apply_augmentation_strategy(smiles: str, strategy: str) -> List[str]:
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:
                return [smiles]
            
            augmented = []
            
            if strategy == 'enumeration':
                # Standard SMILES enumeration
                for _ in range(n_augmentations):
                    enum_smiles = Chem.MolToSmiles(mol, 
                                                 canonical=False, 
                                                 doRandom=True,
                                                 isomericSmiles=True)
                    augmented.append(enum_smiles)
            
            elif strategy == 'kekulize':
                # Kekulization variants
                try:
                    Chem.Kekulize(mol)
                    kek_smiles = Chem.MolToSmiles(mol, kekuleSmiles=True)
                    augmented.append(kek_smiles)
                except:
                    pass
            
            elif strategy == 'stereo_enum':
                # Stereochemistry enumeration
                for _ in range(n_augmentations // 2):
                    # Remove stereochemistry
                    Chem.RemoveStereochemistry(mol)
                    no_stereo = Chem.MolToSmiles(mol)
                    augmented.append(no_stereo)
            
            return list(set(augmented))  # Remove duplicates
            
        except Exception as e:
            print(f"Error in {strategy} for {smiles}: {e}")
            return [smiles]
    
    augmented_rows = []
    
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        original_smiles = row[smiles_column]
        
        if preserve_original:
            original_row = row.to_dict()
            original_row['augmentation_strategy'] = 'original'
            original_row['is_augmented'] = False
            augmented_rows.append(original_row)
        
        for strategy in augmentation_strategies:
            strategy_smiles = apply_augmentation_strategy(original_smiles, strategy)
            
            for aug_smiles in strategy_smiles:
                if aug_smiles != original_smiles:
                    new_row = row.to_dict().copy()
                    new_row[smiles_column] = aug_smiles
                    new_row['augmentation_strategy'] = strategy
                    new_row['is_augmented'] = True
                    augmented_rows.append(new_row)
    
    augmented_df = pd.DataFrame(augmented_rows)
    augmented_df = augmented_df.reset_index(drop=True)
    
    print(f"Original size: {len(df)}, Augmented size: {len(augmented_df)}")
    print(f"Augmentation factor: {len(augmented_df) / len(df):.2f}x")
    
    return augmented_df

test = augment_smiles_dataset(test)

In [None]:
preds_mapping = {}

for i in tqdm(range(len(targets))):
    target = targets[i]
    scaler = scalers[i]

    model_path = f'/kaggle/input/smiles-bert-models/trained_smiles_model_{target}_target.pth' # Very sophisticated staff
    model = load_model(model_path)
    true_preds = []

    for i, data in test.groupby('id'):
        test_smiles = data['SMILES'].to_list()
        augmented_preds = make_predictions(model, scaler, test_smiles)
    
        average_pred = np.median(augmented_preds)
    
        true_preds.append(float(average_pred.flatten()[0]))

    preds_mapping[target] = true_preds

In [None]:
submission = pd.DataFrame(preds_mapping)
submission['id'] = test_copy['id']
submission.to_csv('submission.csv', index=False)

Training notebook: [here](https://www.kaggle.com/code/defdet/polymer-bert-train?scriptVersionId=246123151)

Training notebook: [here](https://www.kaggle.com/code/defdet/polymer-bert-train?scriptVersionId=246123151)