In [1]:
!pip install pocketflow

Collecting pocketflow
  Downloading pocketflow-0.0.2-py3-none-any.whl.metadata (329 bytes)
Downloading pocketflow-0.0.2-py3-none-any.whl (3.3 kB)
Installing collected packages: pocketflow
Successfully installed pocketflow-0.0.2


In [None]:
from pocketflow import Node, BatchNode, Flow
import yaml

In [None]:
from google import genai

# Initialize Gemini client (ideally done once per app)
client = genai.Client(api_key="AIzaSyDEaD5pWBhA3bzrKnYRV6vjwyw-7tUkCvs")  # Replace with your actual API key

def call_llm(prompt, model="gemini-2.0-flash"):
    try:
        response = client.models.generate_content(
            model=model,
            contents=prompt
        )
        return response.text  # The generated text response
    except Exception as e:
        print("LLM call failed:", e)
        return "LLM call error"


In [None]:
# Node 1: Generate Interview Questions
class GenerateQuestions(Node):
    def prep(self, shared):
        return shared["resume"], shared["interview_type"] 

    def exec(self, inputs):
        resume, interview_type = inputs
        return f"1. Tell me about your experience with Python.\n2. How do you approach debugging?\n3. Describe a challenging project.\n4. What are your thoughts on clean code?\n5. Explain a technical decision you made recently."

    def post(self, shared, _, questions_text):
        shared["questions"] = questions_text.strip().split("\n")
        return "default"

In [None]:
# Node 2: Evaluate Candidate Answers
class EvaluateAnswers(BatchNode):
    def prep(self, shared):
        return list(zip(shared["questions"], shared["answers"]))

    def exec(self, qa_pair):
        question, answer = qa_pair
        prompt = f"""
Question: {question}
Answer: {answer}

Evaluate and return a score (1-10) and feedback:
```yaml
score: <score>
feedback: <feedback>
```"""
        response = call_llm(prompt)
        yaml_part = response.split("```yaml")[1].split("```")[0].strip()
        return yaml.safe_load(yaml_part)

    def post(self, shared, _, results):
        shared["evaluations"] = results
        return "default"

In [None]:
# Node 3: Summarize the Interview
class SummarizeInterview(Node):
    def prep(self, shared):
        return shared["evaluations"]

    def exec(self, evaluations):
        total = sum(e["score"] for e in evaluations)
        avg = total / len(evaluations)
        feedback = "\n".join([f"Q{i+1}: {e['feedback']}" for i, e in enumerate(evaluations)])
        return f"""Overall Score: {avg:.1f}/10\n\nDetailed Feedback:\n{feedback}"""

    def post(self, shared, _, result):
        shared["summary"] = result
        return "default"

In [None]:
# Create the flow
qgen = GenerateQuestions()
eval = EvaluateAnswers()
summ = SummarizeInterview()

qgen >> eval >> summ
flow = Flow(start=qgen)

# Example execution
if __name__ == "__main__":
    shared = {
        "resume": "John Doe: Python dev, 5 years experience in web backend, SQL, Flask.",
        "interview_type": "technical",
        "answers": [
            "I have worked with Python for over 5 years...",
            "My debugging process starts with...",
            "A project I found challenging was...",
            "I follow clean code principles like...",
            "Recently, I chose PostgreSQL over MongoDB because..."
        ]
    }
    flow.run(shared)
    print("\n=== INTERVIEW SUMMARY ===\n")
    print(shared["summary"])
