In [20]:
from langchain_ollama import ChatOllama
from langchain_ollama.embeddings import OllamaEmbeddings
from langchain_chroma import Chroma


# Ollama-backed chat model shared by every chain in this notebook.
# temperature=0 and format="json" are passed straight through to ChatOllama;
# the chains below feed the replies into JsonOutputParser.
llm = ChatOllama(model="llama3.2", temperature=0, format="json")

# Embedding model handed to the Chroma collection below.
embeddings = OllamaEmbeddings(model="nomic-embed-text")

# Chroma collection "descriptions", persisted under ./vector_store.
vector_store = Chroma(
    persist_directory="./vector_store",
    embedding_function=embeddings,
    collection_name="descriptions",
)

# Similarity-search retriever configured to return one document (k=1).
retriever = vector_store.as_retriever(
    search_kwargs={"k": 1},
    search_type="similarity",
)

In [29]:
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate

# Prompt for the relevance grader: given a code snippet and a user question,
# the model is asked for a JSON object with a single key 'score' whose value
# is 'yes' or 'no'.
#
# Fix: the instruction previously read "It does need to be a stringent test",
# which contradicts the stated goal of only filtering out erroneous
# retrievals. The missing "not" is restored; the rest of the template is
# unchanged.
retrieval_prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a code grader assessing relevance 
    of a retrieved code snippet to a user question. If the code snippet answers the user's question, 
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the code snippet is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved code snippet: \n\n {snippet} \n\n
    Here is the user question: {query} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["query", "snippet"],
)

# prompt -> LLM -> JSON parser; invoke with {"query": ..., "snippet": ...}.
retrieval_grader = retrieval_prompt | llm | JsonOutputParser()

# Sample user question passed to the chains as the "query" input.
question = "How do I create a worker pool in Go?"

# Hard-coded Go source used as the "snippet" input in this notebook (the
# retriever defined earlier is not called in the visible cells).
# The literal starts with a newline and ends with a blank line.
snippet = """
package rabbitmq

import (
	"log"
	"strings"
	"sync"
	"time"

	"github.com/streadway/amqp"
	"golang.org/x/time/rate"
)

type WorkerPool struct {
	QueueName   string
	RateLimiter *rate.Limiter
	Workers     int
	MaxWorkers  int
}

func NewWorkerPool(queueName string, rateLimit *rate.Limiter, workers int, maxWorkers int) *WorkerPool {
	return &WorkerPool{
		QueueName:   queueName,
		RateLimiter: rateLimit,
		Workers:     workers,
		MaxWorkers:  maxWorkers,
	}
}

func Scheduler(pools []*WorkerPool, ch *amqp.Channel) {
	var wg sync.WaitGroup
	ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	for _, pool := range pools {
		wg.Add(1)
		go func(pool *WorkerPool) {
			defer wg.Done()
			for i := 0; i < pool.Workers; i++ {
				go Worker(pool.QueueName, pool.RateLimiter, ch)
			}
		}(pool)
	}

	// Dynamic priority adjustment loop
	go func() {
		for {
			time.Sleep(7 * time.Second) // Adjust priorities every 10 seconds

			for _, pool := range pools {
				q, err := ch.QueueInspect(pool.QueueName)
				if err != nil {
					log.Printf("Failed to inspect queue %s: %v", pool.QueueName, err)
					continue
				}

				if q.Messages == 0 && pool.Workers > 1 {
					log.Printf("Queue %s is idle, reallocating workers", pool.QueueName)
					// Reallocate workers to another pool (this is a simplified example)
					reallocateWorkers(pools, ch)
				} else {
					log.Println("Queue is not idle, scaling workers...")
					scaleWorkers(pool, ch)
				}
			}
			println("Total messages: ", getTotalMessages(getQueueStats(pools, ch)))
			println()
		}
	}()

	wg.Wait()
}

func scaleWorkers(pool *WorkerPool, ch *amqp.Channel) {
	q, err := ch.QueueInspect(pool.QueueName)
	if err != nil {
		log.Printf("Failed to inspect queue %s: %v", pool.QueueName, err)
		return
	}

	messagesInQueue := q.Messages
	// required_workers = messagesInQueue / 5
	newWorkers := 1 + (messagesInQueue / 5)

	if newWorkers > pool.MaxWorkers {
		newWorkers = pool.MaxWorkers
	}

	pool.Workers = newWorkers

	log.Printf("Scaled workers for queue %s to %d", pool.QueueName, pool.Workers)
}

func getQueueStats(pools []*WorkerPool, ch *amqp.Channel) map[string]int {
	// Gather queue statistics
	queueStats := make(map[string]int)
	for _, pool := range pools {
		q, err := ch.QueueInspect(pool.QueueName)
		if err != nil {
			log.Printf("Failed to inspect queue %s: %v", pool.QueueName, err)
			continue
		}
		queueStats[pool.QueueName] = q.Messages
	}

	return queueStats
}

func getTotalMessages(queueStats map[string]int) int {
	totalMessages := 0
	for _, messages := range queueStats {
		totalMessages += messages
	}
	return totalMessages
}

func reallocateWorkers(pools []*WorkerPool, ch *amqp.Channel) {

	queueStats := getQueueStats(pools, ch)

	totalMessages := getTotalMessages(queueStats)
	println("Total messages: ", totalMessages)

	println("Reallocating workers...")
	// Adjust worker allocation based on queue backlog and priority
	for _, pool := range pools {
		if totalMessages == 0 {
			pool.Workers = 1 // At least one worker per pool
		} else {
			// Adjust based on the proportion of messages in the queue
			// and the priority level of the pool
			priorityWeight := getPriorityWeight(pool.QueueName)
			pool.Workers = int(float64(pool.Workers) * (float64(queueStats[pool.QueueName]) / float64(totalMessages)) * priorityWeight)

			// Ensure at least one worker is always allocated to avoid starvation
			if pool.Workers < 1 {
				pool.Workers = 1
			}

			// Ensure a maximum of 10 workers per pool
			if pool.Workers > pool.MaxWorkers {
				pool.Workers = pool.MaxWorkers
			}

		}

		log.Printf("Reallocated %d workers to queue %s", pool.Workers, pool.QueueName)

	}

	println("Workers reallocated")
	println()
}

func getPriorityWeight(queueName string) float64 {
	switch {
	case strings.Contains(queueName, "high"):
		return 1.5
	case strings.Contains(queueName, "medium"):
		return 1.0
	case strings.Contains(queueName, "low"):
		return 0.5
	default:
		return 1.0
	}
}

"""

