In [1]:
import os, re, time
from dotenv import load_dotenv 
import json
from utils import *
from web3 import Web3
import pandas as pd
from pprint import pprint
from utils import *
from tqdm import tqdm
from decimal import Decimal
from datetime import datetime
import numpy as np
import mplfinance as mpf

# Get the config file
configObj = ConfigManager("config.json")
appInfo, configData = configObj.load_config()
nodeUrl = appInfo["alchemy_url"]+appInfo["alchemy_key"]


In [2]:

# Replace these values with your specific pair
PAIR_ADDRESS = "0x5c7f3aaba7943d90f10166ed53f9179ce7d8d989"; 
RPC_URL = nodeUrl
web3Obj = Web3(Web3.HTTPProvider(nodeUrl))

# Get transactions from the last 1000 blocks
end_block = 21400000 + 80000
from_block = 21400000 - 60000
end_block = web3Obj.eth.block_number
from_block = end_block - 40000

handler = ETH_Handler(web3Obj)
pairInfo = handler.get_pair_info(PAIR_ADDRESS)
quoteAssetIndex = 0 if pairInfo["token0"]["symbol"] == "WETH" else 1 if pairInfo["token1"]["symbol"] == "WETH" else None
quoteAsset = pairInfo[f"token{quoteAssetIndex}"]["symbol"]
baseAsset = pairInfo[f"token{quoteAssetIndex-1 if quoteAssetIndex == 1 else 1}"]["symbol"]
assetName = f"{baseAsset}/{quoteAsset}"

candleHandler = tokenCandlestick("./resources/ETH_block_data", web3Obj, "ETH", True)
transactions = candleHandler.get_uniswap_pair_transactions(
    PAIR_ADDRESS, 
    from_block, 
    end_block, 
    None, 
    pairInfo["token0"]["decimals"], 
    pairInfo["token1"]["decimals"], 
    2 if pairInfo["version"]=="UNI_V2" else 3 if pairInfo["version"]=="UNI_V3" else -1,
    quoteAssetIndex,
    0.0001)
priceDf = candleHandler.process_transactions(transactions)
candleHandler.get_plot(priceDf, "4h", {"title":assetName})

Exception: Error fetching pair information: Not a uniswap V2 or V3 pair. are you sure what you have entered is for a conteract address?

In [8]:
import asyncio
import websockets
import json
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EthereumListener:
    def __init__(self, websocket_url):
        self.websocket_url = websocket_url
        self.subscription_id = None
    
    async def subscribe_to_blocks(self, websocket):
        subscribe_message = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_subscribe",
            "params": ["newHeads"]
        }
        await websocket.send(json.dumps(subscribe_message))
        response = await websocket.recv()
        response_json = json.loads(response)
        self.subscription_id = response_json.get('result')
        logger.info(f"Subscribed to newHeads with id: {self.subscription_id}")

    def process_block(self, block_data):
        try:
            block = block_data['params']['result']
            number = int(block['number'], 16)  # Convert hex to int
            timestamp = int(block['timestamp'], 16)
            gas_used = int(block['gasUsed'], 16)
            gas_limit = int(block['gasLimit'], 16)
            
            print(f"\nNew Block #{number} at {datetime.fromtimestamp(timestamp)}")
            print(f"Hash: {block['hash']}")
            print(f"Parent Hash: {block['parentHash']}")
            print(f"Gas Used: {gas_used:,}")
            print(f"Gas Limit: {gas_limit:,}")
            print(f"Base Fee: {int(block.get('baseFeePerGas', '0x0'), 16)}")
            print("-" * 50)
        except Exception as e:
            logger.error(f"Error processing block: {e}")

    async def listen(self):
        while True:
            try:
                async with websockets.connect(self.websocket_url) as websocket:
                    logger.info("Connected to Ethereum node")
                    await self.subscribe_to_blocks(websocket)
                    
                    while True:
                        try:
                            message = await websocket.recv()
                            data = json.loads(message)
                            
                            # Handle subscription messages
                            if 'method' in data and data['method'] == 'eth_subscription':
                                self.process_block(data)
                                
                        except websockets.ConnectionClosed:
                            logger.warning("Connection closed unexpectedly")
                            break
                        
            except Exception as e:
                logger.error(f"Connection error: {e}")
                logger.info("Reconnecting in 5 seconds...")
                await asyncio.sleep(5)

