In [None]:
%%capture --no-stderr
%pip install -U langchain langgraph langchain-community langchain-anthropic tavily-python pandas pygraphviz

In [None]:
import getpass
import os


def _set_env(var: str):
    """Prompt (without echo) for environment variable ``var`` unless it already has a non-empty value."""
    if os.environ.get(var):
        # Already configured: leave the existing value untouched.
        return
    os.environ[var] = getpass.getpass(f"{var}: ")


# Prompt for each credential that is not already present in the environment.
for _credential_name in (
    "ANTHROPIC_API_KEY",
    "TAVILY_API_KEY",
    "GITHUB_APP_ID",
    "GITHUB_APP_PRIVATE_KEY",
    "GITHUB_REPOSITORY",
):
    _set_env(_credential_name)

# Recommended
_set_env("LANGCHAIN_API_KEY")
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": "ekline-langgraph-api",
    }
)

In [None]:
import requests
import base64
import time
import jwt

# Function to load the private key from a PEM file
def load_private_key(pem_file_path):
    """Return the entire text content of the file at ``pem_file_path``."""
    with open(pem_file_path, 'r') as key_handle:
        return key_handle.read()

def generate_jwt(app_id, private_key_path):
    """
    Build the RS256-signed JSON Web Token used to authenticate as the GitHub App.

    Args:
        app_id: GitHub App identifier, sent as the ``iss`` claim.
        private_key_path (str): path of the PEM file holding the app's private key.

    Returns:
        str: the encoded JWT.
    """
    private_key = load_private_key(private_key_path)
    # Read the clock once so `iat` and `exp` are derived from the same instant.
    now = int(time.time())
    payload = {
        # Issued 60 seconds in the past to tolerate clock drift between this
        # machine and GitHub (GitHub's recommendation for app JWTs).
        'iat': now - 60,
        # GitHub allows at most 10 minutes of validity; 9 minutes from "now"
        # keeps the total iat..exp span at 10 minutes despite the back-dated iat.
        'exp': now + (9 * 60),
        'iss': app_id
    }
    encoded_jwt = jwt.encode(payload, private_key, algorithm='RS256')
    # PyJWT < 2.0 returns bytes; normalise so f-string interpolation into the
    # Authorization header does not produce "Bearer b'...'".
    if isinstance(encoded_jwt, bytes):
        encoded_jwt = encoded_jwt.decode('utf-8')
    return encoded_jwt

def get_installation_access_token(app_id, private_key_path, installation_id):
    """
    Exchange an app JWT for an installation access token.

    Args:
        app_id: GitHub App identifier, forwarded to ``generate_jwt``.
        private_key_path (str): path of the app's PEM private key, forwarded to ``generate_jwt``.
        installation_id: id of the app installation whose token is requested.

    Returns:
        str: the ``token`` field of GitHub's JSON response.

    Raises:
        requests.HTTPError: if GitHub answers with a 4xx/5xx status.
    """
    jwt_token = generate_jwt(app_id, private_key_path)
    headers = {
        'Authorization': f'Bearer {jwt_token}',
        'Accept': 'application/vnd.github+json',
        'X-GitHub-Api-Version': '2022-11-28'
    }
    url = f'https://api.github.com/app/installations/{installation_id}/access_tokens'
    # A timeout keeps the notebook from hanging forever on a stalled connection.
    response = requests.post(url, headers=headers, timeout=30)
    # Surface the real HTTP failure (bad JWT, wrong installation id, ...) instead
    # of an uninformative KeyError on the missing 'token' field.
    response.raise_for_status()
    return response.json()['token']

