### Python bot for Uniswap trading

In [None]:
%pip install web3
%pip install uniswap-python

In [1]:
import json
import math
import time
import secrets
import asyncio
import requests
import credentials
from web3 import Web3
from uniswap import Uniswap
from eth_account import Account
import matplotlib.pyplot as plt
from IPython.display import clear_output

In [2]:
# --- Ethereum connection setup ---

# API keys are read from the local `credentials` module so that they are not
# written into the notebook itself.
INFURA_KEY = credentials.infura_key
ETHERSCAN_KEY = credentials.etherscan_key
QUICKNODE_KEY = credentials.quicknode_key

# Host shared by the QuickNode HTTPS and websocket endpoints
_QUICKNODE_HOST = 'wider-weathered-theorem.discover.quiknode.pro'

# Lookup table: endpoint name -> full RPC URL (API key already embedded)
RPC = {
    # Infura, HTTPS
    'mainnet': f'https://mainnet.infura.io/v3/{INFURA_KEY}',
    'goreli': f'https://goerli.infura.io/v3/{INFURA_KEY}',
    # Infura, websocket
    'mainnet_ws': f'wss://mainnet.infura.io/ws/v3/{INFURA_KEY}',
    'goreli_ws': f'wss://goerli.infura.io/ws/v3/{INFURA_KEY}',
    # QuickNode, HTTPS and websocket
    'quicknode': f'https://{_QUICKNODE_HOST}/{QUICKNODE_KEY}/',
    'quicknode_ws': f'wss://{_QUICKNODE_HOST}/{QUICKNODE_KEY}/',
}

# Open a websocket connection through the QuickNode endpoint; the final
# expression is displayed as the cell output (True when connected).
mainnet_ws = RPC['quicknode_ws']
w3 = Web3(Web3.WebsocketProvider(mainnet_ws))
w3.is_connected()

True

In [3]:
def initialize_wallet(option):
    """Create or import the Ethereum account used for trading.

    Args:
        option: ``1`` generates a brand-new wallet from a random private key;
            ``2`` prompts for the private key of an existing wallet.

    Returns:
        A dict with the keys ``"account"`` (the ``eth_account`` account
        object), ``"private_key"`` and ``"address"``. Returns ``None`` after
        printing ``"Invalid option"`` when ``option`` is neither 1 nor 2.
    """
    # Local import so this cell does not depend on edits to the import cell.
    from getpass import getpass

    if option == 1:
        # Fresh wallet: 32 random bytes, hex encoded
        private_key = secrets.token_hex(32)
    elif option == 2:
        # getpass hides the typed key; a plain input() would echo the private
        # key into the cell output, where it would be saved with the notebook.
        private_key = getpass("Enter your private key: ").strip()
    else:
        print("Invalid option")
        return None

    account = Account.from_key(private_key)
    return {
        "account": account,
        "private_key": private_key,
        "address": account.address,
    }

In [15]:
def get_abi(contract_address):
    """Fetch a contract's ABI through the Etherscan API.

    Args:
        contract_address: Address of the contract whose ABI is requested.

    Returns:
        The ``result`` field of the Etherscan response (the ABI as returned
        by Etherscan), or ``None`` after printing an error when the request
        fails, the response is not valid JSON, or Etherscan reports a status
        other than ``'1'``.
    """
    url = 'https://api.etherscan.io/api'
    params = {
        'module': 'contract',
        'action': 'getabi',
        'address': contract_address,
        'apikey': ETHERSCAN_KEY
    }
    try:
        # Without a timeout requests waits indefinitely on a stalled
        # connection, which would freeze the notebook cell.
        response = requests.get(url, params=params, timeout=10)
        data = response.json()
    except (requests.RequestException, ValueError) as error:
        # Network failure or a non-JSON body: report it the same way as an
        # API-level error so callers only have to handle ``None``.
        print("Error: ", error)
        return None

    if data.get('status') != '1':
        print("Error: ", data.get('message'))
        return None
    return data['result']

def get_token_info(token_address):
    """Read basic metadata from a token contract.

    The contract ABI is looked up with ``get_abi`` and the getters
    ``totalSupply()``, ``decimals()``, ``symbol()`` and ``owner()`` are
    called on the contract.

    Args:
        token_address: Address of the token contract.

    Returns:
        A dict with the keys ``"totalsupply"``, ``"decimals"``, ``"symbol"``
        and ``"owner"``. A value is ``None`` when the corresponding call
        fails (for example because the contract does not expose that getter).
    """
    abi = get_abi(token_address)
    # Initialize contract
    contract = w3.eth.contract(address=token_address, abi=abi)

    # (result key, contract function name), in the order they are reported
    getters = (
        ("totalsupply", "totalSupply"),
        ("decimals", "decimals"),
        ("symbol", "symbol"),
        ("owner", "owner"),
    )

    token_info = {}
    for key, function_name in getters:
        # Each getter is queried on its own so that one missing function
        # (many tokens have no owner()) does not discard the other fields.
        try:
            token_info[key] = getattr(contract.functions, function_name)().call()
        except Exception:
            # Best effort: an unavailable field is reported as None.
            token_info[key] = None
    return token_info