async def main():
    # Replace YOUR-API-KEY with your Alchemy API key
    alchemy_url = f"wss://eth-mainnet.g.alchemy.com/v2/gMrbyp_Ydp55S-AfZPNLztwYS_pa1Z59"
    listener = EthereumListener(alchemy_url)
    await listener.listen()

await main()

INFO:__main__:Connected to Ethereum node
INFO:__main__:Subscribed to newHeads with id: 0xe90a7754ca8c4491fe529525b5a2fa37



New Block #21624628 at 2025-01-14 22:20:23
Hash: 0x84292f1d251b18b3132508cd0404d9c70038fc90eb3b62470c2facb52592cc03
Parent Hash: 0xb81bac76ce7d4a2918d0cb8d8b0f3b4235be8df52ee775daf6a08ee0330f4f7a
Gas Used: 8,983,713
Gas Limit: 30,000,000
Base Fee: 7165748934
--------------------------------------------------

New Block #21624629 at 2025-01-14 22:20:35
Hash: 0xe3847dceb52e1da89402739382c70b3cb8bbd4236b1cd42f35cee0b0ad88b8ee
Parent Hash: 0x84292f1d251b18b3132508cd0404d9c70038fc90eb3b62470c2facb52592cc03
Gas Used: 19,967,773
Gas Limit: 30,000,000
Base Fee: 6806488917
--------------------------------------------------


CancelledError: 

In [10]:

def get_pool_transactions(pool_address: str, alchemy_api_key: str, limit: int = 10):
    url = f"https://solana-mainnet.g.alchemy.com/v2/{alchemy_api_key}"
    
    payload = {
        "id": 1,
        "jsonrpc": "2.0",
        "method": "getSignaturesForAddress",
        "params": [
            pool_address,
            {"limit": limit}
        ]
    }
    
    headers = {
        "accept": "application/json",
        "content-type": "application/json"
    }
    
    response = requests.post(url, json=payload, headers=headers)
    transactions = response.json().get("result", [])
    
    # Process and print transactions
    for tx in transactions:
        print(f"""
        Signature: {tx['signature']}
        Time: {datetime.fromtimestamp(tx['blockTime'])}
        Slot: {tx['slot']}
        """)
import requests

url = "https://solana-mainnet.g.alchemy.com/v2/gMrbyp_Ydp55S-AfZPNLztwYS_pa1Z59"

payload = {
    "id": 1,
    "jsonrpc": "2.0",
    "method": "getBlockTime",
    "params": [165768577]
}
headers = {
    "accept": "application/json",
    "content-type": "application/json"
}

get_pool_transactions("aegbjqtexiegdaxu1zhbeqmkxk6ruqncxjwvdo4n4cbg", "gMrbyp_Ydp55S-AfZPNLztwYS_pa1Z59")

[]


In [15]:
url = f"https://solana-mainnet.g.alchemy.com/v2/gMrbyp_Ydp55S-AfZPNLztwYS_pa1Z59"

payload = {
    "id": 1,
    "jsonrpc": "2.0",
    "method": "getSignaturesForAddress",
    "params": [{ "limit": 10 }, "aegbjqtexiegdaxu1zhbeqmkxk6ruqncxjwvdo4n4cbg"]
}
headers = {
    "accept": "application/json",
    "content-type": "application/json"
}

response = requests.post(url, json=payload, headers=headers)
print(response.text)
transactions = response.json().get("result", [])

{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid params: invalid type: map, expected a string."},"id":1}
