# Preparing the environment
Loading models and tokenizers for CLAP and calculating SHA-256 hash of CLAP model

In [1]:
import torch.multiprocessing
import torch
import torch.nn.functional as F  
import json
from transformers import AutoModel, AutoTokenizer


# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# NOTE(review): trust_remote_code=True executes Python code shipped inside the
# Hub repositories below; consider pinning a `revision` so that code cannot
# change between runs.
asm_tokenizer = AutoTokenizer.from_pretrained("hustcw/clap-asm", trust_remote_code=True)
text_tokenizer = AutoTokenizer.from_pretrained("hustcw/clap-text", trust_remote_code=True)

# Encoders are moved to `device` once here; later cells move their inputs to match.
asm_encoder = AutoModel.from_pretrained("hustcw/clap-asm", trust_remote_code=True)
asm_encoder = asm_encoder.to(device)
text_encoder = AutoModel.from_pretrained("hustcw/clap-text", trust_remote_code=True)
text_encoder = text_encoder.to(device)

# JSON inputs read by the zero-shot case-study cells.
bubble_output = "./CaseStudy/bubblesort.json"
malware_output = "./CaseStudy/malware.json"
sha3 = "./CaseStudy/sha3.json"

  from .autonotebook import tqdm as notebook_tqdm


# 抗干扰的代码建模验证

#### Cosine similarity (Cos Sim)

In [2]:
# Interference-robustness check (cosine similarity).
# For every entry of LinuxBinary.json each sample's assembly is embedded with the
# CLAP assembly encoder. A sample whose "interference" is not "none" is assigned
# the state ("before" / "after") of the more similar "none" baseline, and that
# prediction is compared with the sample's recorded state.
BASELINE = "none"  # "interference" label carried by the reference samples

with open("LinuxBinary.json") as fp:
    data = json.load(fp)

results = []

for data_item in data:
    result = {"cve": data_item["folder_name"], "results": []}

    baselines = {}  # state -> embedding of the baseline sample for that state
    variants = []   # (state, interference, embedding) for every other sample

    for file_info in data_item["files"]:
        state = file_info["state"]
        # A missing/null "interference" field is treated as the baseline label;
        # previously such a sample was handled as a variant and crashed the cell.
        interference = file_info.get("interference")
        if interference is None:
            interference = BASELINE

        with torch.no_grad():
            asm_input = asm_tokenizer(
                [file_info["asm"]],
                padding=True,
                pad_to_multiple_of=8,
                return_tensors="pt",
                verbose=False,
            ).to(device)
            # L2-normalise and keep the tensor on `device`; no numpy round trip.
            embedding = F.normalize(asm_encoder(**asm_input), dim=-1).float()

        if interference == BASELINE:
            baselines[state] = embedding
        else:
            variants.append((state, interference, embedding))

    # Fail with an explicit message instead of a cryptic tensor-construction error.
    missing = [s for s in ("before", "after") if s not in baselines]
    if variants and missing:
        raise ValueError(
            f"{result['cve']}: no '{BASELINE}' baseline sample for state(s) {missing}"
        )

    for state, interference, embedding in variants:
        cosine_sim_before = F.cosine_similarity(embedding, baselines["before"]).item()
        cosine_sim_after = F.cosine_similarity(embedding, baselines["after"]).item()

        # Ties resolve to "before", as only a strictly larger similarity picks "after".
        pred_state = "after" if cosine_sim_after > cosine_sim_before else "before"
        success = "yes" if pred_state == state else "no"

        result['results'].append(
            {
                "interference": interference,
                "state": state,
                "cosine_sim_before": cosine_sim_before,
                "cosine_sim_after": cosine_sim_after,
                "pred_state": pred_state,
                "success": success
            }
        )
    results.append(result)

    # Per-CVE result table.
    print("-" * 100)
    print(f"CVE: {result['cve']}")
    print(f"{'Interference':<20} {'State':<10} {'Cosine Sim Before':<18} {'Cosine Sim After':<18} {'Pred State':<10} {'Success':<5}")
    print("-" * 100)
    for item in result['results']:
        print(f"{item['interference']:<20} {item['state']:<10} {item['cosine_sim_before']:<18.6f} {item['cosine_sim_after']:<18.6f} {item['pred_state']:<10} {item['success']:<5}")

