In [1]:
!pip install --force-reinstall --ignore-installed blinker

Collecting blinker
  Downloading blinker-1.9.0-py3-none-any.whl.metadata (1.6 kB)
Downloading blinker-1.9.0-py3-none-any.whl (8.5 kB)
Installing collected packages: blinker
Successfully installed blinker-1.9.0


In [None]:
!pip install crewai crewai-tools

In [None]:
!pip install python-dotenv

In [3]:
import os
from dotenv import load_dotenv

# Load variables from a .env file into the process environment, then read the
# Gemini key from there so the secret never appears in the notebook itself.
load_dotenv()
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

In [4]:
from crewai import LLM
# LLM handle passed to the agents in this notebook (temperature fixed at 0).
gemini_llm = LLM(model="gemini/gemini-1.5-flash", api_key=GEMINI_API_KEY, temperature=0)

In [5]:
from crewai.tools.structured_tool import CrewStructuredTool
from pydantic import BaseModel
import requests

# Argument schema for the API tool: a single `data` field carrying the request
# body that tool_wrapper posts to the RPC endpoint.
class APICallInput(BaseModel):
    data: dict  # sent unchanged as the JSON body of the POST request

def tool_wrapper(*args, **kwargs):
    """POST a JSON request body to the Solana mainnet RPC endpoint.

    The body is taken from the ``data`` keyword argument (the field declared
    by ``APICallInput``); if that is absent, the first positional argument is
    used instead.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None``. Failures
        (missing payload, non-200 status, network error, timeout) are printed
        rather than raised.
    """
    url = "https://api.mainnet-beta.solana.com/"
    # requests applies no timeout by default, so an unresponsive endpoint
    # would otherwise block the agent forever.
    timeout_seconds = 30

    payload = kwargs.get("data", args[0] if args else None)
    if payload is None:
        # Report a missing payload the same way as every other failure
        # instead of letting a KeyError escape.
        print('Error:', 'no request data provided')
        return None

    try:
        response = requests.post(url, json=payload, timeout=timeout_seconds)

        if response.status_code == 200:
            return response.json()
        print('Error:', response.status_code)
        return None
    except requests.exceptions.RequestException as e:
        print('Error:', e)
        return None

def create_structured_tool():
    """Build the CrewStructuredTool that exposes ``tool_wrapper`` to agents.

    Returns:
        The tool produced by ``CrewStructuredTool.from_function`` with
        ``APICallInput`` as its argument schema.
    """
    tool_settings = {
        "name": 'Wrapper API',
        "description": "A tool to wrap API calls with structured input.",
        "args_schema": APICallInput,
        "func": tool_wrapper,
    }
    return CrewStructuredTool.from_function(**tool_settings)

# Build the tool instance.
structured_tool = create_structured_tool()

# Smoke test: call the tool directly with a getAccountInfo request and show
# the raw response.
result = structured_tool._run(
    data={
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getAccountInfo",
        "params": [
            "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb",
            {"encoding": "base64"},
        ],
    }
)
print(result)

{'jsonrpc': '2.0', 'result': {'context': {'apiVersion': '2.0.21', 'slot': 314378092}, 'value': {'data': ['AgAAAL4zkcMPRYarxgednZ3r504XyexzY/JuY/2bbqm+blYz', 'base64'], 'executable': True, 'lamports': 1141440, 'owner': 'BPFLoaderUpgradeab1e11111111111111111111111', 'rentEpoch': 18446744073709551615, 'space': 36}}, 'id': 1}


In [9]:
# information = Task(
#     description='Fetch the details of the {data} on the blockchain and convert the balance into SOL and check if the SOL is greater than 0.',
#     expected_output='Balance, address, space occupied by the data, and return true if SOL balance is greater than 0 and false otherwise',
#     agent=informant,
# )

In [10]:
# crew = Crew(
#     agents=[informant],
#     tasks=[information],
#     verbose=True,
# )

In [None]:
# result = crew.kickoff(inputs={
#         "data": {
#             "jsonrpc": "2.0",
#             "id": 1,
#             "method": "getAccountInfo",
#             "params": [
#                 "6ZMqaeUzxUuwU692dvRwiZFeDWEsB3BFZ527Cbugxf9y",
#                 {"encoding": "base64"}
#             ]
#         }
#     })