# GitHub App identity used to mint the installation access token.
# GITHUB_APP_ID is prompted for in the environment-setup cell, so use it here;
# the literal ids are only fallbacks for when the variables are unset or empty.
app_id = os.environ.get("GITHUB_APP_ID") or '903588'
installation_id = os.environ.get("GITHUB_APP_INSTALLATION_ID") or '51026556'
# NOTE: despite its name, GITHUB_APP_PRIVATE_KEY is used as the *path* of the
# PEM file (it is handed to load_private_key, which opens it), not the key text.
private_key_path = os.environ.get("GITHUB_APP_PRIVATE_KEY")
if not private_key_path:
    raise RuntimeError(
        "GITHUB_APP_PRIVATE_KEY must be set to the path of the GitHub App private key (.pem) file"
    )
access_token = get_installation_access_token(app_id, private_key_path, installation_id)

In [None]:
from langchain_core.tools import tool

#Tools

def get_repo_content(path, branch="main"):
    """
    Get the content of a file or directory in the ekline-io/documentation repository.

    Queries the GitHub "contents" endpoint, authenticating with the module-level
    ``access_token``.

    Args:
      path (str): path of the file or directory whose content is to be retrieved
        ("" addresses the repository root)
      branch (str): branch, tag or commit to read from; sent as the ``ref`` query parameter
    Returns:
      The parsed JSON body when GitHub answers 200, otherwise None (the status
      code is printed). Per GitHub's API documentation the body is a dict for a
      file and a list of entries for a directory.
    """
    from urllib.parse import quote

    # Percent-encode the path ('/' is kept) so names containing '#', '?' or
    # spaces do not corrupt the URL.
    url = f"https://api.github.com/repos/ekline-io/documentation/contents/{quote(path)}"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/vnd.github+json'
    }
    # `params` lets requests encode the ref (branch names may contain '/' or '&');
    # the timeout prevents an indefinite hang on a stalled connection.
    response = requests.get(url, headers=headers, params={'ref': branch}, timeout=30)
    if response.status_code == 200:
        return response.json()
    print(f"Failed to get repository content: {response.status_code}")
    return None


def update_github_file(path, content, sha, commit_message, branch):
    """
    Update a file in the ekline-io/documentation repository.

    Sends a PUT to the GitHub "contents" endpoint with the new content
    base64-encoded, authenticating with the module-level ``access_token``.

    Args:
      path (str): path of the file to be updated
      content (str): new content of the file
      sha (str): SHA of the file to be updated
      commit_message (str): commit message
      branch (str): branch in which the file is to be updated
    Returns:
      bool: True when GitHub accepted the write, False otherwise (the status
      code and response body are printed on failure).
    """
    from urllib.parse import quote

    # Percent-encode the path ('/' is kept) so unusual file names stay valid URLs.
    url = f"https://api.github.com/repos/ekline-io/documentation/contents/{quote(path)}"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/vnd.github+json'
    }
    encoded_content = base64.b64encode(content.encode()).decode()
    data = {
        'message': commit_message,
        # NOTE(review): placeholder identity; commits are attributed to it as-is.
        'committer': {
            "name": "Your Name",
            "email": "your-email@example.com"
        },
        'content': encoded_content,
        'sha': sha,
        'branch': branch
    }
    # The timeout prevents an indefinite hang on a stalled connection.
    response = requests.put(url, headers=headers, json=data, timeout=30)
    # 200 = existing file updated; 201 (file created) is accepted as success too.
    if response.status_code in (200, 201):
        print(f"File {path} updated successfully.")
        return True
    print(f"Failed to update file {path}: {response.status_code}")
    print(response.json())
    return False


@tool
def create_branch(branch_name="fix_branch", source_branch="main"):
    """
    Create a new branch in a GitHub repository
    Args:
      branch_name (str): name of the new branch
      source_branch (str): name of the branch from which the new branch is to be created
    Returns:
      The name of the new branch when it was created, otherwise None
    """
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/vnd.github+json'
    }

    # Step 1: look up the commit SHA the source branch currently points at.
    # Timeouts keep a stalled connection from blocking the agent loop forever.
    url = f"https://api.github.com/repos/ekline-io/documentation/git/refs/heads/{source_branch}"
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code != 200:
        print(f"Failed to get source branch SHA: {response.status_code}")
        print(response.json())
        return None
    source_sha = response.json()["object"]["sha"]

    # Step 2: create the new ref pointing at that same commit.
    url = "https://api.github.com/repos/ekline-io/documentation/git/refs"
    data = {
        "ref": f"refs/heads/{branch_name}",
        "sha": source_sha
    }
    response = requests.post(url, headers=headers, json=data, timeout=30)
    if response.status_code == 201:
        print(f"Branch {branch_name} created successfully.")
        return branch_name
    print(f"Failed to create branch {branch_name}: {response.status_code}")
    print(response.json())
    return None