### Method 1: Using Uniswap SDK python wrapper

In [5]:
# Generate a new wallet and build one Uniswap client per protocol version.
account_info = initialize_wallet(1)
address, private_key = account_info['address'], account_info['private_key']

# Version 2 client, talking to the Infura mainnet HTTPS endpoint
version = 2
provider = RPC['mainnet']

# Arguments that are identical for both clients
_client_kwargs = dict(address=address, private_key=private_key, provider=provider)
uniswap_v2 = Uniswap(version=version, **_client_kwargs)

# Version 3 client, same wallet and provider
version = 3
uniswap_v3 = Uniswap(version=version, **_client_kwargs)

In [6]:
def conversion(rate_1, rate_2, decimal_1, decimal_2):
    """Combine two raw on-chain quotes into a human-readable rate.

    Computes ``(1 / rate_1) * (rate_2 / 10**decimal_2) * 10**decimal_1``.

    Args:
        rate_1: Raw quote that is inverted (must be non-zero).
        rate_2: Raw quote that is scaled down by ``10**decimal_2``.
        decimal_1: Power of ten the result is multiplied by.
        decimal_2: Power of ten ``rate_2`` is divided by.

    Returns:
        The converted rate as a float.
    """
    inverse_rate_1 = 1 / rate_1
    scaled_rate_2 = rate_2 / 10 ** decimal_2
    return (inverse_rate_1 * scaled_rate_2) * 10 ** decimal_1

