In [4]:
import datetime
import threading
import pandas as pd
from web3 import Web3
from pprint import pprint as pp
from secrete import *

# Used to process several CSV files in parallel (one process per file)
from multiprocessing import Process, Pool
# Used to scan a single CSV concurrently (several threads per file)
from threading import Thread, Lock

# Infura endpoint prefix; an API key is appended to it to form the full URL.
base_URL = "https://mainnet.infura.io/v3/"
# Directories (relative to the working directory) for input chunks and results.
base_Input_PATH = "./input/"
base_Output_PATH = "./output/"

# Exclusive upper bound of the chunk numbering walked by the driver loop.
MAX_Chunk_Number = 362
# Request budget tracked per API key.
MAX_Quiry = 100000
# Minimum remaining budget a key needs before it is handed to a thread.
MAX_Thread_Quiry = 20000

# Per-key request counters for the primary and secondary key pools, all
# starting at zero.
INFURA_URL_Limit_List = dict.fromkeys(INFURA_URL_List, 0)
Second_INFURA_URL_Limit_List = dict.fromkeys(Second_INFURA_URL_List, 0)
# Keys that are currently handed out to a worker thread.
used_urls = set()
# Serialises key selection between worker threads.
lock = Lock()

In [5]:
def is_eoa(w3, address):
    """Return True when ``address`` has no deployed code on chain.

    The request is counted against the API key that ``w3`` is connected with,
    in whichever key pool (primary or secondary) that key belongs to.

    Args:
        w3: Connected ``Web3`` instance whose provider URL is ``base_URL``
            followed by the API key.
        address: Address to look up; it is converted to checksum form first.

    Returns:
        bool: True if ``eth.get_code`` returns empty bytes, False otherwise.

    Raises:
        Exception: ``"<key> key is expired"`` when the lookup fails for any
            reason. The underlying error is chained as ``__cause__``.
    """
    # Resolve the key before the try block so it is always available for the
    # error message below.
    uri = (w3.provider.endpoint_uri).replace(base_URL, "")
    try:
        checksum_address = w3.to_checksum_address(address)
        # Count the request against the pool that owns this key; keys from
        # the secondary pool used to raise KeyError here.
        if uri in INFURA_URL_Limit_List:
            INFURA_URL_Limit_List[uri] += 1
        elif uri in Second_INFURA_URL_Limit_List:
            Second_INFURA_URL_Limit_List[uri] += 1
        return w3.eth.get_code(checksum_address) == b''
    except Exception as e:
        # Actually raise (the exception used to be constructed and dropped,
        # which made the function return None on failure).
        raise Exception(f"{uri} key is expired") from e
        

def find_available_url():
    """Reserve and return an API key that still has enough request budget.

    The primary pool is searched before the secondary pool. A key qualifies
    when it is not already reserved in ``used_urls``, is below ``MAX_Quiry``
    and has at least ``MAX_Thread_Quiry`` requests left. The chosen key is
    added to ``used_urls`` before it is returned. The whole search runs while
    holding ``lock``.

    Returns:
        str: The reserved API key.

    Raises:
        Exception: If no key in either pool qualifies.
    """
    with lock:
        # First pass: primary pool.
        for key in INFURA_URL_List:
            if key in used_urls:
                continue
            spent = INFURA_URL_Limit_List[key]
            if spent < MAX_Quiry and MAX_Quiry - spent >= MAX_Thread_Quiry:
                used_urls.add(key)
                return key

        # Second pass: fallback pool, same eligibility rule.
        for key in Second_INFURA_URL_List:
            if key in used_urls:
                continue
            spent = Second_INFURA_URL_Limit_List[key]
            if spent < MAX_Quiry and MAX_Quiry - spent >= MAX_Thread_Quiry:
                used_urls.add(key)
                return key

        raise Exception("All API Keys expired or reached limit")


def work_thread(chunk_df, return_df):
    """Keep only the rows whose sender and receiver both pass ``is_eoa``.

    A key is reserved through ``find_available_url`` and used to build the
    ``Web3`` client. When a lookup raises, another key is reserved and the
    whole chunk is checked again with it. The failed key stays in
    ``used_urls``, so it is not handed out again in this process.

    Args:
        chunk_df: DataFrame with ``from_address`` and ``to_address`` columns.
            It is not modified; the function works on a copy.
        return_df: List shared with the caller; the filtered DataFrame is
            appended to it on success.

    Raises:
        Exception: Propagated from ``find_available_url`` when no key is left.
    """
    url_INFURA = find_available_url()
    w3 = Web3(Web3.HTTPProvider(base_URL + url_INFURA))
    print(f"{threading.current_thread().name} Start")
    print(f"Using API-Key:{url_INFURA}")

    # Convert the addresses to checksum format. ``assign`` returns a new
    # frame, so the caller's slice is never written to (the previous column
    # assignment targeted a slice of the caller's DataFrame).
    chunk_df = chunk_df.assign(
        from_address=chunk_df['from_address'].apply(w3.to_checksum_address),
        to_address=chunk_df['to_address'].apply(w3.to_checksum_address),
    )

    while True:
        try:
            sender_is_eoa = chunk_df['from_address'].apply(lambda x: is_eoa(w3, x))
            receiver_is_eoa = chunk_df['to_address'].apply(lambda x: is_eoa(w3, x))
            eoa_df = chunk_df[sender_is_eoa & receiver_is_eoa]
        except Exception as e:
            print(f"{threading.current_thread().name} request failed: {e}")
            # Reserve another key. find_available_url() raises when none is
            # left, which ends this thread, so the loop cannot spin forever.
            url_INFURA = find_available_url()
            print(f'Change the API Key to {url_INFURA}')
            w3 = Web3(Web3.HTTPProvider(base_URL + url_INFURA))
            continue

        return_df.append(eoa_df)
        # Done with the key: hand it back under the same lock that guards
        # the reservation in find_available_url().
        with lock:
            used_urls.discard(url_INFURA)
        break