@tool
def create_pull_request(title, body, head, base="main"):
    """
    Create a pull request in a GitHub repository
    Args:
      title (str): title of the pull request
      body (str): body of the pull request
      head (str): name of the branch from which the changes are to be pulled
      base (str): name of the branch to which the changes are to be pulled
    Returns:
      The parsed JSON response when the pull request was created (HTTP 201), otherwise None
    """
    url = "https://api.github.com/repos/ekline-io/documentation/pulls"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/vnd.github+json'
    }
    data = {
        "title": title,
        "body": body,
        "head": head,
        "base": base
    }
    # The timeout keeps a stalled connection from blocking the agent loop forever.
    response = requests.post(url, headers=headers, json=data, timeout=30)
    if response.status_code == 201:
        print("Pull request created successfully.")
        return response.json()
    print(f"Failed to create pull request: {response.status_code}")
    print(response.json())
    return None
 
def _replace_in_repo_files(old_content, new_content, path, branch):
    """Recursively replace old_content with new_content under ``path``; return the paths submitted for update."""
    listing = get_repo_content(path, branch)
    if listing is None:
        return []
    # A file path yields a single dict rather than a list of entries.
    if isinstance(listing, dict):
        listing = [listing]

    updated_paths = []
    for item in listing:
        if item['type'] == 'dir':
            # Recurse through this plain helper: the public name is wrapped by
            # @tool and is no longer an ordinary callable with this signature.
            updated_paths.extend(
                _replace_in_repo_files(old_content, new_content, item['path'], branch)
            )
        elif item['type'] == 'file':
            # Directory listings describe files without their content, so fetch
            # the file itself when the entry carries none.
            file_info = item if item.get('content') else get_repo_content(item['path'], branch)
            if not isinstance(file_info, dict) or not file_info.get('content'):
                continue
            if file_info.get('encoding', 'base64') != 'base64':
                continue
            try:
                file_content = base64.b64decode(file_info['content']).decode()
            except ValueError:
                # Covers malformed base64 and non-UTF-8 (binary) files: skip them
                # instead of aborting the whole walk.
                continue
            modified_content = file_content.replace(old_content, new_content)
            if modified_content != file_content:
                update_github_file(
                    file_info['path'], modified_content, file_info['sha'],
                    "Update token variable", branch
                )
                updated_paths.append(file_info['path'])
    return updated_paths


@tool
def modify_files_in_repo(old_content, new_content, path="", branch="main"):
    """
    Modify files in a GitHub repository by replacing old content with new content
    Args:
      old_content (str): content to be replaced
      new_content (str): new content
      path (str): path of the directory in the repository
      branch (str): branch in which the files are to be modified
    Returns:
      List of file paths whose content changed and for which an update was submitted
    """
    # str.replace("", x) would insert x between every character of every file.
    if not old_content:
        return []
    return _replace_in_repo_files(old_content, new_content, path, branch)