In [29]:
from crewai import Agent, Task, Crew, Process, LLM
# JSON shape requested from the first task (passed as output_json on get_task).
class FirstAgentOutput(BaseModel):
    task: str  # name of the task to perform
    data: dict  # request payload for that task

# First agent: decides which task the user is asking for and fills in the
# matching request template from {task_params_map} (supplied at kickoff).
task_define = Agent(
    role='Task Definer and Populator',
    goal='Process the user input and tell which task is needed to be performed after identifying if the input provided is an address, signature or a message. Also populate the field in curly brackets in params of the correct task in {task_params_map}.',
    # Adjacent string literals are joined with no separator, so the first
    # fragment must carry its own trailing punctuation and space.
    backstory='Have knowledge about all the tasks that can be requested by the user. '
              'Able to identify the right task needed to be performed given the user query. Your output will be used by Blockchain Information Retreiver to fetch the corresponding info',
    verbose=True,
    llm=gemini_llm,
)

# First task: classify the request in {text} and emit the chosen task name
# plus its populated request payload as JSON (FirstAgentOutput).
get_task = Task(
    description='Identify the task needed to be performed in {text}. Return getAccountInfo if the user wants to send SOL to someone.',
    # Adjacent string literals are joined with no separator, so the first
    # fragment must carry its own trailing punctuation and space.
    expected_output='Give me the name of the task from getAccountInfo, getContractMetadata, getGasPrices, getTransactionDetails, getHealth. Also give the task data from the task_params_map with the populated address field if it has one in data field. '
                    'Give output with "task" field and "data" field',
    agent=task_define,
    output_json=FirstAgentOutput,
)


In [6]:
# Second agent: performs the selected request through the API tool.
informant = Agent(
    role='Blockchain Information Retreiver',
    # Adjacent string literals are joined with no separator, so each fragment
    # must carry its own trailing punctuation and space.
    goal='Provide accurate blockchain details using the given information. You will use the output provided by the Task Definer and Populator to perform the correct task. '
         'Pass the data field from the output of Task Definer and Populator to perform the task. '
         'Only perform one task',
    backstory='An expert on retreiving information on blockchains',
    tools=[structured_tool],
    verbose=True,
    llm=gemini_llm,
)

In [35]:
# Request templates keyed by task name. The keys are the task names listed in
# get_task's expected_output; each value holds, under "data", the JSON-RPC body
# for that task. Curly-bracket placeholders ("{address}", "{message}",
# "{signature}") are the fields the Task Definer agent is asked to populate.
# The whole mapping is passed to crew.kickoff as the `task_params_map` input.
task_params_map = {
    # Account lookup for a single address.
    "getAccountInfo" : {
        "data": {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getAccountInfo",
            "params": [
                "{address}",
                {"encoding": "base64"}
            ]
        }
    },
    # Sent as a getProgramAccounts call for the given address.
    # NOTE(review): the dataSize / memcmp filter values are fixed literals and
    # look like sample values; confirm they are intended for real queries.
    "getContractMetadata": {
        "data" : {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getProgramAccounts",
            "params": [
              "{address}",
              {
                "filters": [
                  {
                    "dataSize": 17
                  },
                  {
                    "memcmp": {
                      "offset": 4,
                      "bytes": "3Mc6vR"
                    }
                  }
                ]
              }
            ]
          }
    },
    # Sent as a getFeeForMessage call for the given message.
    "getGasPrices": {
        "data" : {
          "id":1,
          "jsonrpc":"2.0",
          "method":"getFeeForMessage",
          "params":[
            "{message}",
            {
              "commitment":"processed"
            }
          ]
        }
    },
    
    # Sent as a getTransaction call for the given signature.
    "getTransactionDetails": {
        "data" : {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTransaction",
            "params": [
                "{signature}",
                {"encoding": "jsonParsed","maxSupportedTransactionVersion":0}
            ]
        }
    },

    # No params and no placeholder to populate.
    "getHealth": {
        "data" : {
            "jsonrpc":"2.0",
            "id":1,
            "method":"getHealth"
        }
    },
    # "trackTransactionStatus": {
    #     "data" : {
    #         "jsonrpc": "2.0",
    #         "id": 1,
    #         "method": "trackTransactionStatus",
    #         "params": ["transaction_id"]
    #     }
    # }
    
}