def refine_INFURA(nProcess: str, file_Name: str, n_threads: int = 4, rows_per_thread: int = 10000):
    """Filter one input CSV down to rows accepted by ``work_thread`` and save it.

    The file is read from ``base_Input_PATH``, rows with a missing
    ``from_address`` or ``to_address`` are dropped, and the remaining rows are
    split positionally into up to ``n_threads`` pieces of ``rows_per_thread``
    rows, one ``work_thread`` each. The combined result is written to
    ``base_Output_PATH``.

    Args:
        nProcess: Label of the calling process; only used in log messages.
        file_Name: Name of the input file inside ``base_Input_PATH``.
        n_threads: Maximum number of worker threads to start.
        rows_per_thread: Number of rows handed to each worker thread.

    Raises:
        Exception: If every API key has reached ``MAX_Quiry``, or if a worker
            thread ended without delivering its result.
    """
    print(f"Process{nProcess} start reading {file_Name}")
    file_Path = base_Input_PATH + file_Name
    chunk_df = pd.read_csv(file_Path)

    # Drop rows with NaN or empty values in 'from_address' or 'to_address'
    chunk_df = chunk_df.dropna(subset=['from_address', 'to_address'])

    # Rows beyond n_threads * rows_per_thread were always left out; say so
    # instead of dropping them silently.
    capacity = n_threads * rows_per_thread
    if len(chunk_df) > capacity:
        print(f"Warning: {file_Name} has {len(chunk_df)} rows, only the first {capacity} are processed")

    thread_List = []
    output_df_list = []

    for nThread in range(n_threads):
        start = nThread * rows_per_thread
        # Positional, end-exclusive slicing. The label-based ``.loc`` slice
        # includes both ends, which handed each boundary row to two threads.
        piece = chunk_df.iloc[start:start + rows_per_thread]
        if piece.empty:
            # Nothing left to process; do not reserve an API key for it.
            break

        # check API key limitation (both key pools count)
        key_usage = list(INFURA_URL_Limit_List.values()) + list(Second_INFURA_URL_Limit_List.values())
        if all(used >= MAX_Quiry for used in key_usage):
            raise Exception("All API Key expired")

        worker = Thread(target=work_thread, args=(piece, output_df_list))
        worker.start()
        thread_List.append(worker)

    print(f"All Thread Started at {datetime.datetime.now()}")

    for thread in thread_List:
        thread.join()
    print("Thread End")

    print("All Thread Complete")

    # An exception inside a worker does not propagate to this thread; a
    # missing result is the only visible sign that a worker failed.
    if len(output_df_list) != len(thread_List):
        failed = len(thread_List) - len(output_df_list)
        raise Exception(f"{failed} of {len(thread_List)} worker threads failed\nLast FileName: {file_Name}")

    if output_df_list:
        # Results arrive in completion order; sort by the original row index
        # so the output order is deterministic.
        eoa_df = pd.concat(output_df_list).sort_index()
    else:
        eoa_df = chunk_df.iloc[0:0]

    # Save the filtered dataframe.
    # NOTE(review): the "reulst_" spelling is kept as-is because existing
    # output files or downstream steps may depend on it - confirm before renaming.
    now = datetime.datetime.now().strftime("%Y.%m.%d")
    eoa_df.to_csv(f"{base_Output_PATH}reulst_{file_Name}({now}).csv", index=False)

    print(f"Process{nProcess} Done\nOutput: reulst_{file_Name}({now}).csv")
    pp(INFURA_URL_Limit_List)
    pp(Second_INFURA_URL_Limit_List)

## pool Version
미완

In [None]:
# jupyter version
# Pool-based driver: the same chunk files as the Process-based driver
# (chunk_2 .. chunk_{MAX_Chunk_Number - 1}), spread over a pool of workers.
# NOTE(review): on platforms that start workers with "spawn", functions
# defined inside a notebook may not be importable by the pool workers -
# confirm on the target platform or move the functions into a .py module.
if __name__ == "__main__":
    nProcess = 2
    # The first tuple element is only a label for refine_INFURA's log output.
    jobs = [
        (str(index % nProcess + 1), "chunk_" + str(chunk_no))
        for index, chunk_no in enumerate(range(2, MAX_Chunk_Number))
    ]

    try:
        # The context manager shuts the pool down even when a job fails.
        with Pool(processes=nProcess) as p:
            p.starmap(refine_INFURA, jobs)
    except Exception as e:
        print(e)

## Process Version

In [3]:
# py version
# Process-based driver: handles the chunk files two at a time, one child
# process per file, and waits for both before starting the next pair.
# Each child process has its own copy of the module-level key counters and
# ``used_urls``, so quota tracking is not shared between the two children.
# NOTE(review): confirm the two children cannot exhaust the same API key.
if __name__ == "__main__":
    Process_List = []
    for i in range(2, MAX_Chunk_Number, 2):
        try:
            for nProcess in range(2):
                label = str(nProcess + 1)
                file_name = "chunk_" + str(i + nProcess)
                p = Process(target=refine_INFURA, args=(label, file_name))
                p.start()
                Process_List.append(p)

        except Exception as e:
            print(e)

        for process in Process_List:
            process.join()
            # A failure inside a child does not raise here; the exit code is
            # the only signal, so report it.
            if process.exitcode != 0:
                print(f"{process.name} exited with code {process.exitcode}")

        # Start the next batch with an empty list; otherwise every finished
        # process is kept and joined again on each iteration.
        Process_List.clear()