In [None]:
!pip install bitsandbytes==0.45.0
!pip install langchain-community==0.3.13
!pip install beautifulsoup4==4.12.3 chromadb==0.5.23 gradio==5.9.1
!pip -qq install langchain==0.3.13
!pip install sentence-transformers==3.3.1
!pip install pymupdf==1.25.1
!pip fitz

In [None]:
from langchain.chains import LLMChain
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import HuggingFacePipeline
from transformers import LlamaForCausalLM, LlamaTokenizer, pipeline
import torch
from typing import List
import gradio as gr
import os
import pickle
from google.colab import userdata
from google.colab import drive
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain_core.prompts import PromptTemplate

# Read the Hugging Face access token from the Colab secrets and keep it, so it
# can be handed explicitly to the downloads of the gated Llama checkpoint
# (previously the value was fetched and thrown away).
hf_token = userdata.get('HF_TOKEN')
drive.mount('/content/drive')

from transformers import LlamaForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline

model_path = "meta-llama/Meta-Llama-3.1-8B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_path, token=hf_token)
# 8-bit quantization is requested through BitsAndBytesConfig; passing
# `load_in_8bit=True` directly to from_pretrained is deprecated.
model = LlamaForCausalLM.from_pretrained(
    model_path,
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    device_map="auto",
    token=hf_token,
)

# Greedy decoding (do_sample=False) with up to 4096 newly generated tokens.
llm_pipeline = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=4096, do_sample=False)
llm = HuggingFacePipeline(pipeline=llm_pipeline)


In [None]:
from langchain_community.embeddings import HuggingFaceEmbeddings
from google.colab import userdata
from google.colab import drive

# Name of the sentence-embedding checkpoint used for indexing and querying.
eng_embed = "BAAI/bge-large-en-v1.5"

# Embeddings are computed on the CPU and requested in normalized form.
embed_model = HuggingFaceEmbeddings(
    model_name=eng_embed,
    model_kwargs=dict(device="cpu"),
    encode_kwargs=dict(normalize_embeddings=True),
)

In [None]:
import os
from bs4 import BeautifulSoup as bs
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import PyMuPDFLoader
from langchain.retrievers import BM25Retriever, EnsembleRetriever
import chromadb
from chromadb.utils import embedding_functions
from chromadb.config import Settings

# Folder on the mounted Drive that holds the case PDFs.
localDir = "/content/drive/MyDrive/Colab Notebooks/RAG_Doc/"

# One row per case PDF: (file name, Reason, Keyword, Company, year).
_METADATA_FIELDS = ("Reason", "Keyword", "Company", "year")
_CASE_ROWS = [
    ("DOJ_Topkins_ENG.pdf", "Price Fixing", "Algorithmic Pricing", "Topkins", "2015"),
    ("DOJ_Realpage_ENG.pdf", "Price Fixing", "Algorithmic Pricing", "Realpage", "2024"),
    ("EU_Google_ENG.pdf", "Self Preferencing", "Search engine", "Google", "2017"),
    ("FTC_Amazon_ENG.pdf", "Self Preferencing", "Anti-discounting", "Amazon", "2023"),
    ("KFTC_KakaoMobility_ENG.pdf", "Self Preferencing", "Franchise", "Kakao Mobility", "2023"),
    ("KFTC_Naver_ENG.pdf", "Self Preferencing", "Search engine", "Naver", "2020"),
    ("KFTC_Coupang_ENG.pdf", "Self Preferencing", "Private Label Product / Private Brand Product", "Coupang", "2020"),
    ("KFTC_NexonKorea_ENG.pdf", "Probability Manipulation", "Loot box", "Nexon Korea", "2024"),
]

# Expand the compact rows into the {"file": ..., "metadata": {...}} records
# consumed by the loading loop.
pdf_files = [
    {"file": file_name, "metadata": dict(zip(_METADATA_FIELDS, values))}
    for file_name, *values in _CASE_ROWS
]