# Persist every verdict for the statistics cell.
with open("Results_cos.json", "w") as f:
    json.dump(results, f, indent=4)

----------------------------------------------------------------------------------------------------
CVE: CVE-2024-47673
Interference         State      Cosine Sim Before  Cosine Sim After   Pred State Success
----------------------------------------------------------------------------------------------------
32                   after      0.756176           0.823715           after      yes  
arch64               after      0.764591           0.807831           after      yes  
clang                after      0.933223           0.991611           after      yes  
Os                   after      0.939576           1.000000           after      yes  
32                   before     0.761585           0.700692           before     yes  
arch64               before     0.697337           0.678633           before     yes  
clang                before     0.989663           0.935799           before     yes  
Os                   before     1.000000           0.939576           before    

#### Euclidean distance (Euc Dist)

In [10]:
# Interference-robustness check (Euclidean distance).
# Same protocol as the cosine-similarity experiment, but a sample is assigned the
# state ("before" / "after") of the *closest* "none" baseline, measured with the
# L2 distance between the encoder outputs exactly as the encoder returns them.
BASELINE = "none"  # "interference" label carried by the reference samples

with open("LinuxBinary.json") as fp:
    data = json.load(fp)

results = []

for data_item in data:
    result = {"cve": data_item["folder_name"], "results": []}

    baselines = {}  # state -> embedding of the baseline sample for that state
    variants = []   # (state, interference, embedding) for every other sample

    for file_info in data_item["files"]:
        state = file_info["state"]
        # A missing/null "interference" field is treated as the baseline label.
        interference = file_info.get("interference")
        if interference is None:
            interference = BASELINE

        with torch.no_grad():
            asm_input = asm_tokenizer(
                [file_info["asm"]],
                padding=True,
                pad_to_multiple_of=8,
                return_tensors="pt",
                verbose=False,
            ).to(device)
            # No extra normalisation here: distances use the encoder output as is.
            embedding = asm_encoder(**asm_input).float()

        if interference == BASELINE:
            baselines[state] = embedding
        else:
            variants.append((state, interference, embedding))

    # Fail with an explicit message instead of a cryptic tensor-construction error.
    missing = [s for s in ("before", "after") if s not in baselines]
    if variants and missing:
        raise ValueError(
            f"{result['cve']}: no '{BASELINE}' baseline sample for state(s) {missing}"
        )

    for state, interference, embedding in variants:
        # F.pairwise_distance adds eps=1e-6 to every component of the difference,
        # so two identical embeddings report a tiny non-zero distance, not 0.
        dist_before = F.pairwise_distance(embedding, baselines["before"]).item()
        dist_after = F.pairwise_distance(embedding, baselines["after"]).item()

        # Ties resolve to "before", as only a strictly smaller distance picks "after".
        pred_state = "after" if dist_after < dist_before else "before"
        success = "yes" if pred_state == state else "no"

        result['results'].append(
            {
                "interference": interference,
                "state": state,
                "dist_before": dist_before,
                "dist_after": dist_after,
                "pred_state": pred_state,
                "success": success
            }
        )
    results.append(result)

    # Per-CVE result table.
    print("-" * 100)
    print(f"CVE: {result['cve']}")
    print(f"{'Interference':<20} {'State':<10} {'Euc Dist Before':<18} {'Euc Dist After':<18} {'Pred State':<10} {'Success':<5}")
    print("-" * 100)
    for item in result['results']:
        print(f"{item['interference']:<20} {item['state']:<10} {item['dist_before']:<18.6f} {item['dist_after']:<18.6f} {item['pred_state']:<10} {item['success']:<5}")

# Persist every verdict.
with open("Results_euc.json", "w") as f:
    json.dump(results, f, indent=4)


----------------------------------------------------------------------------------------------------
CVE: CVE-2024-47673
Interference         State      Euc Dist Before    Euc Dist After     Pred State Success
----------------------------------------------------------------------------------------------------
32                   after      0.698318           0.593776           after      yes  
arch64               after      0.686162           0.619950           after      yes  
clang                after      0.365452           0.129529           after      yes  
Os                   after      0.347633           0.000028           after      yes  
32                   before     0.690528           0.773703           before     yes  
arch64               before     0.778027           0.801707           before     yes  
clang                before     0.143786           0.358333           before     yes  
Os                   before     0.000028           0.347633           before    

#### Manhattan distance (Man Dist)

