## build openai llm model

In [1]:
import dotenv
from langchain_openai import ChatOpenAI

dotenv.load_dotenv()

chat_model = ChatOpenAI(model="gpt-3.5-turbo")

## simple test

In [None]:
from langchain.schema.messages import HumanMessage, SystemMessage

messages = [
    SystemMessage(
        content="""你是一個資安專家，根據CNS16190消費者物聯網之網宇安全：基準要求事項，回答相關問題"""
    ),
    HumanMessage(content="詳述CNS16190的適用範圍"),
]

response = chat_model.invoke(messages)
response

## test with template

### create a template

In [2]:
from langchain.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    PromptTemplate,
    SystemMessagePromptTemplate,
)

review_system_template_str = """你是一個{context}，根據CNS16190消費者物聯網之網宇安全：基準要求事項，回答相關問題"""

review_system_prompt = SystemMessagePromptTemplate(
    prompt=PromptTemplate(
        input_variables=["context"], template=review_system_template_str
    )
)

review_human_prompt = HumanMessagePromptTemplate(
    prompt=PromptTemplate(input_variables=["question"], template="{question}")
)

messages = [review_system_prompt, review_human_prompt]
review_prompt_template = ChatPromptTemplate(
    input_variables=["context", "question"],
    messages=messages,
)
context = "資安專家"
question = "簡述CNS16190的適用範圍"

review_prompt_template.format_messages(context=context, question=question)


[SystemMessage(content='你是一個資安專家，根據CNS16190消費者物聯網之網宇安全：基準要求事項，回答相關問題'),
 HumanMessage(content='簡述CNS16190的適用範圍')]

### test inference with openai llm

In [None]:
review_chain = (review_prompt_template | chat_model).with_config(
    {"run_name": "comparison_openai_hf"}
)

In [None]:
context = "資安專家"
question = "CNS16190可以用在哪裡？"
result = review_chain.invoke({"context": context, "question": question})

In [None]:
print(f'result.content: {result.content}\n')

print('\nresponse_metadata\n')
for key, value in result.response_metadata.items():
    print(f'{key}: {value}')

print('\n\nusage_metadata\n')
for key, value in result.usage_metadata.items():
    print(f'{key}: {value}')

## retrieve docs

### (Method 1) MarkdownHeaderTextSplitter

In [None]:
import os

from langchain.text_splitter import (
    MarkdownHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)
from langchain_huggingface import HuggingFaceEmbeddings

with open("../docs/CNS16190-zh_TW_only_provision.md", "r", encoding="utf-8") as f:
    data = f.read()

headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]

md_splits = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on, strip_headers=True
).split_text(data)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=40,
    chunk_overlap=10,
    separators=[
        "\n\n",
        "\n",
        " ",
        "。",
        "，",
        "\u200b",  # Zero-width space
        "\uff0c",  # Fullwidth comma
        "\u3001",  # Ideographic comma
        "\uff0e",  # Fullwidth full stop
        "\u3002",  # Ideographic full stop
        "",
    ],
)
chunked_documents = text_splitter.split_documents(md_splits)
chunked_documents

### (Method 2) 

In [15]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredMarkdownLoader


loader = UnstructuredMarkdownLoader(
    "../docs/CNS16190-zh_TW_only_provision.md"
)  # method 2

# loader = UnstructuredMarkdownLoader(
#     "../docs/CNS16190-zh_TW_only_provision.md", mode="elements"
# )  # method 2

docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

chunked_documents = text_splitter.split_documents(docs)
print(f"length {len(chunked_documents)}")
chunked_documents


length 18


