In [1]:
import os
import json
import base64
import logging
import pandas as pd
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from langchain_openai import OpenAIEmbeddings

from build_embeddings import build_embeddings, get_file_embeddings
from search import get_total_files, query_top_files, query_top_files_specter, get_common_files_with_avg_score, get_unique_files


  from tqdm.autonotebook import tqdm, trange


In [3]:
# Load environment variables (python-dotenv; must run before the os.getenv
# lookup below so a key supplied via a .env file is visible to it).
load_dotenv()
# Initialize embeddings and model.
# os.getenv returns None when OPENAI_API_KEY is not set, in which case None is
# what gets passed as the API key here.
embeddings = OpenAIEmbeddings(openai_api_key=os.getenv('OPENAI_API_KEY'))
# The sentence-transformers model is explicitly pinned to the CPU.
# NOTE(review): neither `embeddings` nor `model` is referenced by the other
# cells visible in this notebook — confirm they are still needed here.
model = SentenceTransformer('sentence-transformers/allenai-specter', device='cpu')


# Load OWASP data

In [3]:
# Load OWASP data
owasp_df = pd.read_csv('OWASP Controls - Application Security.csv')

# Keep only rows that have a description and whose description does not carry
# the '[DELETED' marker.
#  * The pattern is a raw string so that `\[` is a regex escape instead of an
#    invalid Python string escape (a SyntaxWarning on recent Python versions).
#  * na=False makes str.contains return False rather than NaN for missing
#    descriptions, so the `~` inversion always operates on booleans.
req_descriptions = owasp_df['req_description']
is_usable = req_descriptions.notna() & ~req_descriptions.str.contains(r'\[DELETED', na=False)

# .copy() detaches the filtered frame from the one read from disk, so the
# column assignment below does not write into a slice (SettingWithCopyWarning).
owasp_df = owasp_df[is_usable].copy()

# Remove every parenthesised span, together with the whitespace in front of it,
# from the requirement text.
owasp_df['req_description'] = owasp_df['req_description'].str.replace(r'\s*\([^)]*\)', '', regex=True)


In [4]:
# Decode JSON object
def decode_json_object(array):
    """Decode the base64 payload of every source-code entry in ``array``.

    ``array`` maps file names to mappings that hold a base64 string under the
    ``'content'`` key. Only entries whose name ends with one of the listed
    source extensions are decoded; all other entries are ignored.

    Returns:
        dict mapping each selected file name to its content decoded as UTF-8,
        in the iteration order of ``array``.
    """
    source_suffixes = ('.py', '.sh', '.java', '.php', '.js', '.htm', '.html', '.vue')
    return {
        name: base64.b64decode(entry['content']).decode("utf-8")
        for name, entry in array.items()
        if name.endswith(source_suffixes)
    }

In [5]:
# Main processing function
def background_code_matching(repo_files, repo_id):
    """Query a repository's files against every OWASP section and save the results.

    Builds embeddings for ``repo_files`` under ``repo_id``. Then, for each
    distinct ``section_name`` in the module-level ``owasp_df``, joins that
    section's requirement descriptions into one query string and runs it
    through both ``query_top_files`` and ``query_top_files_specter``. The two
    result sets are combined with ``get_common_files_with_avg_score`` and
    ``get_unique_files``.

    The per-section results are written to ``section_result_<repo_id>.json``
    in the current working directory and also returned.

    Args:
        repo_files: Passed unchanged to ``build_embeddings``.
        repo_id: Repository identifier; converted to ``str`` before use.

    Returns:
        dict mapping each section name to
        ``{'common_files': ..., 'only_one_model': ...}``.

    Raises:
        Exception: any failure is logged with its traceback and re-raised.
    """
    try:
        repo_id = str(repo_id)
        build_embeddings(repo_files, repo_id)

        # The query depth is the total file count for the repository, so it is
        # looked up once instead of once per section.
        # NOTE(review): assumes the query helpers do not change that count — confirm.
        depth = get_total_files(repo_id)

        section_result = {}
        for section in owasp_df['section_name'].unique():
            in_section = owasp_df['section_name'] == section
            # All requirement descriptions of the section form a single query.
            query = ' '.join(owasp_df.loc[in_section, 'req_description'])

            results_ada = query_top_files(query, depth, repo_id)
            results_specter = query_top_files_specter(query, depth, repo_id)

            section_result[section] = {
                'common_files': get_common_files_with_avg_score(results_ada, results_specter),
                'only_one_model': get_unique_files(results_ada, results_specter),
            }

        # Save the section_result dictionary to a .json file
        with open(f'section_result_{repo_id}.json', 'w') as json_file:
            json.dump(section_result, json_file, indent=4)

        print(f"Results saved to section_result_{repo_id}.json")
        return section_result
    except Exception as e:
        # logging.exception logs at ERROR level like before, but also records
        # the traceback, which the plain formatted message discarded.
        logging.exception("Task failed: %s", e)
        raise


In [6]:
# Process the JSON file
def process_json_file(file_path):
    """Decode the files stored in a JSON file and write them under ``decoded_files/``.

    The JSON document at ``file_path`` is loaded and handed to
    ``decode_json_object``; each decoded entry is then written to
    ``decoded_files/<file_name>``. Parent directories are created as needed.

    File names come from the JSON document, so an entry whose path would
    resolve outside ``decoded_files`` (for example via ``..`` or an absolute
    path) is not written; a warning is logged instead. Such entries are still
    part of the returned dictionary.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The dictionary returned by ``decode_json_object``, or ``None`` if any
        step fails (the error is logged with its traceback).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as json_file:
            repo_data = json.load(json_file)

        decoded_files = decode_json_object(repo_data)

        # Save decoded content to separate files
        output_dir = "decoded_files"
        os.makedirs(output_dir, exist_ok=True)
        output_root = os.path.abspath(output_dir)

        for file_name, content in decoded_files.items():
            output_path = os.path.join(output_dir, file_name)

            # Refuse to write anywhere other than below the output directory.
            if not os.path.abspath(output_path).startswith(output_root + os.sep):
                logging.warning(f"Skipping {file_name}: path is outside {output_dir}")
                continue

            # A file name may contain sub-directories; open() does not create them.
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

            # The content was decoded as UTF-8, so write it back as UTF-8 rather
            # than with the platform's default encoding.
            with open(output_path, 'w', encoding='utf-8') as output_file:
                output_file.write(content)
            logging.info(f"Decoded content saved to {output_path}")

        return decoded_files
    except Exception as e:
        # Same ERROR-level message as before, now with the traceback attached.
        logging.exception("Error processing JSON file: %s", e)
        return None

In [7]:
# JSON input file; a relative path, so it is resolved against the current
# working directory.
json_file_path = "output 2.json"  # note: the file name contains a space
# process_json_file returns the decoded files, or None when it logged an
# error, so decoded_content may be None after this line.
decoded_content = process_json_file(json_file_path)