In [12]:
# Interference-robustness check (Manhattan distance).
# Same protocol as the cosine-similarity experiment, but a sample is assigned the
# state ("before" / "after") of the *closest* "none" baseline, measured with the
# L1 distance between the encoder outputs exactly as the encoder returns them.
BASELINE = "none"  # "interference" label carried by the reference samples

with open("LinuxBinary.json") as fp:
    data = json.load(fp)

results = []

for data_item in data:
    result = {"cve": data_item["folder_name"], "results": []}

    baselines = {}  # state -> embedding of the baseline sample for that state
    variants = []   # (state, interference, embedding) for every other sample

    for file_info in data_item["files"]:
        state = file_info["state"]
        # A missing/null "interference" field is treated as the baseline label.
        interference = file_info.get("interference")
        if interference is None:
            interference = BASELINE

        with torch.no_grad():
            asm_input = asm_tokenizer(
                [file_info["asm"]],
                padding=True,
                pad_to_multiple_of=8,
                return_tensors="pt",
                verbose=False,
            ).to(device)
            # No extra normalisation here: distances use the encoder output as is.
            embedding = asm_encoder(**asm_input).float()

        if interference == BASELINE:
            baselines[state] = embedding
        else:
            variants.append((state, interference, embedding))

    # Fail with an explicit message instead of a cryptic tensor-construction error.
    missing = [s for s in ("before", "after") if s not in baselines]
    if variants and missing:
        raise ValueError(
            f"{result['cve']}: no '{BASELINE}' baseline sample for state(s) {missing}"
        )

    for state, interference, embedding in variants:
        # p=1 selects the Manhattan (L1) norm. F.pairwise_distance adds eps=1e-6 to
        # every component of the difference, so identical embeddings report a
        # small non-zero distance, not 0.
        dist_before = F.pairwise_distance(embedding, baselines["before"], p=1).item()
        dist_after = F.pairwise_distance(embedding, baselines["after"], p=1).item()

        # Ties resolve to "before", as only a strictly smaller distance picks "after".
        pred_state = "after" if dist_after < dist_before else "before"
        success = "yes" if pred_state == state else "no"

        result['results'].append(
            {
                "interference": interference,
                "state": state,
                "dist_before": dist_before,
                "dist_after": dist_after,
                "pred_state": pred_state,
                "success": success
            }
        )
    results.append(result)

    # Per-CVE result table.
    print("-" * 100)
    print(f"CVE: {result['cve']}")
    print(f"{'Interference':<20} {'State':<10} {'Manhattan Dist Before':<22} {'Manhattan Dist After':<22} {'Pred State':<10} {'Success':<5}")
    print("-" * 100)
    for item in result['results']:
        print(f"{item['interference']:<20} {item['state']:<10} {item['dist_before']:<22.6f} {item['dist_after']:<22.6f} {item['pred_state']:<10} {item['success']:<5}")

# Save the results to Results_man.json.
with open("Results_man.json", "w") as f:
    json.dump(results, f, indent=4)

----------------------------------------------------------------------------------------------------
CVE: CVE-2024-47673
Interference         State      Manhattan Dist Before  Manhattan Dist After   Pred State Success
----------------------------------------------------------------------------------------------------
32                   after      15.096949              12.823579              after      yes  
arch64               after      14.976973              13.278171              after      yes  
clang                after      8.006180               2.817078               after      yes  
Os                   after      7.588204               0.000768               after      yes  
32                   before     14.947784              16.752163              before     yes  
arch64               before     17.000275              17.244213              before     yes  
clang                before     3.081063               7.790029               before     yes  
Os              

## 结果统计

In [10]:
import json

# Load the per-CVE verdicts stored in Results_cos.json.
with open('Results_cos.json', 'r') as file:
    data = json.load(file)

# Tallies shaped {'success': hits, 'total': trials}: one overall, one per
# `state` value and one per `interference` value.
total_counts = {'success': 0, 'total': 0}
state_counts = {}
interference_counts = {}

for item in data:
    for result in item['results']:
        state = result['state']
        interference = result['interference']
        success = result['success'] == 'yes'  # "yes"/"no" string -> bool

        # Create the per-key tallies on first sight, then update all three alike.
        tallies = (
            total_counts,
            state_counts.setdefault(state, {'success': 0, 'total': 0}),
            interference_counts.setdefault(interference, {'success': 0, 'total': 0}),
        )
        for tally in tallies:
            if success:
                tally['success'] += 1
            tally['total'] += 1