[Document(page_content='中華民國國家標準 CNS\n\n消費者物聯網之網宇安全：基準要求事項 Cybersecurity for consumer internet of things: Baseline requirements\n\nCNS16190:2023\n\n4. 報告實作\n\n本標準中控制措施之實作，係藉由風險評鑑及威脅建模(諸如CNS27005[27]及STRIDE威脅模型[28])所告知；此係由裝置製造者及/或其他相關個體所執行，但非屬本標準範圍。對某些使用案例及下列風險評鑑，可適切應用額外控制措施及本標準內之控制措施。\n\n本標準設定安全基準；然而，由於消費者IoT之廣闊前景，控制措施的適用性取決於各裝置。本標準透過使用非必備宜使用控制措施(建議)，提供一定程度之彈性。\n\n控制措施4-1：對本標準中視為不適用或消費者IoT裝置未滿足之各項建議，應記錄衡量理由。\n\n表B.1提供以結構化方式記錄此等衡量理由之綱要。此係容許其他利害相關者(例：保證評鑑者、供應鏈成員、安全研究者或零售商)判定是否正確且適切適用控制措施。\n\n例1.製造者於其網站上與產品說明一起發布表B.1之完整版本。\n\n例2.製造者填寫表B.1以保存內部紀錄。一段時間後，外部保證組織依本標準評鑑產品，並請求提供與產品安全設計相關之資訊。製造者可易於提供此等資訊，因其包含於表B.1中。\n\n消費者IoT裝置不適用或未執行控制措施之情況包括：\n\n當裝置係受限制裝置時，某些安全措施之實作，對已識別風險(安全或隱私)係不可能或不適切。\n\n未納入控制措施中所描述之功能性(例：僅提供資料而未要求鑑別的裝置)。\n\n例3.電池壽命有限之窗戶感測器於觸發時，經由遠端相關聯服務發送警示，並經由集線器控制。因相較於其他消費者IoT裝置，其電池壽命及處理能力有限，故其為受限制裝置。此外，由於使用者經由集線器控制裝置，使用者無需使用通行碼或其他鑑別機制直接鑑別裝置。\n\n5. 消費者IoT裝置之網宇安全控制措施\n\n5.1 禁止使用通用預設通行碼\n\n控制措施5.1-1：使用通行碼且處於原廠預設值外之任何狀態時，所有消費者IoT裝置通行碼應為每裝置唯一或由使用者所定義。', metadata={'source': '../docs/CNS16190-

### embed via openai

In [16]:
from langchain_community.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings

reviews_vector_db = Qdrant.from_documents(
    chunked_documents,
    OpenAIEmbeddings(),
    location=":memory:",
)

### test search results of vector database 

In [24]:
# ## 控制措施4-1：對本標準中視為不適用或消費者IoT裝置未滿足之各項建議，應記錄衡量理由。

# 表B.1提供以結構化方式記錄此等衡量理由之綱要。此係容許其他利害相關者(例：保證評鑑者、供應鏈成員、安全研究者或零售商)判定是否正確且適切適用控制措施。

# 例1.製造者於其網站上與產品說明一起發布表B.1之完整版本。

# 例2.製造者填寫表B.1以保存內部紀錄。一段時間後，外部保證組織依本標準評鑑產品，並請求提供與產品安全設計相關之資訊。製造者可易於提供此等資訊，因其包含於表B.1中。

# 消費者IoT裝置不適用或未執行控制措施之情況包括：

# - 當裝置係受限制裝置時，某些安全措施之實作，對已識別風險(安全或隱私)係不可能或不適切。
# - 未納入控制措施中所描述之功能性(例：僅提供資料而未要求鑑別的裝置)。

# 例3.電池壽命有限之窗戶感測器於觸發時，經由遠端相關聯服務發送警示，並經由集線器控制。因相較於其他消費者IoT裝置，其電池壽命及處理能力有限，故其為受限制裝置。此外，由於使用者經由集線器控制裝置，使用者無需使用通行碼或其他鑑別機制直接鑑別裝置。

question = """「對本標準中視為不適用或消費者IoT裝置未滿足之各項建議，應記錄衡量理由」符合哪一項控制措施？"""

print('similarity\n')
relevant_docs = reviews_vector_db.search(question, search_type='similarity',k=5)

for item in relevant_docs:
    print(f'{item}\n')

print('\n\nsimilarity_score_threshold\n')
relevant_docs = reviews_vector_db.search(question, search_type='similarity_score_threshold',k=5, score_threshold=0.7)

for item in relevant_docs:
    print(f'{item}\n')

print('\n\nmmr\n')
relevant_docs = reviews_vector_db.search(question, search_type='mmr',k=5)

for item in relevant_docs:
    print(f'{item}\n')

similarity

page_content='例：外部感測能力可為光學或聲學感測器。本標準第6節包含特定於保護個人資料之控制措施。\n\n5.9 使系統對中斷具韌性\n\n本節之各控制措施旨在確保隨著消費者生活之各層面(包括與人身設備安全相關的功能)採用IoT裝置日增，IoT服務保持正常運行。重要的是需注意能適用安全相關之法規，但關鍵為避免使中斷成為對使用者產生影響的原因，並設計能於對此等挑戰有一定程度韌性之產品及服務。\n\n控制措施5.9-1：宜於消費者IoT裝置與服務中建立韌性，考量資料網路與電源中斷之可能性。\n\n控制措施5.9-2：消費者IoT裝置於失去網路接取權之情況下，宜保持運作及本地功能性，且宜於斷電復原的情況下澈底復原。\n\n備考：〝澈底復原〞通常涉及以相同或改善之狀態復原連接性及功能性。\n\n控制措施5.9-3：消費者IoT裝置宜以預期可操作且穩定之狀態，並以有秩序的方式連接至網路，同時考量基礎設施之能力。\n\n例 1.智慧家庭於停電後失去對網際網路之連接。當網路連接回復時，家中之裝置於隨機延遲後重新連接，以最小化網路利用率。\n\n例 2.於提供更新後，製造者分批通知裝置，以防止其同時下載更新。\n\n消費者依賴IoT系統及裝置處理日趨重要之使用案例，此等使用案例可能與人身設備安全相關或衝擊生命。若網路中斷，則保持服務於本地運行係提高韌性的措施之一。其他措施可包括於相關聯服務中建構備援，以及減緩分散式阻絕服務(DDoS)攻擊或信令風暴，其可能係由中斷後裝置之大規模重新連接所引起。預期必要之韌性水準係成正比，且由使用情況所判定，以及考量依賴系統、服務或裝置之其他者，因中斷可能產生較預期更廣的影響。\n\n有秩序重新連接意指採取明確步驟之方式，以避免源自大量IoT裝置的同時請求，諸如軟體更新或重新連接。此種明確步驟可包括依遞增後退機制，於重新連接嘗試前引入隨機延遲。\n\n5.10 檢查系統遙測資料\n\n控制措施5.10-1：若蒐集由消費者IoT裝置及服務蒐集遙測資料(諸如使用及量測資料)，則宜檢查安全是否異常。\n\n例 1.安全異常可藉由裝置正常行為的偏差表示之，依監視指示符所表示者，例：不成功登入嘗試的異常增加。\n\n例 2.源自多個裝置之遙測，容許製造者注意由於無效的軟體更新真確性核對而導致更新不成功。\n\n對安全評估，

### embed via sentence-transformer

In [None]:
sentence_transformer_model_root = "../sentence_transformer_model"
sentence_transformer_model = "multi-qa-mpnet-base-dot-v1"

reviews_vector_db = Qdrant.from_documents(
    chunked_documents,
    HuggingFaceEmbeddings(
        model_name=os.path.join(
            sentence_transformer_model_root, sentence_transformer_model
        )
    ),
    location=":memory:",
)


In [None]:
question = """「對本標準中視為不適用或消費者IoT裝置未滿足之各項建議，應記錄衡量理由」是控制措施哪一條？"""

print('similarity\n')
relevant_docs = reviews_vector_db.search(question, search_type='similarity',k=5)

for item in relevant_docs:
    print(f'{item}\n')

print('\n\nsimilarity_score_threshold\n')
relevant_docs = reviews_vector_db.search(question, search_type='similarity_score_threshold',k=5, score_threshold=0.7)

for item in relevant_docs:
    print(f'{item}\n')

print('\n\nmnr\n')
relevant_docs = reviews_vector_db.search(question, search_type='mmr',k=5)

for item in relevant_docs:
    print(f'{item}\n')

## ask llm with rag

In [17]:
from langchain.schema.runnable import RunnablePassthrough

reviews_retriever = reviews_vector_db.as_retriever(k=5)

In [18]:
context = "資安專家"
question = "簡述CNS16190的適用範圍"

review_prompt_template.format_messages(context=context, question=question)

[SystemMessage(content='你是一個資安專家，根據CNS16190消費者物聯網之網宇安全：基準要求事項，回答相關問題'),
 HumanMessage(content='簡述CNS16190的適用範圍')]

### invoke openai llm model

In [19]:
review_chain = (
    {"context": reviews_retriever, "question": RunnablePassthrough()}
    | review_prompt_template
    | chat_model
).with_config({"run_name": "comparison_openai_hf"})

### invoke other hf model

In [None]:
import torch
from langchain_huggingface import HuggingFacePipeline
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TextStreamer,
    pipeline,
)

model_name = "MediaTek-Research/Breeze-7B-32k-Instruct-v1_0"

tokenizer = AutoTokenizer.from_pretrained(
    model_name, cache_dir="../llm_model/", trust_remote_code=True
)
streamer = TextStreamer(tokenizer=tokenizer, skip_prompt=True)

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    cache_dir="../llm_model/",
    device_map="auto",
    low_cpu_mem_usage=True,  # try to limit RAM
    quantization_config=BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    ),  # load model in low precision to save memory
    # attn_implementation="flash_attention_2",
)