In [None]:
from crewai.tasks.conditional_task import ConditionalTask
# NOTE(review): getAccountInfo and getAccountInfo_condition are both defined in
# a later cell, so this cell raises NameError when the notebook is run top to
# bottom (it also carries no execution count). Nothing else in the visible
# notebook reads conditional_getAccountInfo_task.
# TODO confirm that ConditionalTask accepts a `task=` keyword in the installed
# crewai version.
conditional_getAccountInfo_task = ConditionalTask(
    task=getAccountInfo,
    condition=getAccountInfo_condition
)

In [31]:
import json
def _selected_task(output):
    """Return the 'task' value from the JSON document held in ``output.raw``."""
    return json.loads(output.raw)['task']


def getContractMetadata_condition(output):
    """True when the parsed output names the 'getContractMetadata' task."""
    return _selected_task(output) == 'getContractMetadata'


def getGasPrices_condition(output):
    """True when the parsed output names the 'getGasPrices' task."""
    return _selected_task(output) == 'getGasPrices'


def getTransactionDetails_condition(output):
    """True when the parsed output names the 'getTransactionDetails' task."""
    return _selected_task(output) == 'getTransactionDetails'


def getHealth_condition(output):
    """True when the parsed output names the 'getHealth' task."""
    return _selected_task(output) == 'getHealth'


def getAccountInfo_condition(output):
    """True when the parsed output names the 'getAccountInfo' task."""
    return _selected_task(output) == 'getAccountInfo'

# getContractMetadata: program metadata lookup, run by the informant agent
# with the output of get_task as context.
getContractMetadata = Task(
    agent=informant,
    context=[get_task],
    description=(
        'Fetch metadata details of the program at the given address. Metadata should include owner, lamports, and any associated tags.'
    ),
    expected_output=(
        'Contract metadata including owner, lamports, space, and other relevant details.'
    ),
)

# getGasPrices: fee lookup for a message, run by the informant agent with the
# output of get_task as context.
getGasPrices = Task(
    agent=informant,
    context=[get_task],
    description='Fetch the fees for the message on blockchain.',
    expected_output='All the parameters',
)

# getTransactionDetails: transaction lookup, run by the informant agent with
# the output of get_task as context.
getTransactionDetails = Task(
    agent=informant,
    context=[get_task],
    description=(
        'Fetch the details of the transaction with the given ID. Include sender, receiver, amount, timestamp, status, and confirmation count.'
    ),
    expected_output=(
        'Transaction details including sender, receiver, amount, timestamp, status (confirmed or pending), and confirmation count.'
    ),
)

# Task 4: getHealth, run by the informant agent with the output of get_task
# as context.
getHealth = Task(
    agent=informant,
    context=[get_task],
    description=(
        'Retrieve the details of the blockchain network. Include current block height, number of transactions per second, and network health status.'
    ),
    expected_output=(
        'Network information including block height, TPS (transactions per second), and network health status (healthy, degraded, etc.).'
    ),
)

# # Task 5: trackTransactionStatus
# trackTransactionStatus = Task(
#     description='Monitor the status of the transaction with the given ID. Provide updates on confirmation status and finality.',
#     expected_output='Transaction status including confirmation percentage, finality status (finalized or not), and any associated errors.',
#     agent=informant,
# )

# Task 5: getAccountInfo, run by the informant agent with the output of
# get_task as context.
getAccountInfo = Task(
    description='Fetch the details of the given address on the blockchain and convert the balance into SOL and check if the SOL is greater than 0.',
    # Wording fixed ("true if", "occupied", "greater") so the instruction
    # handed to the LLM is unambiguous.
    expected_output='Balance, address, space occupied by the data, and return true if SOL balance is greater than 0 and false otherwise',
    context=[get_task],
    agent=informant,
)

In [37]:
# Stage 1: let the Task Definer pick the task and fill in its request payload.
crew = Crew(
    agents=[task_define],
    tasks=[get_task],
    verbose=True,
)