# Overall success ratio, then the breakdown by state and by interference.
if total_counts['total'] > 0:
    success_ratio = total_counts['success'] / total_counts['total']
else:
    success_ratio = 0
print(f"Total Success Ratio: {success_ratio:.2%}")
print()
for state, counts in state_counts.items():
    if counts['total'] > 0:
        success_ratio = counts['success'] / counts['total']
    else:
        success_ratio = 0
    print(f"State: {state}, Total Success Ratio: {success_ratio:.2%}")
print()
for interference, counts in interference_counts.items():
    if counts['total'] > 0:
        success_ratio = counts['success'] / counts['total']
    else:
        success_ratio = 0
    print(f"Interference: {interference}, Total Success Ratio: {success_ratio:.2%}")

Total Success Ratio: 61.76%

State: after, Total Success Ratio: 57.35%
State: before, Total Success Ratio: 66.18%

Interference: 32, Total Success Ratio: 58.82%
Interference: arch64, Total Success Ratio: 58.82%
Interference: clang, Total Success Ratio: 61.76%
Interference: Os, Total Success Ratio: 67.65%


# Fine-grained sorting algorithm classification (Zero-Shot)

In [2]:

# Zero-shot classification: score the assembly loaded from `bubble_output`
# against ten natural-language sorting-algorithm prompts.
TEMPERATURE = 0.07  # softmax temperature dividing the similarity logits

with open(bubble_output) as fp:
    asm = json.load(fp)

prompts = [
    "This is a function related to bubble sort ",
    "This is a function related to selection sort",
    "This is a function related to insertion sort",
    "This is a function related to merge sort",
    "This is a function related to quick sort",
    "This is a function related to radix sort",
    "This is a function related to shell sort",
    "This is a function related to counting sort",
    "This is a function related to bucket sort",
    "This is a function related to heap sort",
]

with torch.no_grad():
    asm_input = asm_tokenizer([asm], padding=True, pad_to_multiple_of=8, return_tensors="pt", verbose=False)
    asm_input = asm_input.to(device)
    asm_embedding = asm_encoder(**asm_input)

    text_input = text_tokenizer(prompts, padding=True, truncation=True, return_tensors='pt')
    text_input = text_input.to(device)
    text_embeddings = text_encoder(**text_input)

# logits[n, k] is the dot product of assembly embedding n and prompt embedding k.
logits = torch.einsum("nc,ck->nk", asm_embedding, text_embeddings.T)
# The former `_, preds = torch.max(...)` line was dropped: its result was
# overwritten immediately and assigning `_` shadows IPython's last-output variable.
# `preds` holds one probability per prompt for the single assembly input.
preds = torch.softmax(logits / TEMPERATURE, dim=1).squeeze(0).tolist()

print("bubblesort zeroshot:")
for prompt, prob in zip(prompts, preds):
    print(f"Probability: {round(prob*100, 3)}%, Text: {prompt}")


input_ids: torch.Size([1, 72])
attention_mask: torch.Size([1, 72])
token_type_ids: torch.Size([1, 72])
asm_embedding shape: torch.Size([1, 768])
bubblesort zeroshot:
Probability: 18.425%, Text: This is a function related to bubble sort 
Probability: 6.845%, Text: This is a function related to selection sort
Probability: 11.032%, Text: This is a function related to insertion sort
Probability: 5.169%, Text: This is a function related to merge sort
Probability: 9.403%, Text: This is a function related to quick sort
Probability: 13.112%, Text: This is a function related to radix sort
Probability: 12.292%, Text: This is a function related to shell sort
Probability: 10.073%, Text: This is a function related to counting sort
Probability: 9.4%, Text: This is a function related to bucket sort
Probability: 4.249%, Text: This is a function related to heap sort


# Fine-grained malware functionality classification (Zero-Shot)

In [10]:
# Zero-shot classification: score the assembly loaded from `malware_output`
# against ten natural-language malware-functionality prompts.
TEMPERATURE = 0.07  # softmax temperature dividing the similarity logits

with open(malware_output) as fp:
    asm = json.load(fp)

