In [1]:
from langchain.text_splitter import Language
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers import LanguageParser
import os

from langchain.vectorstores import FAISS
from langchain.document_loaders import PyPDFLoader
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.embeddings import HuggingFaceInstructEmbeddings
from langchain.chains import RetrievalQA
from langchain.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQAWithSourcesChain
from huggingface_hub import notebook_login
#from transformers import pipeline
#from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain import HuggingFacePipeline
from langchain.text_splitter import CharacterTextSplitter
import textwrap
import sys
import torch

from langchain.embeddings import OpenAIEmbeddings

In [2]:
# Copy the key stored in the local APIKEY module into the OPENAI_API_KEY
# environment variable (presumably what OpenAIEmbeddings() reads later — TODO confirm).
import APIKEY
os.environ["OPENAI_API_KEY"] = APIKEY.API_KEY_SERVICE_OPENAI

## Load multiple `code` files with the code parser

In [89]:

# Recursively pick up the .h/.c sources below the BSP folder and load them
# through LangChain's LanguageParser configured for C.
loader = GenericLoader.from_filesystem(
    "./docs/nuc100bsp_StdDriver_regs",
    suffixes=[".h", ".c"],
    glob="**/*",
    show_progress=True,
    parser=LanguageParser(parser_threshold=10000, language=Language.C),
)
docs = loader.load()
len(docs)

  0%|          | 0/43 [00:00<?, ?it/s]

43

In [90]:
# Print the metadata dict of every loaded document.
for document in docs:
    print(document.metadata)
# Optional: dump all page contents joined by a visible separator.
#print("\n\n--8<--\n\n".join([document.page_content for document in docs]))  

{'source': 'docs\\nuc100bsp_StdDriver_regs\\NUC100Series\\Include\\NUC100Series.h'}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\NUC100Series\\Include\\system_NUC100Series.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\acmp.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\adc.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\clk.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\crc.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\fmc.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\gpio.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\i2c.h', 'language': <Language.C: 'c'>}
{'source': 'docs\\nuc100bsp_StdDriver_regs\\StdDriver\\inc\\i2s.h', 'language': <Language.C: 'c'>}


In [91]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Build a splitter that uses the C-specific separators, then chunk the loaded
# sources with chunk_size=2000 and chunk_overlap=200.
C_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.C,
    chunk_size=2000,
    chunk_overlap=200,
)
texts = C_splitter.split_documents(docs)
len(texts)

879

In [92]:
# Show the first chunk twice: its raw text, then the full Document repr.
print(texts[0].page_content)
print(texts[0])

/**************************************************************************//**
 * @file     NUC100Series.h
 * @version  V3.0
 * $Revision: 33 $
 * $Date: 17/05/26 10:54a $
 * @brief    NUC100 Series Peripheral Access Layer Header File
 *
 * @note
 * SPDX-License-Identifier: Apache-2.0
 *
 * Copyright (C) 2014 Nuvoton Technology Corp. All rights reserved.
 *
 ******************************************************************************/