In [13]:
# Addresses of the reference tokens
weth = Web3.to_checksum_address("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
usdt = Web3.to_checksum_address("0xdac17f958d2ee523a2206206994597c13d831ec7")

# Token under inspection
addr = Web3.to_checksum_address("0x00282FD551D03dC033256C4bf119532e8C735D8a")
token_info = get_token_info(addr)
print(token_info)

# Quote for 10**18 units of WETH in USDT, used for the dollar-denominated price
usdt_rate = uniswap_v2.get_price_input(weth, usdt, qty=10 ** 18)

# Ask each Uniswap version for the WETH -> token quote. A failure on one
# version (e.g. the call reverts) is printed and the loop moves on.
for label, client in (("V2:", uniswap_v2), ("V3:", uniswap_v3)):
    try:
        token_rate = client.get_price_input(weth, addr, qty=10 ** 18)
        print(label)
        print(f"1 {token_info['symbol']} = {conversion(token_rate, usdt_rate, token_info['decimals'], 6)} USDT")
        print(f"1 ETH = {conversion(1, token_rate, 0, token_info['decimals'])} {token_info['symbol']}")
    except Exception as error:
        print(error)

{'totalsupply': 88888888888800, 'decimals': 2, 'symbol': 'BIAO', 'owner': '0x0000000000000000000000000000000000000000'}


No fee set, assuming 0.3%


V2:
1 BIAO = 5.922184993957105e-06 USDT
1 ETH = 313427910.12 BIAO
execution reverted


### Method 2: Querying the blockchain directly

In [51]:
# Uniswap V2: look up the pair contract for two tokens and price it from reserves
UNISWAP_V2_FACTORY = Web3.to_checksum_address("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f")
USDT = Web3.to_checksum_address("0xdac17f958d2ee523a2206206994597c13d831ec7")
WETH = Web3.to_checksum_address("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")

TOKEN_0 =  Web3.to_checksum_address("0x6982508145454Ce325dDbE47a25d4ec3d2311933")
TOKEN_1 = WETH

# Get token info
token_0_info = get_token_info(TOKEN_0)
token_1_info = get_token_info(TOKEN_1)
# Get token decimals
token_0_decimals = token_0_info['decimals']
token_1_decimals = token_1_info['decimals']
# Get token symbol
token_0_symbol = token_0_info['symbol']
token_1_symbol = token_1_info['symbol']

# Get Uniswap V2 Factory Contract ABI
abi_v2 = get_abi(UNISWAP_V2_FACTORY)
# Initialize contract
contract_v2 = w3.eth.contract(address=UNISWAP_V2_FACTORY, abi=abi_v2)
# Get token pair address
pair_address = contract_v2.functions.getPair(TOKEN_0, TOKEN_1).call()

# getPair returns the zero address when no pair exists for the two tokens;
# fail here with a clear message instead of on a later contract call.
if int(pair_address, 16) == 0:
    raise ValueError(f"No Uniswap V2 pair found for {TOKEN_0} / {TOKEN_1}")

# Calculate token price using getReserves() function
abi_pair = get_abi(pair_address)
contract_pair = w3.eth.contract(address=pair_address, abi=abi_pair)
reserve = contract_pair.functions.getReserves().call()

# Calculate token pair rate
#
# The pair stores its reserves in the order of the tokens' numeric addresses,
# so reserve[0] belongs to whichever token has the smaller address. Checksummed
# addresses are mixed-case, and comparing them as strings can disagree with
# the numeric order; compare their integer values instead.
if int(TOKEN_1, 16) < int(TOKEN_0, 16):
    token_0_reserve = reserve[1]/10**token_0_decimals
    token_1_reserve = reserve[0]/10**token_1_decimals
    token_pair_rate = token_1_reserve/token_0_reserve

else:
    token_0_reserve = reserve[0]/10**token_0_decimals
    token_1_reserve = reserve[1]/10**token_1_decimals
    token_pair_rate = token_0_reserve/token_1_reserve

# NOTE(review): in both branches the rate is (reserve[0] side) / (reserve[1]
# side), so the "1 WETH = ..." label below is only accurate when WETH is the
# token with the larger address - confirm when changing TOKEN_0 / TOKEN_1.

print(f"Token 0 ({token_0_symbol}) reserve: {token_0_reserve}")
print(f"Token 1 ({token_1_symbol}) reserve: {token_1_reserve}")

# Raw token rate
print("True token price:")
print(f"1 WETH = {token_pair_rate} Token")

# Read the cumulative prices via price0CumulativeLast() / price1CumulativeLast()
token_0_price_cumulative_last = contract_pair.functions.price0CumulativeLast().call()
token_1_price_cumulative_last = contract_pair.functions.price1CumulativeLast().call()


Token 0 (WETH) reserve: 2531.3744175001248
Token 1 (PEPE) reserve: 3698643184785.309
True token price:
1 WETH = 1461120551.4346344 Token


### Token Monitor mode

In [None]:
# Poll the pair's reserves every 5 seconds and plot the rate history.
price_chart = []

# The pair stores reserves in the order of the tokens' numeric addresses.
# Checksummed addresses are mixed-case, so compare integer values rather than
# strings. The order never changes, so it is decided once outside the loop.
token_1_sorts_first = int(TOKEN_1, 16) < int(TOKEN_0, 16)

try:
    while True:
        reserve = contract_pair.functions.getReserves().call()

        if token_1_sorts_first:
            token_0_reserve = reserve[1]/10**token_0_decimals
            token_1_reserve = reserve[0]/10**token_1_decimals
            token_pair_rate = token_1_reserve/token_0_reserve
        else:
            token_0_reserve = reserve[0]/10**token_0_decimals
            token_1_reserve = reserve[1]/10**token_1_decimals
            token_pair_rate = token_0_reserve/token_1_reserve

        price_chart.append(token_pair_rate)

        # Replace the previous readout instead of appending to it
        clear_output(wait=True)
        print("Token 0 reserve:", token_0_reserve)
        print("Token 1 reserve:", token_1_reserve)

        #Raw token rate
        print("Raw token rate:")
        print("1 WETH =", token_pair_rate, "Token")
        plt.plot(price_chart)
        plt.show()
        time.sleep(5)
except KeyboardInterrupt:
    # "Interrupt kernel" is the only way out of the loop; stop without a
    # traceback. The collected history stays available in price_chart.
    print("Monitoring stopped")

In [15]:
# Monitor the mempool of a given token

# addr = Web3.to_checksum_address("0x955d5c14c8d4944da1ea7836bd44d54a8ec35ba1")
# token_info = get_token_info(addr)
# print(token_info)

### Uniswap Fundamentals

In [82]:
# Constant product formula: x * y = k
# The pool starts with 100000 DAI and 1000 WETH.
# Our share is 10000 DAI and 100 WETH (10%).


def _show_pool(title, dai, weth):
    """Print the pool balances and the two implied spot prices."""
    print(title)
    print("The LP has", dai, "DAI and", weth, "WETH")
    print("price of 1 DAI =", weth/dai, "WETH")
    print("price of 1 WETH =", dai/weth, "DAI")


x = 100000
y = 1000
k = x * y
_show_pool("before", x, y)

# The price of ETH rises to 150 DAI.
# Solve for the new balances such that x / y = 150 and x * y = k.
new_price = 150
y = math.sqrt(k / new_price)
x = math.sqrt(k * new_price)
_show_pool("after", x, y)

# Impermanent loss: value of our LP share versus simply holding the
# original 10000 DAI and 100 WETH, both measured in DAI at the new price.
pool_share = 0.1
hold_value = 10000 + 100 * new_price
print("Hold value =", hold_value, "DAI")
LP_share_value = x*pool_share + y*pool_share*new_price
print("LP share value =", LP_share_value, "DAI")
impermanent_loss = LP_share_value - hold_value
print("Impermanent loss =", impermanent_loss, "DAI", impermanent_loss/hold_value*100,"%")


before
The LP has 100000 DAI and 1000 WETH
price of 1 DAI = 0.01 WETH
price of 1 WETH = 100.0 DAI
after
The LP has 122474.48713915891 DAI and 816.496580927726 WETH
price of 1 DAI = 0.006666666666666666 WETH
price of 1 WETH = 150.00000000000003 DAI
Hold value = 25000 DAI
LP share value = 24494.897427831784 DAI
Impermanent loss = -505.1025721682163 DAI -2.020410288672865 %
