**Class Extraction**

In [None]:
import ast  # Module for parsing Python source into its Abstract Syntax Tree (AST)
import csv  # Module for reading and writing CSV files
import os   # Module for interacting with the operating system (directory traversal)

# Function to extract all class definitions from a single Python file
def extract_classes_from_file(file_path):
    """
    Collect every class definition found in one Python source file.

    The file is read once; the same text is used both to build the AST and to
    slice out each class's source code.

    Args:
        file_path: Path of the Python file to scan.

    Returns:
        A list of (class_name, class_code, file_path) tuples, one per
        `ast.ClassDef` node reached by `ast.walk` (nested classes included).
        An empty list is returned, after printing a notice, when the file
        cannot be decoded as UTF-8 or parsed as Python.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            file_content = f.read()
        tree = ast.parse(file_content, filename=file_path)
    except (SyntaxError, ValueError) as e:
        # ValueError also covers UnicodeDecodeError. One unreadable file should
        # not abort the scan of the whole project, so skip it visibly.
        print(f"Skipping {file_path}: {e}")
        return []

    return [
        (node.name, ast.get_source_segment(file_content, node), file_path)
        for node in ast.walk(tree)
        if isinstance(node, ast.ClassDef)
    ]

# Function to recursively extract classes from all Python files in a given directory
def extract_classes_from_project(directory):
    """
    Walk `directory` recursively and gather the classes of every `.py` file.

    Returns the concatenated results of `extract_classes_from_file`, in the
    order in which `os.walk` yields the files.
    """
    collected = []
    for current_dir, _subdirs, filenames in os.walk(directory):
        # Only files whose name ends in ".py" are treated as Python sources
        python_paths = (
            os.path.join(current_dir, name)
            for name in filenames
            if name.endswith(".py")
        )
        for source_path in python_paths:
            collected += extract_classes_from_file(source_path)
    return collected

# Function to save extracted class data to a CSV file
def save_to_csv(data, output_file):
    """
    Write the extracted classes to `output_file` as a UTF-8 CSV.

    The first row is the header ClassName, Code, File; every item of `data`
    follows as one row.
    """
    header = ["ClassName", "Code", "File"]
    # newline='' leaves line-ending handling to the csv module
    with open(output_file, mode="w", newline='', encoding="utf-8") as handle:
        csv.writer(handle).writerows([header, *data])

# === Main Execution ===

# Path to the root directory of the project to scan
project_directory = ""  # <- Replace with your target directory path
output_csv = "extracted_classes_case_study.csv"  # Output CSV file name

# Fail fast on an unset or wrong path: os.walk() yields nothing for a path that
# is not a directory, which would otherwise silently produce a header-only CSV.
if not os.path.isdir(project_directory):
    raise NotADirectoryError(
        f"project_directory {project_directory!r} is not an existing directory; "
        "set it to the root of the project to scan."
    )

# Extract classes from the project directory
extracted_classes = extract_classes_from_project(project_directory)

# Save extracted data to CSV
save_to_csv(extracted_classes, output_csv)

# Print confirmation
print(f"Extracted classes saved to {output_csv}")

# **First Vertical Decomposition:**

***Groq*** refers to the Groq API client, which is used to access LLMs (Large Language Models) hosted on Groq's ultra-fast inference engine.


***dotenv*** (installed as the `python-dotenv` package) is used to load environment variables from a `.env` file into the process environment so they can be accessed via `os.environ`.

In [None]:
!pip install dotenv groq

Collecting dotenv
  Downloading dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting groq
  Downloading groq-0.24.0-py3-none-any.whl.metadata (15 kB)
Collecting python-dotenv (from dotenv)
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Downloading dotenv-0.9.9-py2.py3-none-any.whl (1.9 kB)
Downloading groq-0.24.0-py3-none-any.whl (127 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m127.5/127.5 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv, dotenv, groq
Successfully installed dotenv-0.9.9 groq-0.24.0 python-dotenv-1.1.0


Classify the classes into the User, Logic, and Data layers.

# **Model:** llama-3.1-8b-instant

In [None]:
# Import necessary libraries
import pandas as pd  # For handling CSV and DataFrames
import re  # For regular expression operations (used to extract the category number)
from dotenv import load_dotenv  # To load environment variables from a .env file
from groq import Groq  # Groq client to interact with the LLM API
import os  # For operating system-related operations like environment variable access

# Load environment variables from a .env file (e.g., GROQ_API_KEY)
load_dotenv()

# Read the key that load_dotenv() (or the shell) placed in the environment.
# It is deliberately NOT assigned here: writing a placeholder into
# os.environ["GROQ_API_KEY"] would discard the value that was just loaded.
groq_api_key = os.environ.get("GROQ_API_KEY", "").strip()
if not groq_api_key:
    raise RuntimeError(
        "GROQ_API_KEY is not set. Add it to your .env file or export it before running this cell."
    )

# Initialize the Groq LLM client with the API key
groq = Groq(api_key=groq_api_key)

# Define a mapping from numeric category values to human-readable architectural layer names
category_map = {
    "1": "User Layer",
    "2": "Logic Layer",
    "3": "Data Layer"
}

def classify_with_llm(class_name, class_code, class_file):
    """
    Ask the LLM which of the three architectural layers a class belongs to.

    Args:
        class_name: Name of the class (also used in the error message).
        class_code: Source code of the class, embedded verbatim in the prompt.
        class_file: Path of the file the class was extracted from.

    Returns:
        "1", "2" or "3" when the reply contains such a standalone number
        (the first one wins), otherwise "Unclassified". Any exception raised
        while calling the API is printed and also yields "Unclassified".
    """
    # Prompt for LLM with detailed instructions and class metadata
    prompt = f'''
    You are an AI assistant specialized in software architecture analysis of Python applications. Your task is to examine the following class definition and assign it to the most appropriate architectural layer in the system based on its responsibilities.

    There are three architectural layers:

    1. **User Layer**: Responsible for managing user interaction. Classes in this layer handle incoming requests, interface with the logic layer, and prepare/display responses. Typical examples include API endpoints, UI controllers, or request routers.

    2. **Logic Layer**: Encapsulates core business rules and logic. This includes traditional application services and ML-specific functionalities like data preprocessing, model training, evaluation logic, or decision-making components.

    3. **Data Layer**: Manages persistent data storage and retrieval. This layer includes classes that handle databases, file systems, or any data source—whether for storing system state or machine learning models.

    Class Metadata:
    - **Class Name**: {class_name}
    - **Class File**: {class_file}

    Class Code:
    ```python
    {class_code}
    ```

    Categorize the class and respond with the category number.
    '''

    try:
        # Send the prompt to the LLM using the llama-3.1-8b-instant model
        chat_completion = groq.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama-3.1-8b-instant",
            temperature=0  # Classification should be repeatable, so avoid sampling randomness
        )

        # Extract the response content (treat a missing body as an empty reply)
        content = chat_completion.choices[0].message.content or ""

        # Accept only a standalone 1, 2 or 3. Digits that are part of a longer
        # number ("10 methods") or of a decimal ("3.1") are ignored, so they
        # cannot be mistaken for the category.
        match = re.search(r'(?<!\d)(?<!\d\.)[1-3](?!\.?\d)', content)
        if match:
            return match.group(0)
        return "Unclassified"  # No valid category number in the response
    except Exception as e:
        # Best effort: report the failure and keep processing the other classes
        print(f"Error occurred while processing class: {class_name}. Error: {str(e)}")
        return "Unclassified"

def map_category(category_number):
    """
    Translate a category key returned by the LLM into its layer name.

    Keys that are not present in `category_map` translate to "Unclassified".
    """
    if category_number in category_map:
        return category_map[category_number]
    return "Unclassified"

def classify_classes(df):
    """
    Classify every row of `df` with the LLM and store the readable layer name.

    `df` must provide the columns 'ClassName', 'Code' and 'File'. The column
    'Layer_Category' is added to (or overwritten on) `df` itself, and that same
    DataFrame is returned.
    """
    # Iterate over the three columns directly instead of df.apply(axis=1):
    # apply() does not reliably return a Series for an empty frame, which made
    # the column assignment fail when there were no rows to classify.
    raw_categories = [
        classify_with_llm(class_name, class_code, class_file)
        for class_name, class_code, class_file in zip(df["ClassName"], df["Code"], df["File"])
    ]

    # Map the raw LLM output to descriptive category names
    df["Layer_Category"] = [map_category(category) for category in raw_categories]
    return df

def classify_csv(input_file, output_file="output_layers_Asparagus_llama3.1-8b.csv"):
    """
    Classify the classes listed in a CSV file and write the result to a new CSV.

    Args:
        input_file: CSV of extracted classes; it is passed to
            `classify_classes`, which reads the ClassName, Code and File columns.
        output_file: Destination CSV path. Defaults to the file name this
            notebook used originally, so existing calls keep working.

    Returns:
        The path the classified data was written to (`output_file`).
    """
    # Load the CSV file into a DataFrame
    df = pd.read_csv(input_file)

    # Apply classification to all rows
    classified_df = classify_classes(df)

    # Print the classified DataFrame to console
    print(classified_df)

    # Save the classification results (without the DataFrame index)
    classified_df.to_csv(output_file, index=False)
    return output_file

# Run the layer classification only when this code executes as the main module
if __name__ == "__main__":
    layer_input_csv = "extracted_classes_Asparagus.csv"  # CSV of extracted classes to classify
    classify_csv(layer_input_csv)

ModuleNotFoundError: No module named 'dotenv'

AI Layer Classification

In [None]:
import pandas as pd
import re
from dotenv import load_dotenv
from groq import Groq
import os

# Load environment variables from the .env file
load_dotenv()

# Read the key that load_dotenv() (or the shell) placed in the environment.
# It is deliberately NOT assigned here: writing a placeholder into
# os.environ["GROQ_API_KEY"] would discard the value that was just loaded.
groq_api_key = os.environ.get("GROQ_API_KEY", "").strip()
if not groq_api_key:
    raise RuntimeError(
        "GROQ_API_KEY is not set. Add it to your .env file or export it before running this cell."
    )

# Initialize the Groq client
groq = Groq(api_key=groq_api_key)

def classify_ai_layer(class_name, class_code, class_file):
    """
    Ask the LLM which AI-specific architectural layer a class belongs to.

    Args:
        class_name: Name of the class (also used in the error message).
        class_code: Source code of the class, embedded verbatim in the prompt.
        class_file: Path of the file the class was extracted from.

    Returns:
        "1", "2", "3" or "4" when the reply contains such a standalone number
        (the first one wins), otherwise "Unclassified". Any exception raised
        while calling the API is printed and also yields "Unclassified".
    """
    prompt = f'''
    You are an AI software analysis assistant specializing in Python code. Your task is to analyze the given class definition and categorize it into one of the following categories:

    Categories:
    1. Non-AI Layer: Handles general application logic, such as UI, APIs, logging, or database management.
    2. AI Preprocessing Layer: Manages data preparation, feature extraction, or dataset transformations.
    3. Model Training & Evaluation Layer: Responsible for training machine learning models, fine-tuning, and evaluating their performance.
    4. Model Deployment & Monitoring Layer: Handles inference, model serving, and performance monitoring.

    Class Information:
    - Class Name: {class_name}
    - Class File: {class_file}

    Class Code:
    ```python
    {class_code}
    ```

    Please categorize the class and respond with the category number.
    '''

    try:
        # Send the prompt to the LLM and get the response
        chat_completion = groq.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama-3.1-8b-instant",
            temperature=0  # Classification should be repeatable, so avoid sampling randomness
        )

        # Treat a missing response body as an empty reply
        content = chat_completion.choices[0].message.content or ""

        # Accept only a standalone 1-4. Digits that are part of a longer number
        # ("12 methods") or of a decimal ("3.1") are ignored, so they cannot be
        # mistaken for the category.
        match = re.search(r'(?<!\d)(?<!\d\.)[1-4](?!\.?\d)', content)
        if match:
            return match.group(0)
        return "Unclassified"
    except Exception as e:
        # Best effort: report the failure and keep processing the other classes
        print(f"Error occurred while processing class: {class_name}. Error: {str(e)}")
        return "Unclassified"

# Dictionary mapping category numbers to human-readable AI layers
# (keys are the strings "1" to "4", in the order the labels are listed)
ai_category_map = {
    str(number): label
    for number, label in enumerate(
        (
            "Non-AI Layer",
            "AI Preprocessing Layer",
            "Model Training & Evaluation Layer",
            "Model Deployment & Monitoring Layer",
        ),
        start=1,
    )
}

def map_ai_category(category_number):
    """
    Translate a category key into its AI layer name.

    Keys that are not present in `ai_category_map` translate to "Unclassified".
    """
    if category_number in ai_category_map:
        return ai_category_map[category_number]
    return "Unclassified"

def classify_ai_layers(df):
    """
    Classify every row of `df` into an AI layer and store the readable name.

    `df` must provide the columns 'ClassName', 'Code' and 'File'. The column
    'AI_Layer_Category' is added to (or overwritten on) `df` itself, and that
    same DataFrame is returned.
    """
    # Iterate over the three columns directly instead of df.apply(axis=1):
    # apply() does not reliably return a Series for an empty frame, which made
    # the column assignment fail when there were no rows to classify.
    raw_categories = [
        classify_ai_layer(class_name, class_code, class_file)
        for class_name, class_code, class_file in zip(df["ClassName"], df["Code"], df["File"])
    ]
    df["AI_Layer_Category"] = [map_ai_category(category) for category in raw_categories]
    return df

def process_ai_classification(input_file, output_file="8B-output_ai_layers_Asparagus_llama3.1-8b.csv"):
    """
    Add the AI-layer classification to a CSV of classes and save the result.

    Args:
        input_file: CSV to read; it is passed to `classify_ai_layers`, which
            reads the ClassName, Code and File columns.
        output_file: Destination CSV path. Defaults to the file name this
            notebook used originally, so existing calls keep working.

    Returns:
        The path the classified data was written to (`output_file`).
    """
    df = pd.read_csv(input_file)
    df = classify_ai_layers(df)

    # Show the result in console
    print(df)

    # Save the new classification result (without the DataFrame index)
    df.to_csv(output_file, index=False)
    return output_file

# Run the AI layer classification only when this code executes as the main module
if __name__ == "__main__":
    ai_input_csv = "output_layers_PFEAPP_llama3.1-8b.csv"  # CSV that already holds the layer classification
    process_ai_classification(ai_input_csv)


Error occurred while processing class: NewsService. Error: Error code: 413 - {'error': {'message': 'Request too large for model `llama-3.1-8b-instant` in organization `org_01jmyy0n05efgvvv3ttw9hppyx` service tier `on_demand` on tokens per minute (TPM): Limit 6000, Requested 7027, please reduce your message size and try again. Need more tokens? Upgrade to Dev Tier today at https://console.groq.com/settings/billing', 'type': 'tokens', 'code': 'rate_limit_exceeded'}}
Error occurred while processing class: RabbitMQService. Error: Error code: 413 - {'error': {'message': 'Request too large for model `llama-3.1-8b-instant` in organization `org_01jmyy0n05efgvvv3ttw9hppyx` service tier `on_demand` on tokens per minute (TPM): Limit 6000, Requested 6122, please reduce your message size and try again. Need more tokens? Upgrade to Dev Tier today at https://console.groq.com/settings/billing', 'type': 'tokens', 'code': 'rate_limit_exceeded'}}
Error occurred while processing class: PredictionService. 

Mapping of the Class Classification into a CSV

In [None]:
import pandas as pd

# Source CSV holding the system-layer and AI-layer prediction of each class
txt_file = 'output_ai_layers_PFEAPP-llama31-8b.csv'
df = pd.read_csv(txt_file, header=0)

# Column headers may carry stray whitespace; normalize them before any lookup
df.columns = df.columns.str.strip()

# The class name is the key of the output table, so it must be present
if 'ClassName' not in df.columns:
    raise ValueError("The file does not contain 'ClassName' as a column.")

# Recognized traditional system layers (each label maps to itself)
layer_mapping = {
    label: label
    for label in ('User Layer', 'Logic Layer', 'Data Layer')
}

# Recognized AI-specific layers (each label maps to itself)
ai_layer_mapping = {
    label: label
    for label in (
        'Non-AI Layer',
        'AI Preprocessing Layer',
        'Model Training & Evaluation Layer',
        'Model Deployment & Monitoring Layer',
    )
}

# Whitespace-trimmed label columns, computed once and reused below
system_labels = df['Layer_Category'].str.strip()
ai_labels = df['AI_Layer_Category'].str.strip()

# One-hot encoded system layers: cleaned class name, one 0/1 column per layer,
# plus a 0/1 'Unclassified' column for any label outside layer_mapping
df_layers = pd.DataFrame({
    'ClassName': df['ClassName'].str.strip(),
    **{category: (system_labels == category).astype(int) for category in layer_mapping},
    'Unclassified': (~system_labels.isin(layer_mapping.keys())).astype(int),
})

# One-hot encoded AI layers, with their own 0/1 'Unclassified' column
df_ai_layers = pd.DataFrame({
    **{category: (ai_labels == category).astype(int) for category in ai_layer_mapping},
    'Unclassified': (~ai_labels.isin(ai_layer_mapping.keys())).astype(int),
})

# Place both encodings side by side, then order the rows by class name
df_transformed = pd.concat([df_layers, df_ai_layers], axis=1)
df_transformed = df_transformed.sort_values(by=['ClassName'])

# Save the transformed results to a new CSV file
output_file = 'LLama8B_PFEAPP.csv'
df_transformed.to_csv(output_file, index=False)

# Notify the user
print(f'Transformed file saved to {output_file}')

# **Phase 2: Class Vector Embedding Encoding**

In [None]:
import torch
from transformers import AutoTokenizer, AutoModel
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA

# Checkpoint name shared by the CodeBERT encoder and its tokenizer
model_name = "microsoft/codebert-base"
model = AutoModel.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

def get_code_embedding(code_snippet):
    """
    Embed one code snippet with CodeBERT.

    The snippet is tokenized (truncated to at most 512 tokens) and run through
    the model without gradient tracking. The last hidden state is averaged
    over dimension 1 and returned as a NumPy array after `squeeze()` removes
    any size-1 dimensions.
    """
    encoded = tokenizer(
        code_snippet,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512,
    )
    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state
    pooled = torch.mean(hidden_states, dim=1)
    return pooled.squeeze().numpy()

# Load class data from CSV and drop the 'File' column (not needed for embedding)
df = pd.read_csv("cleaned_Classes_LLama70B_PFEAPP.csv").drop(columns=['File'])

# Generate CodeBERT embeddings for each class's code snippet
df['Embedding'] = df['Code'].apply(get_code_embedding)

# Stack all embedding vectors into a single matrix for PCA processing
embeddings_matrix = np.vstack(df['Embedding'].values)

# Reduce dimensionality of embeddings using PCA (target: 100 dimensions).
# PCA cannot produce more components than min(n_samples, n_features), so the
# target is capped; otherwise a project with fewer than 100 classes raises.
n_components = min(100, *embeddings_matrix.shape)
# A fixed random_state keeps the result repeatable if a randomized solver is chosen
pca = PCA(n_components=n_components, random_state=42)
dim_reduced_embeddings = pca.fit_transform(embeddings_matrix)

# Create a DataFrame with the reduced embeddings
embedding_columns = [f'feature_{i}' for i in range(dim_reduced_embeddings.shape[1])]
embeddings_df = pd.DataFrame(dim_reduced_embeddings, columns=embedding_columns)

# Combine reduced embeddings with the corresponding class names
df_final = pd.concat([df[['ClassName']], embeddings_df], axis=1)

# Save the result to a new CSV file
output_path = "generated_reduced_code_embeddings_reduced.csv"
df_final.to_csv(output_path, index=False)

# Notify completion
print(f"Reduced embeddings saved to {output_path}")

Merging the CSVs

In [None]:
import pandas as pd

# Load the two CSV files
df1 = pd.read_csv("generated_reduced_code_embeddings_reduced.csv")
df2 = pd.read_csv("cleaned_LLama70B_PFEAPP.csv")

# Find common class names
common_classes = set(df1['ClassName']).intersection(set(df2['ClassName']))

# Filter DataFrames to only keep rows with common class names
df1_common = df1[df1['ClassName'].isin(common_classes)]
df2_common = df2[df2['ClassName'].isin(common_classes)]

# Sort both DataFrames by 'ClassName'. A stable sort keeps rows that share a
# class name in their original file order in both tables.
df1_sorted = df1_common.sort_values(by='ClassName', kind='mergesort').reset_index(drop=True)
df2_sorted = df2_common.sort_values(by='ClassName', kind='mergesort').reset_index(drop=True)

# The tables are joined by row position below, so the sorted class names must
# match row for row; otherwise labels would be attached to the wrong embeddings.
if not df1_sorted['ClassName'].equals(df2_sorted['ClassName']):
    raise ValueError(
        "Class names do not line up between the two CSV files "
        "(for example, a class name occurs a different number of times in each)."
    )

# Drop the duplicate key column by name instead of assuming it is the first column
df2_sorted = df2_sorted.drop(columns=['ClassName'])

# Merge both sorted DataFrames side by side
merged_df = pd.concat([df1_sorted, df2_sorted], axis=1)

# NOTE(review): a comment here used to mention dropping the 7th column from the
# end, but no code ever did so; no further column is dropped.

# Save the merged DataFrame
merged_df.to_csv("merged_classnames_LLama70B.csv", index=False)

# **Phase 3: Microservices Identification and Clustering**

In [None]:
import pandas as pd
import hdbscan
import numpy as np
from sklearn.preprocessing import StandardScaler

def cluster_classes_by_layer(csv_path, output_csv_path):
    """
    Cluster the classes of each AI layer with HDBSCAN and save the labels.

    For every layer column (Non-AI, AI Preprocessing, Model Training, Model
    Deployment) the rows flagged with 1 are standardized and clustered on
    their feature columns. The labels are written to that layer's cluster
    column; rows outside the layer, and layers that are skipped, keep NaN.

    Args:
        csv_path: Input CSV containing 'ClassName', the four layer columns and
            the feature columns.
        output_csv_path: Where the input table plus the four cluster columns
            is written.

    Returns:
        None. A missing file, a missing column or any other error is reported
        with print() and no output file is written.
    """
    try:
        df = pd.read_csv(csv_path)

        layers = {
            "Non-AI Layer": "Non_AI_Cluster",
            "AI Preprocessing Layer": "AI_Preprocessing_Cluster",
            "Model Training & Evaluation Layer": "Model_Training_Cluster",
            "Model Deployment & Monitoring Layer": "Model_Deployment_Cluster"
        }

        # Choose the feature columns BEFORE the cluster columns are added and
        # exclude the cluster columns by name. They start out as all-NaN, so
        # treating them as features made dropna() discard every row and no
        # layer was ever clustered.
        non_feature_cols = set(layers.keys()) | set(layers.values()) | {'ClassName'}
        feature_cols = [col for col in df.columns if col not in non_feature_cols]

        # Initialize cluster columns with NaN
        for cluster_col_name in layers.values():
            df[cluster_col_name] = np.nan

        for layer, cluster_col_name in layers.items():
            layer_df = df[df[layer] == 1].copy()

            if layer_df.empty:
                print(f"Skipping {layer}: No data found.")
                continue

            # Drop rows with NaN in feature columns
            layer_df = layer_df.dropna(subset=feature_cols)

            # Ensure there are valid feature columns
            if layer_df.empty or len(feature_cols) == 0:
                print(f"Skipping {layer}: No valid features after removing NaN values.")
                continue

            features = layer_df[feature_cols].values

            # Check if there are enough samples
            if features.shape[0] < 2:
                print(f"Skipping {layer}: Not enough samples for clustering (Found {features.shape[0]} samples).")
                continue

            # Standardize features
            scaler = StandardScaler()
            scaled_features = scaler.fit_transform(features)

            # Apply HDBSCAN clustering
            clusterer = hdbscan.HDBSCAN(min_cluster_size=2, gen_min_span_tree=True)
            cluster_labels = clusterer.fit_predict(scaled_features)

            # Assign cluster labels to exactly the rows that were clustered
            df.loc[layer_df.index, cluster_col_name] = cluster_labels

        # Save the DataFrame with cluster labels to a new CSV
        df.to_csv(output_csv_path, index=False)
        print(f"Clustering results saved to {output_csv_path}")

    except FileNotFoundError:
        print("Error: Input CSV file not found.")
    except KeyError as e:
        print(f"Error: Missing column in CSV - {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

# Input: merged embeddings and layer labels; output: the same table plus cluster columns
csv_path, output_csv_path = (
    "merged_classnames_LLama70B.csv",
    "LayerBased_clustered_classes.csv",
)

cluster_classes_by_layer(csv_path=csv_path, output_csv_path=output_csv_path)

Results Evaluation

In [None]:

import pandas as pd
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
# file1 = './70Bmerged_Output_Llama3.1-8B_cleaned.csv'
# file2 = './upupdated_feature_matrix_modified_cleaned.csv'
# Load the CSV files
df_true = pd.read_csv('') #Benchmark Labels
df_pred = pd.read_csv('') #Predicted Labels
# Sort both DataFrames alphabetically by 'ClassName'. A stable sort keeps rows
# that share a class name in their original file order in both tables.
df_true = df_true.sort_values(by='ClassName', kind='mergesort').reset_index(drop=True)
df_pred = df_pred.sort_values(by='ClassName', kind='mergesort').reset_index(drop=True)
# Validate structure
assert df_true.shape == df_pred.shape, "Mismatch in shape"
assert all(df_true.columns == df_pred.columns), "Mismatch in column names"
# Labels are compared row by row, so both files must list the same classes in
# the same order after sorting; otherwise the metrics compare unrelated classes.
assert df_true['ClassName'].equals(df_pred['ClassName']), "Mismatch in class names"

# Every column after 'ClassName' is treated as a binary target column
start_index = df_true.columns.get_loc('ClassName') + 1
target_columns = df_true.columns[start_index:]

# Lists for overall metrics (all target columns flattened into one sequence)
all_true = []
all_pred = []

print("\n📊 Per-Column Evaluation Metrics:")
print("-" * 40)

for col in target_columns:
    y_true = df_true[col]
    y_pred = df_pred[col]

    # Store for global metrics
    all_true.extend(y_true)
    all_pred.extend(y_pred)

    # zero_division=0 reports 0 for a metric whose denominator is zero
    precision = precision_score(y_true, y_pred, average='binary', zero_division=0)
    recall = recall_score(y_true, y_pred, average='binary', zero_division=0)
    f1 = f1_score(y_true, y_pred, average='binary', zero_division=0)
    accuracy = accuracy_score(y_true, y_pred)

    print(f"🔸 {col}")
    print(f"   Precision: {precision:.4f}")
    print(f"   Recall:    {recall:.4f}")
    print(f"   F1 Score:  {f1:.4f}")
    print(f"   Accuracy:  {accuracy:.4f}\n")

# Overall metrics across all columns and rows
overall_precision = precision_score(all_true, all_pred, average='binary', zero_division=0)
overall_recall = recall_score(all_true, all_pred, average='binary', zero_division=0)
overall_f1 = f1_score(all_true, all_pred, average='binary', zero_division=0)
overall_accuracy = accuracy_score(all_true, all_pred)

print("📈 Overall Evaluation Metrics (across all columns):")
print("-" * 50)
print(f"✅ Precision: {overall_precision:.4f}")
print(f"✅ Recall:    {overall_recall:.4f}")
print(f"✅ F1 Score:  {overall_f1:.4f}")
print(f"✅ Accuracy:  {overall_accuracy:.4f}")