/**
  \mainpage Introduction
  *
  *
  * This user manual describes the usage of NUC100 Series MCU device driver
  *
  * <b>Disclaimer</b>
  *
  * The Software is furnished "AS IS", without warranty as to performance or results, and
  * the entire risk as to performance or results is assumed by YOU. Nuvoton disclaims all
  * warranties, express, implied or otherwise, with regard to the Software, its use, or
  * operation, including without limitation any and all warranties of merchantability, fitness
  * for a particular purpose, and non-infringemen

## Load a single PDF file

In [52]:
# PyPdf + rapidocr-onnxruntime
# (extract_images=True presumably relies on rapidocr-onnxruntime to OCR the
# images embedded in the PDF — TODO confirm)
from langchain_community.document_loaders import PyPDFLoader
loader = PyPDFLoader("./docs/TRM_NUC100_120(DN)_Series_EN_V1.04.pdf", extract_images=True)

# UnstructuredFileLoader (alternative loader, disabled)
#loader = UnstructuredFileLoader("./docs/en-us--TRM_M463_M467_Series_EN_Rev1.01.pdf")

# Note: this rebinds `loader`, which the source-code cell above also assigns.
documents = loader.load()



In [53]:
# Split the loaded PDF documents on newlines with chunk_size=3000 and
# chunk_overlap=200.
text_splitter = CharacterTextSplitter(
    separator='\n',
    chunk_size=3000,
    chunk_overlap=200,
)
text_doc = text_splitter.split_documents(documents)

## Clip the Table of contents

In [59]:
#print(text_doc[0].page_content)
# Look at chunk 22 to see whether the front-matter listings have ended yet.
print(text_doc[22]) # Check the content

page_content='Table 6 -19 LIN Header Selection in Master mode  ................................ ................................ ....... 373 \nTable 6 -20 UART Interrupt Sources and Flags Table In DMA Mode  ................................ ..........  400 \nTable 6 -21 UART Interrupt Sources and Flags Table In Software Mode  ................................ .... 400 \nTable 6 -22 Timer2/Timer1/Timer0 Operation Mode  ................................ ................................ .... 421 \nTable 6 -23 I2C Status Code Description Table  ................................ ................................ ............  470 \n \n  DUVOTOn' metadata={'source': './docs/TRM_NUC100_120(DN)_Series_EN_V1.04.pdf', 'page': 12}


In [60]:
# Drop the first 23 chunks (indices 0-22). Chunk 22, printed in the cell above,
# is still a list-of-tables page; this assumes every chunk before it is front
# matter as well — TODO confirm if the PDF or the splitter settings change.
print("Before: {}".format(len(text_doc)))
text_doc_1 = text_doc[23:]
print("After: {}".format(len(text_doc_1)))

Before: 624
After: 601


## Use tabula to load the PDF tables

In [62]:
import tabula
from tabula import read_pdf
from tabulate import tabulate

# Same TRM PDF that the PyPDFLoader cell reads.
DOC_FILE_NAME = r"./docs/TRM_NUC100_120(DN)_Series_EN_V1.04.pdf"
# convert PDF into CSV
#tabula.convert_into("en-us--DS_NAU8822A_DataSheet_EN_Rev3.5.pdf", "NAU8822A_table.json", output_format="json", pages='3')
# Extract tables from pages 13-592; the result is a list whose items are
# serialized with .to_csv() in the next cell.
# stream=True presumably selects tabula's stream extraction mode — TODO confirm.
dfs = read_pdf(DOC_FILE_NAME, pages='13-592', stream=True)
print(len(dfs))
 


940


In [63]:
# Render every extracted table as CSV text (path_or_buf=None makes to_csv
# return the string instead of writing a file); the row index is omitted.
str_list = [df.to_csv(path_or_buf=None, index=False) for df in dfs]


In [65]:
# Sanity-check the CSV text of the first extracted table.
print(str_list[0])

Unnamed: 0,Unnamed: 1,Unnamed: 2,2,Unnamed: 3,Unnamed: 4,2.1,Unnamed: 5
Product Line,UART,SPI,I C,USB,PS/2,I S,SC
NUC100xxxDN,3,4,2,-,1,1,3
NUC120xxxDN,3,4,2,1,1,1,3



In [66]:
import tqdm
import time
from langchain_core.documents import Document

save_path = r'D:\nu_QA_data'
# Earlier index names, kept for reference:
#save_name = r'TRM_M2354_and_StdDriver_C_Regs_bge_s'
#save_name = r'm460_StdDriver_Regs_openai'
save_name = r'TRM_NUC100_NUC200_table_0308'

# Wrap each CSV-rendered table in a Document; every one carries the index
# name as its 'source' metadata.
table_docs = [
    Document(page_content=csv_text, metadata={'source': save_name})
    for csv_text in str_list
]

# Embedding model
print("===== Load the embedding model =====")
embedding = OpenAIEmbeddings()

# Build the vector store from the table documents in a single call
print("===== Build FAISS =====")
vectorstore = FAISS.from_documents(table_docs, embedding)
print("1 doc done")

# Persist the index under <save_path>/<save_name>
index_path = os.path.join(save_path, save_name)
print(index_path)
vectorstore.save_local(index_path)

===== Load the embedding model =====
===== Build FAISS =====
1 doc done
D:\nu_QA_data\TRM_NUC100_NUC200_table_0308


## Calculate the tokens

In [15]:
import tiktoken

def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Count the tokens produced for ``string`` by the named tiktoken encoding.

    Args:
        string: Text to tokenize.
        encoding_name: Name passed to ``tiktoken.get_encoding``.

    Returns:
        Length of the encoded token sequence.
    """
    return len(tiktoken.get_encoding(encoding_name).encode(string))

# Total token count of the clipped PDF chunks.
# Each chunk's own text (page_content) is tokenized; counting a fixed sample
# string instead would only yield a constant multiple of len(text_doc_1).
# Requires text_doc_1 from the "Clip the Table of contents" cell to be defined.
num_tokens = 0
for text_ele in text_doc_1:
    num_tokens += num_tokens_from_string(text_ele.page_content, "cl100k_base")
print(num_tokens)

NameError: name 'text_doc_1' is not defined

## Load the second doc 

In [16]:
# Load the SVD text export and split it on newlines.
loader2 = UnstructuredFileLoader("./docs/M031AE_v1_svd.txt")
documents2 = loader2.load()

# Previously tried: chunk_size=2000 with the same separator and overlap.
text_splitter = CharacterTextSplitter(
    separator='\n',  # custom separator
    chunk_size=3000,
    chunk_overlap=200,
)

text_doc2 = text_splitter.split_documents(documents2)
len(text_doc2)

893

In [17]:
# Print the third chunk of the SVD text to check how the split looks.
print(text_doc2[2]) # Check the content

page_content="<name>1</name>\n<description>Clock cycles delay Enabled</description>\n<value>#1</value>\n</enumeratedValue>\n</enumeratedValues>\n<access>read-write</access>\n</field>\n<field>\n<name>PDWKIEN</name>\n<description>Power-down Mode Wake-up Interrupt Enable Bit (Write Protect)\\nNote 1: The interrupt will occur when both PDWKIF and PDWKIEN are high.\\nNote 2: This bit is write protected. Refer to the SYS_REGLCTL register.</description>\n<bitOffset>5</bitOffset>\n<bitWidth>1</bitWidth>\n<enumeratedValues>\n<enumeratedValue>\n<name>0</name>\n<description>Power-down mode wake-up interrupt Disabled</description>\n<value>#0</value>\n</enumeratedValue>\n<enumeratedValue>\n<name>1</name>\n<description>Power-down mode wake-up interrupt Enabled</description>\n<value>#1</value>\n</enumeratedValue>\n</enumeratedValues>\n<access>read-write</access>\n</field>\n<field>\n<name>PDWKIF</name>\n<description>Power-down Mode Wake-up Interrupt Status\\nSet by 'Power-down wake-up event', it indic

### Combine 2 different types

In [18]:
# Concatenate the clipped PDF chunks with the SVD chunks.
# Note: this rebinds `texts`, which the C-source splitter cell also assigns.
texts = text_doc_1 + text_doc2

In [19]:
# Number of chunks in the combined list.
len(texts)

14082

In [17]:
# Spot-check one chunk from the combined list: raw text first, then the
# Document repr. The index 7547 is an arbitrary sample position.
print(texts[7547].page_content)
print(texts[7547])

NuMicro ISP Flow And Command Set
Introduction:
Most of modern consumer products has the capability to upgrade its firmware code running on internal microcontroller. With this feature, the product is able to continuously support new functions after it is made and released to end customer. Nuvoton provides a ISP (In-System Programming) method to update the flash code of the NuMicro series Flash-memory-based microcontrollers.
The NuMicro ISP code is resident in LDROM (Loader ROM), it supports different I/O interfaces, including USB, UART, I2C, SPI, RS485, CAN to program or update the application code into internal APROM (Application ROM). This is a very convenient way for developer or end user to update application code of a NuMicro chip that was mounted on PCB (Printed Circuit Board).
This document describes the ISP code flow and the usage of ISP commands.
tools, and supporting http://www.nuvoton.com/
The relative datasheet, Technical Reference Manual, BSP (Board Support Package) sof

## Save the texts into a FAISS index on disk

In [18]:
import tqdm
import time

# Output folder and index name; the commented names are earlier/alternative indexes.
save_path = r'D:\nu_QA_data'
#save_name = r'TRM_M2354_and_StdDriver_C_Regs_bge_s'
save_name = r'TRM_M031_M032'
#save_name = r'TRM_NUC100_NUC200_pypdf_0308'

# load embedding model
print("===== Load the embedding model =====")
# Local HuggingFace alternatives (disabled); OpenAI embeddings are used instead.
#embedding = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2',model_kwargs={'device': 'cpu'})
#embedding = HuggingFaceInstructEmbeddings(model_name='BAAI/bge-base-en-v1.5', model_kwargs={'device': 'cpu'})
#embedding = HuggingFaceInstructEmbeddings(model_name='BAAI/bge-small-en')
embedding=OpenAIEmbeddings()

# Create vectors store
print("===== Build FAISS =====")

# old way: embed everything in a single from_documents call
# NOTE(review): only text_doc2 is indexed here, not the combined `texts`
# list — confirm this is intended.
vectorstore=FAISS.from_documents(text_doc2, embedding)
print("1 doc done")
#vectorstore.add_documents(text_doc_1)
#print("2 doc done")

# new way (disabled): embed in batches and pause between them to stay under
# rate limits
#def chunks(lst, n):
#  """Yield successive n-sized chunks from lst."""
#  for i in range(0, len(lst), n):
#    yield lst[i:i + n]
#
#text_chunks = chunks(texts, 1000) # adjust the batch size (1000) to the average chunk length
#vectorstore = None
#for (index, chunk) in tqdm.tqdm(enumerate(text_chunks)):
#  if index == 0:
#    vectorstore = FAISS.from_documents(chunk, embedding)
#  else:
#    time.sleep(60) # wait for a minute to not exceed any rate limits
#    vectorstore.add_documents(chunk)


# Persist the index under <save_path>/<save_name>.
index_path = os.path.join(save_path, save_name)
print(index_path)
vectorstore.save_local(index_path)

===== Load the embedding model =====
===== Build FAISS =====
1 doc done
D:\nu_QA_data\TRM_M031_M032


## Create the vector DB from only the chosen files

In [22]:
import re
import shutil



def find_files_with_pattern(directory, patterns, dst):
    """Copy every file under ``directory`` whose name matches a pattern into ``dst``.

    The tree is walked recursively with ``os.walk``. A file is selected when
    ``re.search`` finds any of ``patterns`` in its base name (case-insensitive).
    Selected files are copied flat into ``dst`` with ``shutil.copy``, so files
    that share a base name but live in different sub-folders overwrite each
    other there.

    Args:
        directory: Root folder to search.
        patterns: Re-iterable collection of regular-expression strings tested
            against each file name.
        dst: Destination folder; created on the first match if it is missing.

    Returns:
        List of source paths of the copied files, each listed once.
    """
    matching_path = []

    for root, _dirs, files in os.walk(directory):
        for file in files:
            # any() stops at the first hit, so a name matching several
            # patterns (e.g. "uart_pdma.c" with ["uart", "pdma"]) is recorded
            # and copied once rather than once per matching pattern.
            if not any(re.search(pattern, file, re.IGNORECASE) for pattern in patterns):
                continue

            file_path = os.path.join(root, file)
            # If dst did not exist, shutil.copy would write one plain file
            # named `dst` and overwrite it on every later match.
            os.makedirs(dst, exist_ok=True)
            matching_path.append(file_path)
            shutil.copy(file_path, dst)

    return matching_path

def find_folders_with_pattern(directory, patterns, dst):
    """Copy each immediate sub-folder of ``directory`` whose name matches a pattern.

    Only the direct children of ``directory`` are examined (no recursion). A
    folder is selected when ``re.search`` finds any of ``patterns`` in its name
    (case-insensitive) and is then copied, with its contents, to
    ``dst/<folder name>`` via ``shutil.copytree(..., dirs_exist_ok=True)``.

    Args:
        directory: Folder whose direct sub-folders are examined.
        patterns: Re-iterable collection of regular-expression strings tested
            against each folder name.
        dst: Destination folder that receives the copied sub-folders.

    Returns:
        List of source paths of the copied folders, each listed once.
    """
    matching_path = []

    for dirname in os.listdir(directory):
        dir_loc = os.path.join(directory, dirname)
        # os.listdir also returns plain files; copytree would raise on them.
        if not os.path.isdir(dir_loc):
            continue

        # any() stops at the first hit, so a folder such as "UART_PDMA" that
        # matches several patterns is recorded and copied only once.
        if any(re.search(pattern, dirname, re.IGNORECASE) for pattern in patterns):
            matching_path.append(dir_loc)
            shutil.copytree(dir_loc, os.path.join(dst, dirname), dirs_exist_ok=True)

    return matching_path

# Example usage:
directory_path = "./m251bsp"
# Alternative selection: [r"acmp", r"dac"]
file_pattern = [r"uart", r"pdma", r"crc"]
dst = r"D:\nu_QA_data\m251bsp_partial_source"

# Always start from an empty destination folder.
if not os.path.isdir(dst):
    print("First create!")
else:
    print("Remove the old ones!")
    shutil.rmtree(dst)
os.mkdir(dst)

# Driver sources: individual files under StdDriver
matching_files = find_files_with_pattern(
    os.path.join(directory_path, 'StdDriver'), file_pattern, dst
)

# Sample projects: whole folders under SampleCode\StdDriver
matching_files.extend(
    find_folders_with_pattern(
        os.path.join(directory_path, 'SampleCode', 'StdDriver'), file_pattern, dst
    )
)


print(matching_files)


Remove the old ones!
['./m251bsp\\StdDriver\\inc\\crc.h', './m251bsp\\StdDriver\\inc\\pdma.h', './m251bsp\\StdDriver\\inc\\scuart.h', './m251bsp\\StdDriver\\inc\\uart.h', './m251bsp\\StdDriver\\inc\\usci_uart.h', './m251bsp\\StdDriver\\src\\crc.c', './m251bsp\\StdDriver\\src\\pdma.c', './m251bsp\\StdDriver\\src\\scuart.c', './m251bsp\\StdDriver\\src\\uart.c', './m251bsp\\StdDriver\\src\\usci_uart.c', './m251bsp\\SampleCode\\StdDriver\\CRC_CCITT', './m251bsp\\SampleCode\\StdDriver\\CRC_CRC32', './m251bsp\\SampleCode\\StdDriver\\CRC_CRC8', './m251bsp\\SampleCode\\StdDriver\\DAC_PDMA_TimerTrigger', './m251bsp\\SampleCode\\StdDriver\\EADC_PDMA_BPWM_Trigger', './m251bsp\\SampleCode\\StdDriver\\EADC_PDMA_PWM_Trigger', './m251bsp\\SampleCode\\StdDriver\\FMC_CRC32', './m251bsp\\SampleCode\\StdDriver\\I2C_PDMA_TRX', './m251bsp\\SampleCode\\StdDriver\\PDMA_BasicMode', './m251bsp\\SampleCode\\StdDriver\\PDMA_ScatterGather', './m251bsp\\SampleCode\\StdDriver\\PDMA_ScatterGather_PingPongBuffer', '.

In [23]:

# Load the .h/.c files that were copied into `dst`, using LangChain's
# LanguageParser configured for C.
loader = GenericLoader.from_filesystem(
    dst,
    suffixes=[".h", ".c"],
    glob="**/*",
    show_progress=True,
    parser=LanguageParser(parser_threshold=10000, language=Language.C),
)
docs = loader.load()
len(docs)

  0%|          | 0/50 [00:00<?, ?it/s]

