In [None]:
pip install langchain

In [None]:
from dotenv import load_dotenv
import os
import json

load_dotenv('../.env')

In [None]:
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    AIMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.schema import AIMessage, HumanMessage, SystemMessage
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType

llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-1106")

## langchain calling CCCS tools: AssemblyLine and Beaver

In [None]:
AL_API_KEY = os.getenv('AL_API_KEY')
AL_USER = os.getenv('AL_USER')
BEAVER_API_KEY = os.getenv('BEAVER_API_KEY')

In [None]:
from assemblyline_client import get_client
al_client = get_client("https://malware.cyber.gc.ca:443", apikey=(AL_USER, AL_API_KEY))

In [None]:
import requests
def beaver_search(sha256):
    response = requests.get(
        f"https://beaver.ops.cyber.gc.ca/auth/api/sha256/{sha256}/json",
        headers={
            "X-API-Key": BEAVER_API_KEY,
            "Accept": "application/json",
        }
    )
    return response

In [None]:
def get_antivirus_service_results_from_al(sha256):
    """Get the AssemblyLine antivirus report for the given sha256"""
    search_result = al_client.search.result(f'sha256:{sha256} AND response.service_name:AntiVirus', fl="*")
    # return "\n".join([i['title_text'] for i in search_result['items'][0]['result']['sections']])
    if len(search_result['items']) > 0:
        return json.dumps([i['title_text'] for i in search_result['items'][0]['result']['sections']])
    else:
        return "No antivirus report available for this sha256 in AssemblyLine"

def get_antivirus_service_results_from_beaver(sha256):
    response = beaver_search(sha256)
    if response.status_code == 200:
        json_data = json.loads(response.text)
        if "SHADOW_SERVER" in json_data['malwareReport']['reports']:
            return json.dumps(json_data['malwareReport']['reports']['SHADOW_SERVER'][0]['results'])
    return "No antivirus report available for this sha256 in Beaver"

In [None]:
tools = [
    Tool(
        name = "GetAntivirusServiceResultsFromAssemblyLine",
        func=get_antivirus_service_results_from_al,
        description="useful for when you need to answer questions about AssemblyLine's antivirus reports for a given sha256.  AssemblyLine is a CCCS tool available at: https://malware.cyber.gc.ca"
    ),
    Tool(
        name = "GetAntivirusServiceResultsFromBeaver",
        func=get_antivirus_service_results_from_beaver,
        description="useful for when you need to answer questions about Beaver's antivirus reports for a given sha256.  Beaver is a CCCS tool available at https://beaver.ops.cyber.gc.ca"
    )
]
mrkl = initialize_agent(tools, llm, agent=AgentType.OPENAI_FUNCTIONS, verbose=False, debug=False)

In [None]:
# from langchain.globals import set_verbose, set_debug
# set_verbose(True)
# set_debug(False)
print(mrkl.run("What are the antivirus results in AssemblyLine and BeAVER for sha256 af42459d9989ef3ff60d43e8069b647bfa2307e847452d2848dcc6f970984104?"))

In [None]:
print(mrkl.run("What are the antivirus results in AssemblyLine and BeAVER for sha256 ca6a054154acb559a495cfea1923933a65c94a325834a3cf9f46d85d3adaacf5?"))