def read_pdf_with_metadata(file_path, metadata):
    """Load a PDF and tag every loaded document with the given case metadata.

    Args:
        file_path: Path of the PDF file handed to ``PyMuPDFLoader``.
        metadata: Mapping of case attributes. It replaces whatever metadata
            the loader attached to each document.

    Returns:
        The list of documents returned by the loader, each carrying its own
        copy of ``metadata``.
    """
    loader = PyMuPDFLoader(file_path)
    documents = loader.load()

    for doc in documents:
        # Copy per document: assigning the same dict object to every document
        # would alias them all (and the caller's dict), so a later change to
        # one document's metadata would silently leak into the others.
        doc.metadata = dict(metadata)
    return documents

import uuid

# Load every configured PDF and collect its documents in one list.
all_documents = []

for pdf in pdf_files:
    pdf_path = os.path.join(localDir, pdf["file"])
    docs = read_pdf_with_metadata(pdf_path, pdf["metadata"])
    all_documents.extend(docs)

# Split into chunks of at most 3000 characters with 200 characters of overlap.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=200)
splits = text_splitter.split_documents(all_documents)


DB_PATH = "/content/drive/MyDrive/Colab Notebooks/ChromaDB_ENG_3000"

# Deterministic ids (chunk position + chunk text) so that re-running this cell
# against the same PDFs writes the same ids again instead of appending a second
# copy of every chunk under fresh random ids. The position is part of the id
# because two chunks with identical text would otherwise collide.
# NOTE(review): entries written by earlier runs with random ids are not
# removed; delete DB_PATH once if the collection already contains duplicates.
split_ids = [
    str(uuid.uuid5(uuid.NAMESPACE_URL, f"{position}:{chunk.page_content}"))
    for position, chunk in enumerate(splits)
]

vectorstore = Chroma.from_documents(
    splits, embedding=embed_model, ids=split_ids, persist_directory=DB_PATH, collection_name="RAG_DB"
)

# No explicit vectorstore.persist() call: with the pinned chromadb (>= 0.4) the
# collection is persisted automatically and persist() is a deprecated no-op.

In [None]:
from langchain_community.vectorstores import Chroma
import chromadb
from chromadb.utils import embedding_functions
from chromadb.config import Settings
from langchain.retrievers import BM25Retriever, EnsembleRetriever

DB_PATH = "/content/drive/MyDrive/Colab Notebooks/ChromaDB_ENG_3000"

# Open the "RAG_DB" collection stored under DB_PATH, using the same embedding
# model for queries.
vectorstore = Chroma(
    collection_name="RAG_DB",
    embedding_function=embed_model,
    persist_directory=DB_PATH,
)

# Dense retriever that asks the store for 20 results per query.
chroma_retriever = vectorstore.as_retriever(search_kwargs=dict(k=20))

In [None]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

# Cross-encoder used to re-score the chunks returned by the dense retriever.
reranker = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-large")

# top_n=20 equals the retriever's k=20 configured for chroma_retriever.
compressor = CrossEncoderReranker(model=reranker, top_n=20)

# Retrieval pipeline: chroma_retriever results are passed through the reranker.
compression_retriever = ContextualCompressionRetriever(
    base_retriever=chroma_retriever,
    base_compressor=compressor,
)