50

In [24]:
# Print the metadata dict of every document loaded from the partial-source folder.
for document in docs:
    print(document.metadata)

{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\crc.c', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\crc.h', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\pdma.c', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\pdma.h', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\scuart.c', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\scuart.h', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\uart.c', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\uart.h', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\usci_uart.c', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\usci_uart.h', 'language': <Language.C: 'c'>}
{'source': 'D:\\nu_QA_data\\m251bsp_partial_source\\CRC_CCITT\\main.

In [25]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Chunk the selected sources with the C-specific separators, this time with
# chunk_size=1000 and chunk_overlap=100.
C_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.C,
    chunk_size=1000,
    chunk_overlap=100,
)
texts = C_splitter.split_documents(docs)
len(texts)

935

In [26]:
save_path = r'D:\nu_QA_data'
save_name = r'm251bsp_partial'

# Embedding model: MiniLM sentence-transformer, run on the CPU
print("===== Load the embedding model =====")
embeddings = HuggingFaceEmbeddings(
    model_name='sentence-transformers/all-MiniLM-L6-v2',
    model_kwargs={'device': 'cpu'},
)

# Build the vector store from the C-source chunks
print("===== Build FAISS =====")
vectorstore = FAISS.from_documents(texts, embeddings)

# Persist the index under <save_path>/<save_name>
index_path = os.path.join(save_path, save_name)
print(index_path)
vectorstore.save_local(index_path)