prompts = [
    "This is a function related to screen shot",
    "This is a function related to auto start",
    "This is a function related to backdoor",
    "This is a function related to download",
    "This is a function related to upload",
    "This is a function related to rootkit",
    "This is a function related to anti detect",
    "This is a function related to anti debug",
    "This is a function related to passwords brute force",
    "This is a function related to file hijack",
]

with torch.no_grad():
    asm_input = asm_tokenizer([asm], padding=True, pad_to_multiple_of=8, return_tensors="pt", verbose=False)
    asm_input = asm_input.to(device)
    # Print the shape of every tokenizer output tensor.
    for key, value in asm_input.items():
        print(f"{key}: {value.shape}")
    asm_embedding = asm_encoder(**asm_input)
    print(f"asm_embedding shape: {asm_embedding.shape}")

    text_input = text_tokenizer(prompts, padding=True, truncation=True, return_tensors='pt')
    text_input = text_input.to(device)
    text_embeddings = text_encoder(**text_input)

# logits[n, k] is the dot product of assembly embedding n and prompt embedding k.
logits = torch.einsum("nc,ck->nk", asm_embedding, text_embeddings.T)
# The former `_, preds = torch.max(...)` line was dropped: its result was
# overwritten immediately and assigning `_` shadows IPython's last-output variable.
# `preds` holds one probability per prompt for the single assembly input.
preds = torch.softmax(logits / TEMPERATURE, dim=1).squeeze(0).tolist()

print("malware zeroshot:")
for prompt, prob in zip(prompts, preds):
    print(f"Probability: {round(prob*100, 3)}%, Text: {prompt}")


input_ids: torch.Size([1, 1024])
attention_mask: torch.Size([1, 1024])
token_type_ids: torch.Size([1, 1024])
asm_embedding shape: torch.Size([1, 768])
malware zeroshot:
Probability: 78.529%, Text: This is a function related to screen shot
Probability: 6.667%, Text: This is a function related to auto start
Probability: 1.386%, Text: This is a function related to backdoor
Probability: 1.536%, Text: This is a function related to download
Probability: 2.315%, Text: This is a function related to upload
Probability: 2.946%, Text: This is a function related to rootkit
Probability: 1.304%, Text: This is a function related to anti detect
Probability: 2.994%, Text: This is a function related to anti debug
Probability: 0.829%, Text: This is a function related to passwords brute force
Probability: 1.495%, Text: This is a function related to file hijack


# Fine-grained crypto algorithm classification (Zero-Shot)

In [4]:
# Zero-shot classification: score the assembly loaded from `sha3` against six
# natural-language prompts (five crypto algorithms plus bubble sort).
TEMPERATURE = 0.07  # softmax temperature dividing the similarity logits

with open(sha3) as fp:
    asm = json.load(fp)

prompts = [
    "This is a function related to sha3",
    "This is a function related to des",
    "This is a function related to bubble sort",
    "This is a function related to md5",
    "This is a function related to rsa",
    "This is a function related to sm4"
]

with torch.no_grad():
    asm_input = asm_tokenizer([asm], padding=True, pad_to_multiple_of=8, return_tensors="pt", verbose=False)
    asm_input = asm_input.to(device)
    asm_embedding = asm_encoder(**asm_input)

    text_input = text_tokenizer(prompts, padding=True, truncation=True, return_tensors='pt')
    text_input = text_input.to(device)
    text_embeddings = text_encoder(**text_input)

# logits[n, k] is the dot product of assembly embedding n and prompt embedding k.
logits = torch.einsum("nc,ck->nk", asm_embedding, text_embeddings.T)
# The former `_, preds = torch.max(...)` line was dropped: its result was
# overwritten immediately and assigning `_` shadows IPython's last-output variable.
# `preds` holds one probability per prompt for the single assembly input.
preds = torch.softmax(logits / TEMPERATURE, dim=1).squeeze(0).tolist()

print("sha3 zeroshot:")
for prompt, prob in zip(prompts, preds):
    print(f"Probability: {round(prob*100, 3)}%, Text: {prompt}")

sha3 zeroshot:
Probability: 62.579%, Text: This is a function related to sha3
Probability: 1.63%, Text: This is a function related to des
Probability: 3.479%, Text: This is a function related to bubble sort
Probability: 24.634%, Text: This is a function related to md5
Probability: 5.705%, Text: This is a function related to rsa
Probability: 1.974%, Text: This is a function related to sm4