In [None]:
# Prompt for the first stage: the model receives the retrieved case excerpts
# ({context}) and the source code under review ({code}) and must answer with a
# JSON report. Doubled braces ({{ }}) are literal braces escaped for the
# template syntax.
#
# The chat-control tokens use the same Llama-3 header layout as
# normal_template (the loaded model is a Llama 3.1 checkpoint). The previous
# ChatML-style tokens (<|im_start|>, <|im_sep|>, <|im_end|>) did not match that
# layout, and the prompt did not end with the
# "Answer:<|eot_id|><|start_header_id|>ASSISTANT<|end_header_id|>" marker that
# predict() splits on to separate the answer from the echoed prompt.
rag_template = """
<|begin_of_text|>
<|start_header_id|>SYSTEM<|end_header_id|>
You are a helpful assistant working with the competition authority. Your primary responsibility is to analyze source code based solely on the provided instructions and context. Ensure that your analysis is detailed, accurate, and directly linked to the context. Avoid generating assumptions or conclusions not explicitly supported by the provided context and instructions.
You only consider competition laws, not personal information laws, security vulnerabilities, or issues unrelated to competition.
Specifically, avoid analyzing or commenting on code related to security vulnerabilities or data protection mechanisms.
All analysis results must be output as a structured JSON report with detailed explanations and no additional text.

Important:
1. If the source code depends on external data to determine its legality, clearly state this limitation and set `"anti-competitive_likelihood"` to `"Low"`.
2. Fair randomization algorithms, such as those that equitably and randomly select items with weights, must not be flagged as anti-competitive. Explicitly state that no violations are detected in such cases.

<|eot_id|>
<|start_header_id|>USER<|end_header_id|>
The following contexts are part of cases where companies were sanctioned for using anti-competitive algorithms.
Context: {context}

Note: Use only the provided context for analysis. Do not infer or assume additional information beyond the context provided.

Question:
# analyze_report is a detailed analysis of the target source code.
input_code = {code}

def analyze_source_code(input_code: str, context: str) -> dict:

    # Step 1: Summarize the source code.
    summary = summarize_source_code(input_code)
    if not summary:
        summary = "The source code does not provide enough information to determine its functionality."

    # Step 2: Analyze the code to detect patterns potentially violating competition laws.
    suspicious_patterns = detect_patterns(input_code, context)

    # Step 3: Exclude certain cases:
    explanation = ""
    if requires_external_data(input_code):
        anti_competitive_likelihood = "Low"
        explanation = "The source code requires external data to determine its legality."
    elif is_fair_randomization(input_code):
        anti_competitive_likelihood = "Low"
        explanation = "The source code implements fair randomization methods, which are not anti-competitive."
    else:
        anti_competitive_likelihood = calculate_violation_probability(suspicious_patterns)

    # Step 4: Generate a structured JSON report with detailed explanations for all findings.
    result = {{
        "summary": summary,
        "detected_patterns": [
          {{
              "pattern": suspicious_patterns.get("code_snippet", "Not Found"),
              "similar_case": suspicious_patterns.get("similar_case", "Not Found"),
              "explanation": explanation
          }}
        ],
        "anti-competitive_likelihood": anti_competitive_likelihood
    }}

    return json.dumps(result, indent=4)

>>> analyze_source_code(input_code, context)
※ You must print the JSON only.
Answer:<|eot_id|><|start_header_id|>ASSISTANT<|end_header_id|>
"""

# Stage-1 chain: fill the template, then run the local LLM pipeline.
rag_prompt = PromptTemplate.from_template(rag_template)
rag_chain = rag_prompt | llm