pipe = pipeline(
    model=model,
    tokenizer=tokenizer,
    task="text-generation",
    return_full_text=True,
    max_new_tokens=2048,
    streamer=streamer,
)

hfPipeline = HuggingFacePipeline(pipeline=pipe)

In [None]:
review_chain = (
    {"context": reviews_retriever, "question": RunnablePassthrough()}
    | review_prompt_template
    | hfPipeline
).with_config({"run_name": "comparison_openai_hf"})


### ask

In [20]:
question = """「對本標準中視為不適用或消費者IoT裝置未滿足之各項建議，應記錄衡量理由」符合哪一項控制措施？"""

In [21]:
final_result = review_chain.invoke(question)

In [22]:
final_result

AIMessage(content='這是符合「控制措施5.8-2」的要求。根據這項控制措施，對於本標準中視為不適用或消費者IoT裝置未滿足的各項建議，應記錄衡量理由。這有助於確保對於不適用或未滿足標準的情況有清晰的記錄和理由。', response_metadata={'token_usage': {'completion_tokens': 141, 'prompt_tokens': 5037, 'total_tokens': 5178}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-9d4692a5-3d83-4aaa-9899-f19055304c14-0', usage_metadata={'input_tokens': 5037, 'output_tokens': 141, 'total_tokens': 5178})