@tool
def code_search_github_repo(query):
    """
    Search the ekline-io organization on GitHub for code containing the string in the query
    Args:
      query (str): search query
    Returns:
      The parsed JSON body of GitHub's code search response
    """
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/vnd.github+json',
        'X-GitHub-Api-Version': '2022-11-28'
    }
    # Never print access_token: cell output is saved with the notebook and would
    # leak the credential to anyone who can read it.
    print(f"Searching for '{query}' in the repository...")
    # Search qualifiers must be part of the `q` value. The previous URL sent
    # `org:ekline-io` and `path:*.md` as separate query-string keys, so they were
    # not part of the search expression. Only the org qualifier is carried over;
    # NOTE(review): add a file-type qualifier here if results should be limited
    # to markdown - confirm the supported syntax against GitHub's search docs.
    params = {'q': f'{query} org:ekline-io'}
    # `params` URL-encodes the query (it may contain spaces, backticks, '&', ...);
    # the timeout keeps a stalled connection from blocking the agent loop.
    response = requests.get(
        'https://api.github.com/search/code',
        headers=headers,
        params=params,
        timeout=30,
    )
    return response.json()

In [None]:
from langgraph.graph import StateGraph, END
from langchain_core.runnables import Runnable, RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage, ChatMessage, ToolMessage
from langchain_anthropic import ChatAnthropic

class AgentState(TypedDict):
	"""State dictionary passed to StateGraph when the agent graph is built."""
	# Message history. The field is annotated with operator.add; presumably
	# LangGraph uses that to append node outputs to the list rather than
	# replace it - confirm against the LangGraph documentation.
	messages: Annotated[list[AnyMessage], operator.add]

class Agent:
    """
    Tool-calling agent built as a two-node graph.

    The "llm" node asks the model for the next message. When that message
    carries tool calls, the "action" node executes them and control returns to
    "llm"; otherwise the graph ends.
    """

    def __init__(self, model, tools, system="", checkpointer=None):
        """
        Args:
            model: chat model exposing ``bind_tools`` and ``invoke``.
            tools: tools the model may call; each needs ``name`` and ``invoke``.
            system (str): optional system prompt prepended on every model call.
            checkpointer: optional object forwarded to ``graph.compile``. The
                default (None) compiles the graph exactly as before.
        """
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_llm)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges(
            "llm",
            self.exists_action,
            {True: "action", False: END}
        )
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def exists_action(self, state: AgentState):
        """Return True when the latest message requests at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def call_llm(self, state: AgentState):
        """Invoke the model on the history (with the system prompt first, if set)."""
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def take_action(self, state: AgentState):
        """Run every tool call of the latest message and return the results as ToolMessages."""
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:      # check for bad tool name from LLM
                print("\n ....bad tool name....")
                result = "bad tool name, retry"  # instruct LLM to retry if bad
            else:
                try:
                    result = self.tools[t['name']].invoke(t['args'])
                except Exception as exc:
                    # Model-generated arguments can be invalid; report the
                    # failure back to the model (as is done for bad tool names)
                    # instead of aborting the whole graph run.
                    print(f"\n ....tool {t['name']} failed: {exc!r}....")
                    result = f"tool call failed with {exc!r}, fix the arguments and retry"
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}


In [None]:
# System prompt handed to the agent; assembled from adjacent string literals.
prompt = (
    "You are a smart github assistant. You have access to Github respository. "
    "You need to help user with solving their request for a task. "
    "You have access to tools that can help you with search repository, create branch, solve the query and create Pull Request on the repo. "
    "If you are asked to solve an issue, You can first create a branch, then solve the query by updating files and then create a Pull Request.\n"
)

# Tools exposed to the model.
tools = [
    code_search_github_repo,
    create_branch,
    modify_files_in_repo,
    create_pull_request,
]
model = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
abot = Agent(model, tools, system=prompt)

In [None]:
# Run the agent graph to completion on a request to rename the token placeholder.
messages = [
    HumanMessage(
        content="Can you update all files with `YOUR_EKLINE_TOKEN` to `YOUR_EKLINE_TOKEN_V2`?"
    )
]
result = abot.graph.invoke({"messages": messages})

In [None]:
# Stream the agent's run on a read-only request and print each node's output as it arrives.
messages = [
    HumanMessage(content="Can you just List all files with `YOUR_EKLINE_TOKEN` ?")
]
thread = dict(configurable=dict(thread_id="1"))
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)