===== Load the embedding model =====
===== Build FAISS =====
D:\nu_QA_data\m251bsp_partial


# LanceDB

In [16]:
import lancedb
from langchain.vectorstores import LanceDB

# LanceDB database folder and table name; the commented names are alternatives.
save_path = r'D:\nu_QA_data\lanceDB'
#save_name = r'TRM_M2354_and_StdDriver_C_Regs_bge_s'
#save_name = r'm460_StdDriver_Regs_openai'
save_name = r'TRM_m460_pypdf'

# load embedding model
print("===== Load the embedding model =====")
embedding=OpenAIEmbeddings()

# Create vectors store
print("===== Build LanceDB =====")

# Create (or overwrite) the table with a single "Hello World" row whose vector
# comes from the embedding model, then hand the table to LangChain.
db = lancedb.connect(save_path)
table = db.create_table(save_name, data=[
    {"vector": embedding.embed_query("Hello World"), "text": "Hello World", "id": "1"}
], mode="overwrite")
# NOTE(review): text_doc is the unclipped chunk list that the PDF cell of this
# notebook builds from the NUC100 TRM, while the table name mentions m460 —
# confirm which document was meant to be indexed here.
docsearch = LanceDB.from_documents(text_doc, embedding, connection=table)


===== Load the embedding model =====
===== Build LanceDB =====