# Prompt for the second (verification) stage. It takes two variables:
#   {report} - the first-stage analysis report
#   {code}   - the source code under review
# and asks the model to validate the detected patterns, reassess the
# likelihood of a violation and recommend additional data to collect, answering
# with JSON only. Doubled braces ({{ }}) are literal braces escaped for the
# template syntax. The Python-looking body is pseudo-code addressed to the
# model; it is never executed.
normal_template = """
<|begin_of_text|>
<|start_header_id|>SYSTEM<|end_header_id|>
You are a helpful assistant working with the competition authority. Your task is to verify the validity of an analysis report and provide recommendations for further investigation. Use the provided source code and the first analysis report to:
1. Identify any potential hallucinations in the analysis.
2. Assess the likelihood of legal violations.
3. Recommend additional data that should be collected for a more thorough investigation.

Avoid analyzing or commenting on issues related to security vulnerabilities or personal data protection. Focus solely on competition-related concerns.

All results must be output as a structured JSON report with detailed explanations and no additional text.
If the provided information is insufficient to make a judgment, explicitly state: "The provided context and source code do not provide enough evidence to determine a violation," and set "violation_possibility" to "Low" in the JSON response.
<|eot_id|>
<|start_header_id|>USER<|end_header_id|>

First_Analysis_Report =
{report}

input_code =
{code}

Instructions:
def verify_analysis(report: dict, code: str) -> dict:
    # Step 1: Verify the validity of the "detected_patterns" in the first analysis report.
    validation_result = []

    for pattern in report.get("detected_patterns", []):
        if is_supported_by_code(pattern, code):
            # Lower hallucination detection threshold by allowing partial or contextual matches.
            if verify_contextual_match(pattern["pattern"], code):
                validation_result.append({{
                    "pattern": pattern["pattern"],
                    "status": "Valid",
                    "similar_case": pattern.get("similar_case", "Not Provided")
                }})
            else:
                validation_result.append({{
                    "pattern": pattern["pattern"],
                    "status": "Invalid",
                    "similar_case": "Not Applicable"
                }})

    # Step 2: Assess the likelihood of legal violations based on valid patterns only.
    valid_patterns = [p for p in validation_result if p["status"] == "Valid"]
    if valid_patterns:
        anti-competitive_likelihood = reassess_violation(valid_patterns, report["anti-competitive_likelihood"])
        # Use only the labels "High", "Low" and provide a clear justification for the assigned label.
    else:
        anti-competitive_likelihood = "Low"  # Default to "Low" if no valid patterns are found.

    # Step 3: Recommend additional data to collect.
    recommendation_to_collect = recommend_additional_data(valid_patterns)
    # Suggest specific data that could provide more evidence for or against the detected patterns.

    # Step 4: Generate a structured JSON report with detailed explanations for all findings.
    result = {{
        "summary": generate_summary(report, code),
        # Provide a comprehensive but concise summary of the analyze_report’s functionality and its potential anti-competitive implications.

        "detected_patterns": validation_result,
        # Include only valid patterns and their validation status, along with the original similar_case information.

        "anti-competitive_likelihood": anti-competitive_likelihood,
        # "High" or "Low"

        "recommendation_to_collect": recommendation_to_collect
        # Provide actionable and specific recommendations for further data collection or steps for investigation. Avoid generic suggestions.
    }}

    return json.dumps(result, indent=4)

>>> verify_analysis(First_Analysis_Report, input_code)
※ You must print the JSON only.
Answer:<|eot_id|><|start_header_id|>ASSISTANT<|end_header_id|>
"""

# Stage-2 chain: fill the template, then run the same local LLM pipeline.
# The final "Answer:...ASSISTANT<|end_header_id|>" line of the template is the
# marker predict() splits on to separate the answer from the echoed prompt.
normal_prompt = PromptTemplate.from_template(normal_template)
normal_chain = normal_prompt | llm

In [None]:
def predict(input_code):

  docs = compression_retriever.invoke(input_code)
  print("================================================")
  response1 = rag_chain.invoke({"context": docs, "code": input_code})
  response1_result = response1.split("Answer:<|eot_id|><|start_header_id|>ASSISTANT<|end_header_id|>")[-1].strip()

  response2 = normal_chain.invoke({"report": response1_result ,"code": input_code})
  report = response2.split("Answer:<|eot_id|><|start_header_id|>ASSISTANT<|end_header_id|>")[-1].strip()
  print("================================================")

  return str(report)


# Set up Gradio interface
def gradio_interface(query):
    """Gradio callback: forward the submitted text to ``predict``."""
    report = predict(query)
    return report

# Text-in / text-out web UI around gradio_interface.
demo = gr.Interface(
    fn=gradio_interface,
    inputs="text",
    outputs="text",
    title="Anti-Competitive Algorithm Detector",
    description="Enter a query to check if the source code contains anti-competitive behavior.",
)

demo.launch()