result = crew.kickoff(inputs={"text":"Get me the details for hide2jJRvN58wfCr4frtdEwULWPK3dQ92ewqbxgJnEL", "task_params_map": task_params_map})

# Stage 2: dispatch table from the task name chosen in stage 1 to the Task the
# informant agent should run.
second_agent_tasks = {
    "getContractMetadata" : getContractMetadata,
    "getGasPrices" : getGasPrices,
    "getTransactionDetails" : getTransactionDetails,
    "getHealth" : getHealth,
    "getAccountInfo" : getAccountInfo
}

# Reset first so a value left over from an earlier run of this cell can never
# be mistaken for the outcome of this run.
final_result = None
selected_task = result['task']
second_task = second_agent_tasks.get(selected_task)
if second_task is None:
    # Surface an unexpected task name instead of silently doing nothing.
    print('Error: unsupported task', repr(selected_task))
else:
    crew = Crew(agents=[informant], tasks=[second_task], verbose=True)
    final_result = crew.kickoff(inputs={"data": result['data']})

[92m14:45:31 - LiteLLM:INFO[0m: utils.py:2802 - 
LiteLLM completion() model= gemini-1.5-flash; provider = gemini


[1m[95m# Agent:[00m [1m[92mTask Definer and Populator[00m
[95m## Task:[00m [92mIdentify the task needed to be performed in Get me the details for hide2jJRvN58wfCr4frtdEwULWPK3dQ92ewqbxgJnEL. Return getAccountInfo is the user wants to send SOL to someone.[00m


[92m14:45:33 - LiteLLM:INFO[0m: utils.py:949 - Wrapper: Completed Call, calling success_handler
[92m14:45:33 - LiteLLM:INFO[0m: utils.py:2802 - 
LiteLLM completion() model= gemini-1.5-flash; provider = gemini




[1m[95m# Agent:[00m [1m[92mTask Definer and Populator[00m
[95m## Final Answer:[00m [92m
{
  "task": "getAccountInfo",
  "data": {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getAccountInfo",
    "params": ["hide2jJRvN58wfCr4frtdEwULWPK3dQ92ewqbxgJnEL", {"encoding": "base64"}]
  }
}[00m


[1m[95m# Agent:[00m [1m[92mBlockchain Information Retreiver[00m
[95m## Task:[00m [92mFetch the details of the given address on the blockchain and convert the balance into SOL and check if the SOL is greater than 0.[00m


[92m14:45:37 - LiteLLM:INFO[0m: utils.py:949 - Wrapper: Completed Call, calling success_handler
[92m14:45:37 - LiteLLM:INFO[0m: utils.py:2802 - 
LiteLLM completion() model= gemini-1.5-flash; provider = gemini




[1m[95m# Agent:[00m [1m[92mBlockchain Information Retreiver[00m
[95m## Thought:[00m [92mThought:I need to use the Wrapper API to fetch the account information using the provided parameters.  Then I'll extract the balance, convert it to SOL, check if it's greater than 0, and construct the final answer.[00m
[95m## Using tool:[00m [92mWrapper API[00m
[95m## Tool Input:[00m [92m
"{\"data\": {\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"getAccountInfo\", \"params\": [\"hide2jJRvN58wfCr4frtdEwULWPK3dQ92ewqbxgJnEL\", {\"encoding\": \"base64\"}]}}"[00m
[95m## Tool Output:[00m [92m
{'jsonrpc': '2.0', 'result': {'context': {'apiVersion': '2.1.8', 'slot': 314382693}, 'value': {'data': ['', 'base64'], 'executable': False, 'lamports': 14639420117, 'owner': '11111111111111111111111111111111', 'rentEpoch': 18446744073709551615, 'space': 0}}, 'id': 1}[00m


[92m14:45:39 - LiteLLM:INFO[0m: utils.py:949 - Wrapper: Completed Call, calling success_handler




[1m[95m# Agent:[00m [1m[92mBlockchain Information Retreiver[00m
[95m## Final Answer:[00m [92m
{'Balance': 14.639420117, 'address': 'hide2jJRvN58wfCr4frtdEwULWPK3dQ92ewqbxgJnEL', 'space occupied by the data': 0, 'SOL balance greater than 0': True}[00m