def number_lines(code: str) -> str:
    """Prefix every line of ``code`` with its 1-based line number.

    Leading and trailing newline characters are stripped first, so a
    triple-quoted literal that starts with a newline and a plain string that
    does not are numbered identically (the first real line is always ``1``).

    Args:
        code: Source text to annotate.

    Returns:
        The text with each line rewritten as ``"<n>: <line>\\n"``, or an empty
        string when ``code`` holds nothing but newlines.
    """
    body = code.strip("\n")
    if not body:
        return ""
    # split("\n") rather than splitlines() so only "\n" counts as a line
    # break, matching how the snippet was split before.
    return "".join(
        f"{lineno}: {text}\n"
        for lineno, text in enumerate(body.split("\n"), start=1)
    )


# The numbered text is bound back to `snippet` because the cells below pass
# `snippet` to their chains and rely on the line-number prefixes.
snippet = number_lines(snippet)


retrieval_grader.invoke({"query": question, "snippet": snippet})

{'score': 'yes'}

In [32]:
# Prompt for the highlighter: given a line-numbered code snippet and a user
# question, the model is asked for a JSON object with the keys 'start_line'
# and 'end_line'.
highlighter_prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a coding assistant extracting useful lines of code from the given code snippet. 
    Our retrieval system has extracted a code snippet that answers a users query. Your task is to identify the exact function that answers users query. \n
    Analyse the code snippet, give the start line number and end line number of the indentified function/module.
    Don't be too precise also include a few lines before the start line and after the end line  \n
    Provide line numbers as JSON with a two keys 'start_line' and 'end_line' and no premable or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved code snippet: \n\n {snippet} \n\n
    Here is the user question: {query} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["query", "snippet"],
)

# Fix: this chain used to be assigned to `retrieval_grader`, silently
# replacing the relevance grader built in the previous cell. It now has its
# own name, following the `<prompt>_grader` pattern of the other chains.
highlighter_grader = highlighter_prompt | llm | JsonOutputParser()

highlighter_grader.invoke({
    "query": question,
    "snippet": snippet
})

{'start_line': 20, 'end_line': 26}

In [None]:
# Prompt for the explainer: the model is asked to describe and critique a
# code snippet.
#
# Fix: the chain below parses the reply with JsonOutputParser (and the shared
# LLM is created with format="json"), but the prompt never asked for JSON or
# named a key, leaving the shape of the reply undefined. The prompt now
# requests a JSON object with a single key 'explanation'.
explanation_prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are tasked with analyzing and explaining 
    the provided code snippet. Your explanation should cover the purpose of the code, its functionality, 
    and any important concepts or techniques used which are followed by the organization.
    Discuss the efficiency, readability, and potential improvements of the code. \n
    Provide examples or scenarios where the code might be useful. \n
    Provide the explanation as a JSON with a single key 'explanation' whose value is a string, and no preamble.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the code: \n ```{snippet}``` \n\n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["snippet"],
)

# prompt -> LLM -> JSON parser; invoke with {"snippet": ...}.
explanation_grader = explanation_prompt | llm | JsonOutputParser()

In [None]:
# Template text for the comparison chain. It asks the model to compare two
# code snippets and reply with a JSON object holding the string keys
# 'differences' and 'judgement'.
_DIFFERENCE_TEMPLATE = """<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a coding buddy in an organization. You are tasked to help a new joinee with comparing two code snippets. Suggest the best out of two adheering to your organizations coding conventions. \n
    You need to identify the differences between the two code snippets.
    Generate 'differences' and 'judgement' keys in the JSON output. \n
    The 'differences' key should contain the differences between the two code snippets. Judge them on different parameters like,
     reliablility, dependancies, performance, scalability , compliance with organization's coding standards, portability, etc. \n
    The 'judgement' key should contain the judgement of the differences. Keep this short and to the point. Tell here which of the two to use. \n
    Provide output as JSON with a two keys 'differences' and 'judgement' as strings and no premable or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the code snippet 1: \n ```{snippet1}``` \n\n
    Here is the code snippet 2: \n ```{snippet2}``` \n\n \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """

# Prompt with the two placeholders the template declares.
difference_prompt = PromptTemplate(
    input_variables=["snippet1", "snippet2"],
    template=_DIFFERENCE_TEMPLATE,
)

# prompt -> LLM -> JSON parser; invoke with {"snippet1": ..., "snippet2": ...}.
difference_grader = difference_prompt | llm | JsonOutputParser()