## Getting data (points) from the Scroll website API in batches, saving progress

### Key Points in the Code:
 - **Checkpoint File:** Before starting, the script checks for a checkpoint file (**wallets_with_points_checkpoint.csv**) to resume from where it left off.
 - **Batch Processing:** Wallet addresses are processed in batches. After processing each batch, the results are appended to the df_results DataFrame.
 - **Periodic Saving:** After processing each batch, the results are saved to the checkpoint file to ensure progress is not lost.
 - **Resume Capability:** If interrupted, the script can resume from the last processed batch by reading the checkpoint file.

This approach minimizes data loss and makes it possible to handle large datasets by breaking the task into smaller, manageable batches.

## Getting Data using parallel requests to speed up the process

In [1]:
import requests
import pandas as pd
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
import multiprocessing

### Function to fetch points from AWS API

In [2]:
# Function to fetch points for a wallet
def fetch_points(wallet, timeout=30):
    """Fetch the points value for one wallet from the wallet-points endpoint.

    Args:
        wallet: Wallet address sent as the ``walletAddress`` query parameter.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot block a worker thread indefinitely.

    Returns:
        The ``points`` value from the JSON response (taken from the first
        element when the response is a non-empty list), or ``None`` when the
        request fails, the status code is not 200, the body is not valid
        JSON, or no ``points`` entry is present.
    """
    try:
        response = requests.get(
            'https://kx58j6x5me.execute-api.us-east-1.amazonaws.com/scroll/wallet-points',
            params={'walletAddress': wallet},  # let requests URL-encode the address
            timeout=timeout,
        )
        if response.status_code != 200:
            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Error: Received status code {response.status_code} for wallet {wallet}")
            return None
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests releases whose
        # JSON decode error is not a RequestException subclass.
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Error: Request failed for wallet {wallet}. Exception: {e}")
        return None

    # The endpoint may answer with a list of records or a single record.
    if isinstance(data, list) and data:
        data = data[0]
    if isinstance(data, dict):
        return data.get('points')
    return None

In [3]:
# Read the wallet list to query; later cells use its 'wallet' column
wallet_addresses = pd.read_csv(filepath_or_buffer="scrollwallets.csv")

### Create a checkpoint file to save data extraction progress and resume from the point of interruption

In [4]:
# Restore earlier progress from the checkpoint file, or start with empty results
checkpoint_file = "wallets_with_points_checkpoint.csv"  # name of the checkpoint file to look for

if not os.path.exists(checkpoint_file):
    # No checkpoint yet: begin with an empty result table and an empty set
    df_results = pd.DataFrame(columns=['Wallet', 'Points'])
    processed_wallets = set()
    start_index = 0
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: No checkpoint found. Starting from the beginning.")
else:
    df_checkpoint = pd.read_csv(checkpoint_file)
    # Work on a copy so later changes to df_results leave df_checkpoint untouched
    df_results = df_checkpoint.copy()
    # A set keeps each wallet address once and makes membership checks cheap
    processed_wallets = set(df_checkpoint['Wallet'].tolist())
    # The number of distinct saved wallets is used as the row offset to resume from
    start_index = len(processed_wallets)
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Resuming from checkpoint. Processed {start_index} wallets so far.")

2024-07-31 17:05:56: Resuming from checkpoint. Processed 348000 wallets so far.


### Function to process a batch of wallets

In [5]:
# Number of wallets requested per cycle; progress is saved after each cycle
batch_size = 1_000

# Optional tuning hint (disabled): derive the worker count from the CPU count
# num_cores = multiprocessing.cpu_count()
# max_workers = num_cores * 2  # tune the multiplier experimentally so the CPU is not overloaded

# Function to process a batch of wallets
def process_batch(batch, max_workers=8):
    """Fetch points for every wallet in a batch using parallel requests.

    Args:
        batch: DataFrame with a ``wallet`` column holding the addresses to query.
        max_workers: Number of threads that issue requests concurrently.

    Returns:
        A list of ``{'Wallet': ..., 'Points': ...}`` dicts in completion
        order. A wallet whose task raised is logged and left out of the list.
    """
    results = []  # one entry per wallet whose task finished without raising
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its wallet; only the 'wallet' column is
        # needed, so iterate it directly instead of building a row per wallet.
        futures = {executor.submit(fetch_points, wallet): wallet for wallet in batch['wallet']}
        for future in as_completed(futures):
            wallet = futures[future]
            try:
                points = future.result()
            except Exception as e:
                print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Exception for wallet {wallet}: {e}")
            else:
                results.append({'Wallet': wallet, 'Points': points})
    return results

In [None]:
# Process wallet addresses in batches, saving a checkpoint after every batch
total_wallets = len(wallet_addresses)
for i in range(start_index, total_wallets, batch_size):
    batch = wallet_addresses.iloc[i:i + batch_size]
    batch = batch[~batch['wallet'].isin(processed_wallets)]  # Exclude already processed wallets
    if batch.empty:
        # Everything in this slice is already done: skip the requests, the
        # checkpoint rewrite and the rate-limit pause.
        continue

    batch_results = process_batch(batch)
    if batch_results:
        df_batch = pd.DataFrame(batch_results)
        # Round only the new rows; rows restored from the checkpoint were
        # already rounded before they were written.
        df_batch['Points'] = df_batch['Points'].apply(lambda x: round(x, 3) if x is not None else x)
        df_results = pd.concat([df_results, df_batch], ignore_index=True)

    # Write to a temporary file and swap it in, so an interruption during
    # the write cannot leave a truncated checkpoint behind.
    tmp_checkpoint_file = f"{checkpoint_file}.tmp"
    df_results.to_csv(tmp_checkpoint_file, index=False)
    os.replace(tmp_checkpoint_file, checkpoint_file)

    processed_wallets.update(batch['wallet'].tolist())
    # Clamp the reported index so the last batch does not claim more rows than exist
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Processed up to index {min(i + batch_size, total_wallets)} out of {total_wallets}")
    time.sleep(0.4)  # Adjust sleep time to avoid hitting rate limits

# Save the final results to a CSV file
df_results.to_csv("wallets_with_points.csv", index=False)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: All data processed and saved to wallets_with_points.csv")
