<h1><center> Progetto finale Laboratorio di Web Scraping </center></h1>
<h1><center> Anno Accademico 2023-2024 </center></h1>
<h2><center> Marsili Gabriele </center></h2>

## Considerazioni iniziali

**-Struttura del codice :**
Per mantenere il codice chiaro e facilmente interpretabile ho deciso di strutturarlo nei seguenti moduli : 
- *Main.py* : file principale da cui avviare l'esecuzione
- *Utilities.py* : file contenente alcune strutture e funzioni utilizzate da più moduli
- *Scraping* : modulo relativo alla parte di scraping 
- *Graphic* : modulo contenente le funzioni necessarie per la creazione dei grafici 
- *Dataset_analysis* : modulo in cui ho inserito le funzioni relative all'analisi dei dati ed alle operazioni sui dataframes

**-Organizzazione del notebook :**
Il seguente notebook è creato in modo tale da poter rispettare la struttura a moduli del codice, quindi vengono prima introdotti i moduli minori (o secondari) e non dipendenti da altri moduli fino ad arrivare al main (che ovviamente necessita della definizione degli altri moduli per poter funzionare correttamente).

**-Path dei files**
I file CSV dovrebbero essere inseriti nella cartella datasetCSV (percorso: ./dataset_analysis/datasetCSV) con i seguenti nomi:
- inputs.csv
- map.csv
- outputs.csv
- transactions.csv

## Utilities

Nel seguente codice definisco un paio di dizionari (SETTINGS e LOG_LEVELS) che verranno utilizzati nei vari moduli ed alcune funzioni utili per la gestione delle date nei dataframes e per il calcolo della dimensione dei chunk (per la lettura dei files CSV).


In [5]:
import pandas as pd
import datetime

# Shared run-time settings, imported by the other modules
# (e.g. `from utilities import SETTINGS, LOG_LEVELS`).
SETTINGS = {
    # maximum number of worker threads, read by the analysis/scraping modules
    'MAX_THREAD_QUANTITY' : 13,
    # presumably the number of steps (k) of the Eligius taint analysis — confirm at its use site
    'ELIGIUS_ANALYSIS_STEPS':7,
    # when True the Selenium Chrome driver is started with '--headless=new'
    'SELENIUM_HEADLESS_MODE' : True
}

# Boolean switches that gate the diagnostic print() calls across the modules.
LOG_LEVELS = {
    'debug' : True,
    'results' : True,
    'processing' : True,
    'all infos' : True,
    # prints of elapsed times
    'time' : True,
    # when True, per-item prints (per month, per row, per address) are suppressed
    'reduce spam':True
}


def preprocess_data(df):
    """Ensure the 'timestamp' column of *df* holds datetimes.

    A column that is not already a datetime dtype is interpreted as Unix
    seconds and converted in place with ``pd.to_datetime(..., unit='s')``.

    @param df (pd.DataFrame): dataframe with a 'timestamp' column.
    @return : the same dataframe object (possibly modified in place).
    """
    already_datetime = pd.api.types.is_datetime64_any_dtype(df['timestamp'])
    if already_datetime:
        return df
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
    return df

def unix_to_date(timestamp, date_type = 'datetime'):
    """
    Convert a Unix timestamp (seconds) into a local-time datetime, or into
    a 'dd/mm/yyyy - hh:mm:ss' string when date_type is not 'datetime'.

    @param timestamp (int): Unix timestamp to convert.
    @param date_type (str) (optional) : 'datetime' (default) to get a
        datetime.datetime; any other value to get the formatted string.

    @return :
     - datetime.datetime built with datetime.datetime.fromtimestamp
        or
     - str: the same moment formatted by format_datetime.
    """
    moment = datetime.datetime.fromtimestamp(timestamp)
    if date_type == 'datetime':
        return moment
    return format_datetime(moment)

def format_datetime(dt):
    """
    Render a datetime as 'dd/mm/yyyy - hh:mm:ss' (24-hour clock).

    @param dt (datetime.datetime): datetime to format.

    @return : str: e.g. '02/01/2024 - 03:04:05'.
    """
    pattern = '%d/%m/%Y - %H:%M:%S'
    return dt.strftime(pattern)

def calculate_chunk_size(total_rows, desired_chunk_size):
    """Compute the number of chunks and the rows per chunk for reading a file.

    The number of chunks is ``ceil(total_rows / desired_chunk_size)``. The rows
    are then spread evenly over those chunks, rounding the chunk size *up*.
    This guarantees that:

    - ``num_chunks * actual_chunk_size >= total_rows``;
    - reading the file with ``actual_chunk_size`` rows per chunk yields
      exactly ``num_chunks`` chunks;
    - ``actual_chunk_size <= desired_chunk_size``.

    @param total_rows : total number of rows of the file (>= 0)
    @param desired_chunk_size : maximum desired number of rows per chunk (> 0)

    @return num_chunks, actual_chunk_size : number of chunks and rows per chunk.
        For an empty file (total_rows == 0) returns (0, desired_chunk_size), so the
        chunk size stays a valid positive value for callers.

    @raise ValueError : if desired_chunk_size <= 0 or total_rows < 0
    """
    if desired_chunk_size <= 0:
        raise ValueError(f"desired_chunk_size must be positive, got {desired_chunk_size}")
    if total_rows < 0:
        raise ValueError(f"total_rows must be non-negative, got {total_rows}")
    if total_rows == 0:
        return 0, desired_chunk_size

    # -(-a // b) is integer ceiling division (no float rounding issues)
    num_chunks = -(-total_rows // desired_chunk_size)
    # Round up, otherwise up to num_chunks-1 rows would spill into an extra chunk
    actual_chunk_size = -(-total_rows // num_chunks)
    return num_chunks, actual_chunk_size

## Graphic module 

**Modulo relativo alla creazione dei grafici, sfrutta plot_creator.py**

In plot_creator.py ho inserito le funzioni relative alla creazione dei grafici 

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd 
import networkx as nx

def plot_fees_vs_network_congestion(df):
    """Plot fees and network congestion over time, summed over 3-month intervals.

    Fees are drawn on the left y-axis. Network congestion is drawn on a second
    (right) y-axis that shares the time axis.

    @param df (pd.DataFrame): one row per month with a 'month' column (datetime,
        or values such as '2010-05' that ``pd.to_datetime`` can parse) and the
        numeric columns 'fees' and 'networkCongestion'. The caller's dataframe
        is not modified.
    """
    months = df['month']
    # is_datetime64_any_dtype accepts every datetime64 flavour (any unit, tz-aware)
    if not pd.api.types.is_datetime64_any_dtype(months):
        months = pd.to_datetime(months)

    # assign() returns a new frame, so the caller's 'month' column is left untouched.
    # Only the two plotted columns are resampled: summing unrelated columns is
    # unnecessary work and could fail on non-numeric data.
    df_resampled = (
        df.assign(month=months)
        .set_index('month')[['fees', 'networkCongestion']]
        .resample('3ME')
        .sum()
        .reset_index()
    )

    fig, ax1 = plt.subplots(figsize=(10, 6))

    # Fees on the left axis
    color = 'tab:blue'
    ax1.set_xlabel('Month')
    ax1.set_ylabel('Fees (in Satoshi)', color=color)
    ax1.plot(df_resampled['month'], df_resampled['fees'], color=color, label='Fees')
    ax1.tick_params(axis='y', labelcolor=color)
    ax1.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, loc: "{:,}".format(int(x))))

    # Network congestion on a second y-axis sharing the same x axis
    ax2 = ax1.twinx()
    color = 'tab:red'
    ax2.set_ylabel('Network Congestion (bytes)', color=color)
    ax2.plot(df_resampled['month'], df_resampled['networkCongestion'], color=color, label='Network Congestion')
    ax2.tick_params(axis='y', labelcolor=color)
    ax2.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, loc: "{:,}".format(int(x))))

    # Two stacked legends in the upper-left corner, one per axis
    ax1.legend(loc='upper left', bbox_to_anchor=(0, 1))
    ax2.legend(loc='upper left', bbox_to_anchor=(0, 0.9))

    # Adjust layout to make room for the title
    plt.tight_layout(rect=[0, 0.1, 1, 0.9])
    fig.suptitle('Fees vs Network Congestion Over Time (3-Month Intervals)', y=0.95, fontsize=12)

    plt.show()

def plot_script_type_usage(df):
    """Plot the monthly usage of P2PK, P2KH and all other script types before 2012.

    Three stacked line plots (shared x axis) show the monthly counts of 'P2PK',
    'P2KH' and 'Other_types'. 'Other_types' is the row-wise sum of every
    remaining script-type column.

    @param df (pd.DataFrame): one row per month with a 'month' column, the count
        columns 'P2PK' and 'P2KH', and any other script-type count columns.
        'fees' and 'networkCongestion' columns, if present (as in the output of
        processTransactions), are not script types and are excluded from
        'Other_types'. The caller's dataframe is not modified.
    """
    months = df['month']
    if not pd.api.types.is_datetime64_any_dtype(months):
        months = pd.to_datetime(months)

    # assign() builds a new frame and .copy() makes the filtered slice independent,
    # so adding columns below neither touches df nor triggers chained-assignment warnings
    data = df.assign(month=months)
    first_3_years = data[data['month'] < '2012-01-01'].copy()

    # Everything that is not a script-type count is excluded from 'Other_types'
    non_script_columns = ['month', 'P2PK', 'P2KH']
    non_script_columns += [c for c in ('fees', 'networkCongestion') if c in first_3_years.columns]
    first_3_years['Other_types'] = first_3_years.drop(columns=non_script_columns).sum(axis=1)

    fig, axes = plt.subplots(3, 1, figsize=(10, 8), sharex=True)

    script_types = ['P2PK', 'P2KH', 'Other_types']
    colors = ['tab:green', 'tab:orange', 'tab:purple']

    for ax, script_type, color in zip(axes, script_types, colors):
        sns.lineplot(ax=ax, data=first_3_years, x='month', y=script_type, color=color, label=script_type)
        ax.set_ylabel(f'{script_type} Count')
        ax.set_title(f'Usage of {script_type} Over Time')
        ax.legend(loc='upper left')

    axes[-1].set_xlabel('Month')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.subplots_adjust(top=0.9, hspace=0.5)
    fig.suptitle('Script Types Usage in the First 3 Years of Bitcoin', y=0.97)
    plt.show()
       
def plot_annual_script_type_usage(df):
    """Bar chart (log scale) of the yearly usage of script types before 2012.

    Monthly counts are summed per year for 'P2PK', 'P2KH' and 'Other_types'.
    'Other_types' is the row-wise sum of every remaining script-type column.

    @param df (pd.DataFrame): one row per month with a 'month' column, the count
        columns 'P2PK' and 'P2KH', and any other script-type count columns.
        'fees' and 'networkCongestion' columns, if present (as in the output of
        processTransactions), are excluded from 'Other_types'. The caller's
        dataframe is not modified.
    """
    months = df['month']
    if not pd.api.types.is_datetime64_any_dtype(months):
        months = pd.to_datetime(months)

    # New, independent frame: the caller's df is never modified
    data = df.assign(month=months)
    data = data[data['month'] < '2012-01-01'].copy()

    # Extract the year from the 'month' column
    data['year'] = data['month'].dt.year

    # Everything that is not a script-type count is excluded from 'Other_types'
    non_script_columns = ['month', 'year', 'P2PK', 'P2KH']
    non_script_columns += [c for c in ('fees', 'networkCongestion') if c in data.columns]
    data['Other_types'] = data.drop(columns=non_script_columns).sum(axis=1)

    # Group by year and sum the counts of each script type
    annual_data = data.groupby('year')[['P2PK', 'P2KH', 'Other_types']].sum().reset_index()

    # Long format for seaborn: one row per (year, scriptType)
    annual_data_melted = pd.melt(annual_data, id_vars='year', var_name='scriptType', value_name='count')

    plt.figure(figsize=(14, 7))
    sns.barplot(data=annual_data_melted, x='year', y='count', hue='scriptType', palette=['tab:green', 'tab:orange', 'tab:purple'])
    plt.title('Annual Usage of Script Types')
    plt.xlabel('Year')
    plt.ylabel('Count')
    plt.legend(title='Script Type')
    plt.yscale('log')  # log scale to better visualize differences
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

def plot_blocks_mined_by_top_4_miners(blocks_mined_by_T4miners):
    """Bar chart of the total blocks mined by each of the top 4 miners.

    @param blocks_mined_by_T4miners (pd.DataFrame): columns 'addressId'
        (one bar per miner) and 'blocks_mined' (bar height).
    """
    plt.figure(figsize=(10, 6))
    sns.barplot(data=blocks_mined_by_T4miners, x='addressId', y='blocks_mined')
    axis = plt.gca()
    axis.set_title('Total Blocks Mined by the top 4 miners')
    axis.set_xlabel('Miner')
    axis.set_ylabel('Total Blocks Mined')
    plt.show()
    
def plot_total_blocks_mined(global_blocks_mined):
    """Bar chart of the total blocks mined by each pool.

    @param global_blocks_mined (pd.DataFrame): columns 'pool' and
        'blocks_mined' (e.g. the first value returned by calculate_pool_statistics).
    """
    plt.figure(figsize=(10, 6))
    sns.barplot(data=global_blocks_mined, x='pool', y='blocks_mined')
    axis = plt.gca()
    axis.set_title('Total Blocks Mined by Each Pool')
    axis.set_xlabel('Mining Pool')
    axis.set_ylabel('Total Blocks Mined')
    plt.show()
    
def plot_bi_monthly_blocks_mined(bi_monthly_blocks_mined):
    """Line plot (one line per pool) of the blocks mined every two months.

    @param bi_monthly_blocks_mined (pd.DataFrame): columns 'bi_month',
        'blocks_mined' and 'pool'.
    """
    plt.figure(figsize=(14, 8))
    sns.lineplot(data=bi_monthly_blocks_mined, x='bi_month', y='blocks_mined', hue='pool', marker='o')
    axis = plt.gca()
    axis.set_title('Blocks Mined Every 2 Months by Each Pool')
    axis.set_xlabel('Time (Bi-Monthly Intervals)')
    axis.set_ylabel('Blocks Mined')
    plt.xticks(rotation=45)
    plt.show()
    
def plot_total_rewards(global_total_rewards):
    """Bar chart of the total rewards received by each pool.

    @param global_total_rewards (pd.DataFrame): columns 'pool' and
        'total_rewards' (e.g. the second value returned by calculate_pool_statistics).
    """
    plt.figure(figsize=(10, 6))
    sns.barplot(data=global_total_rewards, x='pool', y='total_rewards')
    axis = plt.gca()
    axis.set_title('Total Rewards Received by Each Pool')
    axis.set_xlabel('Mining Pool')
    axis.set_ylabel('Total Rewards (BTC)')
    plt.show()
    
def plot_bi_monthly_rewards(bi_monthly_total_rewards):
    """Line plot (one line per pool) of the rewards received every two months.

    @param bi_monthly_total_rewards (pd.DataFrame): columns 'bi_month',
        'total_rewards' and 'pool'.
    """
    plt.figure(figsize=(14, 8))
    sns.lineplot(data=bi_monthly_total_rewards, x='bi_month', y='total_rewards', hue='pool', marker='o')
    axis = plt.gca()
    axis.set_title('Rewards Received Every 2 Months by Each Pool')
    axis.set_xlabel('Time (Bi-Monthly Intervals)')
    axis.set_ylabel('Total Rewards (BTC)')
    plt.xticks(rotation=45)
    plt.show()
    
    
def plot_Eligius_path(graph_DF):
    """Draw the Eligius taint-analysis graph of transactions.

    Every transaction id in 'txId' becomes a node, even when it has no outputs.
    A directed edge ``txId -> output`` is added for each element of the row's
    'outputs'. Nodes are placed with networkx's spring layout.

    @param graph_DF (pd.DataFrame): columns 'txId' and 'outputs'. Each
        'outputs' cell holds an iterable of node ids (presumably the ids of the
        transactions reached from that transaction — confirm against the
        producer). A missing value (None/NaN) means no outgoing edges.
    """
    # Directed graph: one node per transaction
    G = nx.DiGraph()

    # Iterate only the two needed columns; iterrows would build a Series per row
    for tx_id, outputs in zip(graph_DF['txId'], graph_DF['outputs']):
        # add_edge alone would never add a transaction whose outputs are empty
        G.add_node(tx_id)
        if outputs is None or (pd.api.types.is_scalar(outputs) and pd.isna(outputs)):
            continue
        for output in outputs:
            G.add_edge(tx_id, output)

    # Draw the graph
    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, node_size=100, node_color='skyblue', font_size=3, font_weight='bold', arrowsize=15)
    plt.show()

## Dataset analysis module 

**Modulo relativo all'analisi dei dati ed alle operazioni sui dataframes**

In analizer.py ho scritto il codice relativo al calcolo della network congestion e delle fees per intervalli temporali di un mese sfruttando un meccanismo di threads ed operazioni vettoriali sul dataframe.
Nel modulo vi sono anche le funzioni che calcolano la quantità di script per ogni tipo (sempre su intervalli temporali di un mese) e le funzioni relative al calcolo delle statistiche delle pools: reward totali e numero di blocchi minati da ogni pool, sia globalmente sia per ogni intervallo di due mesi.

In [None]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import time 
from utilities import SETTINGS, LOG_LEVELS

# Enable pandas Copy-on-Write globally (the option is process-wide): frames
# derived from another frame never share modifiable data with it.
pd.set_option("mode.copy_on_write", True)
# Per-script-type size term (bytes) used by calculate_network_congestion to
# estimate transaction sizes; codes not listed here fall back to 153 there.
SCRIPT_SIZE_MAP = {
    1: 153,  # 'P2PK'
    2: 180,  # 'P2KH'
    3: 291   # 'P2SH'
}

# Numeric 'scriptType' code -> script type name. The names become the count
# columns of the processTransactions result (via calculate_script_type_counts).
# NOTE(review): 'P2KH' presumably means P2PKH; the spelling is used as a column
# name elsewhere, so changing it would break the consumers.
SCRIPT_TYPE_MAP = {
    0 : 'Unknown',
    1: 'P2PK',   
    2: 'P2KH',   
    3: 'P2SH',   
    4 : 'RETURN',
    5 : 'EMPTY',
    6 : 'P2WPKH',
    7 : 'P2WSH',
}


# Thread cap taken from the shared SETTINGS dictionary
MAX_THREAD_QUANTITY = SETTINGS['MAX_THREAD_QUANTITY']

def processTransactions(inputsDF, outputsDF, transactionDF):
    """Process transactions month by month using the inputs, outputs and transactions dataframes.

    •split the transactions into calendar months with ``pd.Grouper(freq='ME')``
     (this groups on the index, so transactionDF must have a DatetimeIndex)
    •for each month compute the network congestion, the total fees and the
     number of outputs per script type

    Months are processed on a thread pool capped at SETTINGS['MAX_THREAD_QUANTITY']
    workers. executor.map keeps the results in month order.

    @params : inputsDF : inputs dataframe (uses the 'txId' column)
    @params : outputsDF : outputs dataframe (uses 'txId' and 'scriptType')
    @params : transactionDF : transactions dataframe indexed by datetime, with 'txId' and 'fee' columns
    @return dataframe with one row per month: 'month' ('%Y-%m' string), 'fees',
            'networkCongestion' and one count column per script type name found.
            Columns of script codes missing from SCRIPT_TYPE_MAP (labelled 'None') are dropped.
    """
    startT = time.time()
    months = transactionDF.groupby(pd.Grouper(freq='ME'))
    if LOG_LEVELS['processing'] or LOG_LEVELS['all infos'] or LOG_LEVELS['debug']:
        print(f"found {len(months)} months")
        print("------------------------")

    def process_month(name, month):
        """Aggregate one month: name is the group key (month-end Timestamp), month its rows."""

        if not LOG_LEVELS['reduce spam'] and ( LOG_LEVELS['processing'] or LOG_LEVELS['all infos'] or LOG_LEVELS['debug']):
            print(f"processing month \n{name.strftime('%Y-%m')}")

        month_data = {
            'P2PK': 0,
            'P2KH': 0,
            'P2SH': 0,
            'fees': 0,
            'networkCongestion': 0,
            'month': name.strftime('%Y-%m')
        }

        # Network congestion of the month
        month_data['networkCongestion'] = calculate_network_congestion(month, inputsDF, outputsDF)

        # Total fees of the month
        month_data['fees'] = month['fee'].sum()

        # Number of outputs per script type (overrides the zero defaults above)
        month_data.update(calculate_script_type_counts(month, outputsDF))

        if not LOG_LEVELS['reduce spam'] and ( LOG_LEVELS['results'] or LOG_LEVELS['all infos'] or LOG_LEVELS['debug']):
            print(f"got month data:\n{month_data}\n")

        return month_data

    # Honour the configured thread cap instead of the executor's default pool size
    with ThreadPoolExecutor(max_workers=MAX_THREAD_QUANTITY) as executor:
        results = list(executor.map(lambda x: process_month(*x), months))

    result_df = pd.DataFrame(results)
    # Script codes missing from SCRIPT_TYPE_MAP are labelled 'None': drop those columns
    result_df = result_df.loc[:, ~result_df.columns.str.match('None')]

    if LOG_LEVELS['time']:
        print(f"transactions processed in {time.time()-startT} seconds")

    return result_df

def calculate_network_congestion(month, inputsDF, outputsDF):
    """Estimate the network congestion (sum of transaction sizes) of a single month.

    Each transaction's size is estimated as
    ``40 * n_inputs + 9 * n_outputs + script_size``. script_size is looked up
    in SCRIPT_SIZE_MAP for the scriptType of the transaction's first output,
    and is 153 when that type is not in the map.

    A transaction that has no rows in inputsDF (presumably coinbase
    transactions) or none in outputsDF is still counted, using 0 for the
    missing terms.

    @params : month : single month dataframe (uses 'txId')
    @params : inputsDF : inputs dataframe (uses 'txId')
    @params : outputsDF : outputs dataframe (uses 'txId' and 'scriptType')

    @return network congestion of the month (sum of the estimated sizes; 0 for an empty month)
    """
    tx_ids = month['txId']

    # Number of inputs per transaction of the month
    month_inputs = inputsDF[inputsDF['txId'].isin(tx_ids)]
    n_inputs_per_tx = month_inputs.groupby('txId').size()

    # Number of outputs per transaction of the month
    month_outputs = outputsDF[outputsDF['txId'].isin(tx_ids)]
    n_outputs_per_tx = month_outputs.groupby('txId').size()

    # Script size term from the first output's script type (the outputs of a
    # transaction are assumed to share the same script type)
    script_sizes = (
        month_outputs.groupby('txId')['scriptType'].first()
        .map(SCRIPT_SIZE_MAP)
        .fillna(153)
    )

    # Plain '+' aligns on txId and yields NaN for ids missing on one side, which
    # .sum() would silently skip; fill_value=0 keeps those transactions.
    tx_sizes = (
        (40 * n_inputs_per_tx)
        .add(9 * n_outputs_per_tx, fill_value=0)
        .add(script_sizes, fill_value=0)
    )

    return tx_sizes.sum()

def calculate_script_type_counts(month, outputsDF):
    """Count the outputs of one month for each script type.

    @params : month : dataframe of the month's transactions (uses 'txId')
    @params : outputsDF : outputs dataframe (uses 'txId' and 'scriptType')

    @return dict {script type name (str): number of outputs}. Names come from
            SCRIPT_TYPE_MAP; codes missing from the map are labelled 'None'.
    """
    belongs_to_month = outputsDF['txId'].isin(month['txId'])
    per_code = outputsDF[belongs_to_month].groupby('scriptType').size()
    type_names = per_code.index.map(SCRIPT_TYPE_MAP.get)
    return {f'{type_name}': quantity for type_name, quantity in zip(type_names, per_code)}

def calculate_pool_statistics(df):
    """Compute, for each pool, the number of distinct mined blocks and the total rewards.

    @params : df : dataframe of Coinbase transactions associated to a pool
                   (uses the 'pool', 'blockId' and 'amount' columns)

    @return (blocks_mined, total_rewards): two dataframes with columns
            ['pool', 'blocks_mined'] and ['pool', 'total_rewards'], one row per pool
    """
    by_pool = df.groupby('pool')

    # distinct block ids per pool
    blocks_mined = by_pool.agg(blocks_mined=('blockId', 'nunique')).reset_index()

    # sum of the coinbase amounts per pool
    total_rewards = by_pool.agg(total_rewards=('amount', 'sum')).reset_index()

    return blocks_mined, total_rewards

def calculate_bi_monthly_statistics(df):
    """Compute mined blocks and total rewards per pool for every 2-month period.

    Side effect: adds a 'bi_month' column to *df* (in place). It holds labels
    'YYYY-MM', where MM is the first month of the period (01, 03, 05, 07, 09, 11).

    @params : df : dataframe of Coinbase transactions associated to a pool
                   (uses 'pool', 'blockId', 'amount' and the datetime 'timestamp')

    @return (blocks_mined_bi_monthly, total_rewards_bi_monthly): dataframes with
            columns ['pool', 'bi_month', 'blocks_mined'] and
            ['pool', 'bi_month', 'total_rewards']
    """
    def _bi_month_of(x):
        # months 1-2 -> 01, 3-4 -> 03, ..., 11-12 -> 11
        return f"{x.year}-{(x.month-1)//2*2+1:02d}"

    df['bi_month'] = df['timestamp'].apply(_bi_month_of)

    per_period = df.groupby(['pool', 'bi_month'])

    blocks_mined_bi_monthly = per_period.agg(blocks_mined=('blockId', 'nunique')).reset_index()
    total_rewards_bi_monthly = per_period.agg(total_rewards=('amount', 'sum')).reset_index()

    return blocks_mined_bi_monthly, total_rewards_bi_monthly



## Scraping module 

**Modulo relativo allo scraping dei dati**

In questo modulo ho inserito tutte le funzioni relative allo scraping dei dati su WalletExplorer.

Nel dettaglio, i dati relativi alle pools sono ottenuti in due modi, sia sfruttando Selenium nella funzione get_W_addresses_Selenium sia tramite BeautifulSoup nella funzione getWalletAddresses.

All'interno del codice ho definito un dizionario per memorizzare le proxies ottenute da sslproxies.org; il dizionario sfrutta il campo 'last search time' per evitare di inviare troppe richieste a sslproxies.org (too many requests).
Il codice comprende inoltre alcune altre funzioni utili, volte a migliorare quanto più possibile le requests. In particolare getRequestUtils fornisce headers e proxies da usare per la request, costruendoli dinamicamente e in modo casuale. Passandole True come argomento, getRequestUtils sceglie un referrer specifico tra quelli relativi a WalletExplorer e forza la generazione di nuove proxies tramite generate_proxies, a cui passa a sua volta True come parametro.

Per ottenere i dati da WalletExplorer ho deciso inizialmente di lavorare con Selenium, poiché ritenevo ardua la sfida di non ricevere blocchi facendo oltre 70 requests consecutive per ottenere gli address della pool BTCGuild. Per tal motivo ho creato get_W_addresses_Selenium, che ottiene il numero totale di pagine per la pool considerata e, sfruttando i threads, ottiene gli indirizzi di ogni pagina distribuendo il lavoro.
Con questo approccio ero soddisfatto per le pools di Eligius e BitMinter, ma non per BTCGuild, per cui il codice impiegava oltre 5 minuti ad ottenere gli addresses.
Quindi ho deciso di implementare le funzioni per supportare le requests e ho migliorato getWalletAddresses, creando inoltre getWalletAddress_multiplePages (corrispettiva di get_W_addresses_Selenium). In essa inizialmente avevo introdotto un meccanismo di threads, sempre al fine di migliorare il più possibile le prestazioni, ma la frequenza delle requests era troppo elevata e, nonostante gli accorgimenti, ricevevo blocchi (errore 429, "Too Many Requests").
Ho quindi optato per la rimozione dei threads in getWalletAddress_multiplePages e, con ulteriori migliorie relative al time.sleep in getWalletAddresses, son riuscito ad ottenere una media di poco più di due minuti senza ricevere alcuna limitazione nelle requests.


Infine, per quanto riguarda la taint analysis di Eligius, ho associato un nodo del grafo ad una transazione, dove :
- l'id della transazione identifica anche il nodo
- vi è un arco tra due transazioni se l'output di una è input dell'altra

La funzione che ottiene i nodi :
- Parte dalla transazione della Coinbase Eligius e procede per k step.
- Allo step 1 ottiene gli input e gli output della prima transazione e salva gli output per lo step successivo.
- Allo step i-esimo (i >= 2) considera gli ID delle transazioni degli output dello step precedente (i-1) come nodi e per ognuno di essi trova input e output. Inoltre, salva come nuovi output per l'iterazione successiva (i+1) tutti gli output dell'iterazione corrente.

  

In [2]:
import requests
import time
from bs4 import BeautifulSoup as bs
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import re
import os
import pandas as pd
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from fake_useragent import UserAgent
from urllib.request import Request, urlopen
import random 

# Aggiunge la directory contenente utilities.py al percorso di ricerca dei moduli
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from utilities import LOG_LEVELS, SETTINGS

# When True, setup_selenium_driver starts Chrome with '--headless=new'
HEADLESS_MODE = SETTINGS['SELENIUM_HEADLESS_MODE']
# Root URL of the WalletExplorer website
BASE_LINK = "https://www.walletexplorer.com"
# Thread cap taken from the shared SETTINGS dictionary
MAX_THREAD_QUANTITY = SETTINGS['MAX_THREAD_QUANTITY']

# Presumably the id of the Eligius coinbase transaction used as the starting
# point of the taint analysis. It is shorter than a full 64-hex txid — confirm
# the expected format with the code that consumes it.
ELIGIUS_COINBASE_TX = 'c82c10925cc3890f1299'

# Cache used by generate_proxies: time.time() of the last download from
# sslproxies.org (None before the first one) and the proxies obtained then.
sslproxies_infos = {
    'last search time': None,
    'sslproxies' : []
}

# Generic referrers: getRequestUtils picks one at random when specific=False
referrer_list = [
    "https://www.google.com",
    "https://www.bing.com",
    "https://search.yahoo.com",
    "https://www.duckduckgo.com",
    "https://www.baidu.com",
    "https://www.yandex.ru",
    "https://www.aol.com",
    "https://www.wikipedia.org",
    "https://www.facebook.com",
    "https://www.twitter.com",
    "https://www.linkedin.com",
    "https://www.instagram.com",
    "https://www.reddit.com",
    "https://www.pinterest.com",
    "https://www.quora.com",
    "https://www.medium.com",
    "https://news.ycombinator.com",
    "https://www.bbc.com",
    "https://www.cnn.com",
    "https://www.nytimes.com",
    "https://www.theguardian.com",
    "https://www.forbes.com",
    "https://www.bloomberg.com"
]

specific_referrer_list = [ # referrers from WalletExplorer pages; getRequestUtils picks one at random when specific=True
    'https://www.walletexplorer.com/wallet/7644ed877fa28a88?from_address=1G43MvhzCqRz1ctsQUmgU4LgLuSVdfU557',
    'https://www.walletexplorer.com',
    'https://www.walletexplorer.com/?q=',
    'https://www.walletexplorer.com/wallet/DeepBit.net',
    'https://www.walletexplorer.com/wallet/DeepBit.net/addresses',
    'https://www.walletexplorer.com/info',
    'https://www.walletexplorer.com/privacy',
    'https://www.walletexplorer.com/wallet/BTCCPool',
]

class RequestError(Exception):
    """Raised when an HTTP request keeps failing.

    Attributes:
        erroCode: the error/status code given by the raiser (attribute name
            kept as is because callers may read it).
    str(error) renders as "<erroCode>: <message>".
    """

    def __init__(self, message, erroCode):
        self.erroCode = erroCode
        super().__init__(message)

    def __str__(self):
        code, message = self.erroCode, self.args[0]
        return "{}: {}".format(code, message)

def generate_proxies(forceNew = False, timeout = 30):
    """Return a list of free proxies scraped from sslproxies.org.

    A new list is downloaded when forceNew is True, on the first call, or when
    the last download is more than 3 seconds old. Otherwise the cached list in
    sslproxies_infos is returned. This avoids hammering sslproxies.org.

    @param forceNew : if True the function always requests new proxies from sslproxies.org
    @param timeout : seconds to wait for sslproxies.org before giving up (default 30)
    @return : list of proxies, each a dict {'ip': ..., 'port': ...}
    @raise RuntimeError : if the proxies table cannot be found in the page
    @raise urllib.error.URLError / TimeoutError : on network failures
    """
    lastReqTime = sslproxies_infos['last search time']
    cache_is_fresh = lastReqTime is not None and time.time() - lastReqTime <= 3
    if not forceNew and cache_is_fresh:
        return sslproxies_infos['sslproxies']

    proxies_req = Request('https://www.sslproxies.org/')
    proxies_req.add_header('User-Agent', getRandomUserAgent())
    # 'with' closes the HTTP connection; the timeout prevents hanging forever
    with urlopen(proxies_req, timeout=timeout) as proxies_resp:
        proxies_doc = proxies_resp.read().decode('utf8')

    soup = bs(proxies_doc, 'html.parser')
    proxies_table = soup.find('table', class_='table table-striped table-bordered')
    if proxies_table is None:
        raise RuntimeError('proxies table not found on https://www.sslproxies.org/')

    # html.parser does not synthesize <tbody>: fall back to the table itself
    rows_parent = proxies_table.tbody or proxies_table
    proxies = []
    for row in rows_parent.find_all('tr'):
        td = row.find_all('td')
        if len(td) < 2:
            continue  # header or malformed row
        proxies.append({
            'ip':   td[0].string,
            'port': td[1].string})

    sslproxies_infos['sslproxies'] = proxies
    sslproxies_infos['last search time'] = time.time()

    return proxies

def getRandomUserAgent():
    """Return a random browser User-Agent string.

    A fresh fake_useragent.UserAgent instance is created on every call.
    @no params
    @return : random user agent string
    """
    return UserAgent().random

def getRequestUtils(specific = False):
    """Build randomized browser-like headers and get a proxy list for a request.

    @param specific : if True, the Referer is chosen among the WalletExplorer
        pages in specific_referrer_list (instead of the generic referrer_list)
        and generate_proxies is forced to download a fresh proxy list.
    @return : headers, proxies -> the headers dict, and the proxy list returned
        by generate_proxies (dicts with 'ip' and 'port' keys)
    """
    # Pools of plausible values; one of each is drawn at random per call
    accept_language_pool = (
        "it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7",
        "en-US,en;q=0.9,it-IT;q=0.8,it;q=0.7",
        "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7",
    )
    client_hint_brands = (
        "\"Chromium\";v=\"122\", \"Not(A:Brand\";v=\"24\", \"Google Chrome\";v=\"122\"",
        "\"Opera\";v=\"80\", \"Chromium\";v=\"94\", \"Not(A:Brand\";v=\"24\"",
        "\"Microsoft Edge\";v=\"90\", \"Chromium\";v=\"90\", \"Not(A:Brand\";v=\"24\"",
    )
    client_hint_platforms = (
        "\"macOS\"",
        "\"Windows\"",
        "\"Linux\"",
    )

    # Random draws in a fixed order: user agent, referrer, proxies, then
    # language / brand / platform
    user_agent = getRandomUserAgent()
    referrer = random.choice(specific_referrer_list if specific else referrer_list)
    proxies = generate_proxies(specific)

    language = random.choice(accept_language_pool)
    brand = random.choice(client_hint_brands)
    platform = random.choice(client_hint_platforms)

    headers = {
        'User-Agent': user_agent,
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "cache-control": "max-age=0",
        "accept-language": language,
        "sec-ch-ua": brand,
        "sec-ch-ua-platform": platform,
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "same-origin",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
        'referer': referrer,
        # these two mirror browser fetch() options; here they are sent as plain headers
        "referrerPolicy": "strict-origin-when-cross-origin",
        "credentials": "omit",
    }
    return headers, proxies
              
def getWalletAddresses(url, max_attempts=20, timeout=None):
    """Get the wallet addresses listed on a WalletExplorer addresses page.

    The page is requested with randomized headers. A response is unsuccessful
    when its status code is outside 2xx or its body starts with "Too"
    (presumably WalletExplorer's "Too many requests" page). Unsuccessful
    responses are retried up to ``max_attempts`` times. Before each retry the
    function waits 5 seconds and builds fresh headers/proxies with
    getRequestUtils(specific=True).

    Each address is the last path segment of the link in the first cell of a
    table row.

    @params : url : url of the WalletExplorer address page of a pool.
    @params : max_attempts : maximum number of retries after the first request (default 20).
    @params : timeout : optional per-request timeout in seconds passed to requests.get
              (default None, i.e. no timeout).

    @return : list of addresses associated to the pool
    @raise RequestError : if the response is still unsuccessful after max_attempts retries
    """
    startT = time.time()

    if LOG_LEVELS['time']:
        print("get wallet addresses started")

    if LOG_LEVELS['all infos'] or LOG_LEVELS['debug']:
        print(f"going to get wallet addresses with url {url}")

    headers, proxies = getRequestUtils()
    # NOTE(review): generate_proxies returns dicts like {'ip': ..., 'port': ...}, while
    # requests expects {'http': 'http://ip:port', 'https': ...}. With these keys requests
    # appears to ignore the proxy and connect directly. Kept as-is on purpose: the timings
    # reported in the notebook were measured with this behaviour.
    chosenProxy = random.choice(proxies)

    # short fixed pause before the first request to lower the rate-limit risk
    time.sleep(0.75)
    timeBeforeRequest = time.time()
    response = requests.get(url, headers=headers, proxies=chosenProxy, timeout=timeout)
    if LOG_LEVELS['time']:
        print(f"request time = {time.time()-timeBeforeRequest} seconds")

    def is_unsuccessful(resp):
        return resp.text.startswith("Too") or resp.status_code < 200 or resp.status_code > 299

    used_headers = headers
    attempt = 0
    # Retry while the latest response is bad; raise only if the last retry also failed
    while is_unsuccessful(response):
        if attempt >= max_attempts:
            if LOG_LEVELS['debug']:
                print(f'unsuccessful response:\n{response}\nTxt:\n{response.text}')
            raise RequestError(f'Error in get wallet address request\nresponse status = {response.status_code}\nsession headers = {used_headers}\nchosen proxy = {chosenProxy}', response.status_code)

        attempt += 1
        if LOG_LEVELS['debug']:
            print(f"Unsuccessful response in attempt {attempt}/{max_attempts} - response status code = {response.status_code}")

        used_headers, new_proxies = getRequestUtils(specific=True)
        chosenProxy = random.choice(new_proxies)
        time.sleep(5)
        response = requests.get(url, proxies=chosenProxy, headers=used_headers, timeout=timeout)

    addresses = []
    soup = bs(response.text, 'html.parser')

    table = soup.find('table')
    if table:
        trs = table.find_all('tr')

        if not LOG_LEVELS['reduce spam'] and ( LOG_LEVELS['all infos'] or LOG_LEVELS['debug']):
            print(f"going to process {len(trs)} table rows")

        for i, tr in enumerate(trs, start=1):
            if not LOG_LEVELS['reduce spam'] and (LOG_LEVELS['all infos'] or LOG_LEVELS['debug'] or LOG_LEVELS['processing']):
                print(f"processing row {i}/{len(trs)}")

            td = tr.find('td')  # first cell of the row
            # Skip header rows (no <td>) and cells without a link / href
            if td and td.a and td.a.get('href'):
                address = td.a['href'].split("/")[-1]
                addresses.append(address)
                if not LOG_LEVELS['reduce spam'] and (LOG_LEVELS['all infos'] or LOG_LEVELS['debug'] or LOG_LEVELS['processing']):
                    print(f"-> found  wallet : {address}")

    if LOG_LEVELS['all infos'] or LOG_LEVELS['debug'] or LOG_LEVELS['results']:
        print(f"Found {len(addresses)} wallets")

    if LOG_LEVELS['time']:
        print(f"get wallet addresses ended in {time.time()-startT} seconds")

    return addresses

def getWalletAddress_multiplePages(url):
    """Find and return all wallet addresses of a mining pool, across every addresses page.

    Selenium is used only to read the number of pages. Each page is then
    downloaded sequentially with getWalletAddresses. Page 1 is the bare url;
    page N > 1 is f"{url}?page=N".

    @param url : url of the first page of addresses of a pool
    @return : all wallet addresses found for the pool, in page order
    """
    startT = time.time()
    if LOG_LEVELS['time']:
        print("get wallet addresses (with multiple pages) started")

    driver = setup_selenium_driver()
    try:
        num_pages = get_number_of_pages(driver, url)
    finally:
        # Always release the browser, even if reading the page count fails
        driver.quit()

    addresses = []

    # Pages are 1-based ("Page 1 / N" on WalletExplorer)
    for page_number in range(1, num_pages + 1):
        if LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['processing']:
            print(f"processing page {page_number}")

        page_url = url if page_number == 1 else f"{url}?page={page_number}"
        addresses += getWalletAddresses(page_url)

    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['results']:
        print(f"Processed {num_pages} pages and found {len(addresses)} addresses")

    if LOG_LEVELS['time']:
        print(f"get wallet addresses (with multiple pages) ended in {time.time()-startT} seconds")
    return addresses
  
def setup_selenium_driver():
    """Create and return a Chrome Selenium driver.

    When HEADLESS_MODE is True the browser is started with '--headless=new'.

    @params : no params

    @return : Selenium web driver (webdriver.Chrome)
    """
    chrome_options = Options()
    if not HEADLESS_MODE:
        return webdriver.Chrome(options=chrome_options)

    chrome_options.add_argument('--headless=new')
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print("Using Selenium in headless mode...")
    return webdriver.Chrome(options=chrome_options)

def get_number_of_pages(driver, url):
    """Find and return the number of addresses pages of a pool.

    Loads *url*, waits 2 seconds, then looks for the text "Page 1 / N" in the
    element matched by the XPath '//*[@id="main"]/div[1]'.

    @params : driver : Selenium web driver
    @params : url : url of the first addresses page of a pool

    @return : N when the paging text is found, otherwise 1 (including when
              the paging element itself is missing)
    """
    driver.get(url)
    # fixed pause so the page is loaded before reading the paging text
    time.sleep(2)

    # find_elements returns [] instead of raising when nothing matches
    paging_elements = driver.find_elements(By.XPATH, '//*[@id="main"]/div[1]')
    page_text = paging_elements[0].text if paging_elements else ''

    match = re.search(r'Page 1 / (\d+)', page_text)
    if match:
        num_pages = int(match.group(1))
        if LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['processing']:
            print(f"Found {num_pages} pages to process")
    else:
        num_pages = 1  # Default to 1 if the number of pages cannot be found

    return num_pages

def get_addresses_from_page(driver, url, page_number):
    """Find and return the wallet addresses listed on one page of a pool.

    Page 1 is loaded from ``url`` itself; any other page is loaded from
    ``url?page=<page_number>``. For every table row with at least one ``td``,
    the last path segment of the link in the first cell is taken as an address.
    Rows whose first cell has no usable link are skipped. Other errors are
    printed when debug logging is on, and the addresses collected so far are
    returned.

    @params : driver : Selenium web driver
    @params : url : url of the first addresses page of the pool
    @params : page_number : number of the page to scrape (1-based)

    @return : addresses found in the page (possibly empty)
    """
    addresses = []

    try:
        if page_number != 1:
            driver.get(f"{url}?page={page_number}")
            time.sleep(0.2)
        else:
            driver.get(url)

        if LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['processing']:
            print(f"processing page {page_number}")

        table = driver.find_element(By.TAG_NAME, 'table')
        trs = table.find_elements(By.TAG_NAME, 'tr')

        for i, tr in enumerate(trs, start=1):
            td_list = tr.find_elements(By.TAG_NAME, 'td')
            if not td_list:
                continue  # rows without <td> cells carry no address
            try:
                # find_element raises when no link exists, so no truthiness check is needed
                a_tag = td_list[0].find_element(By.TAG_NAME, 'a')
                href = a_tag.get_attribute('href')
                address = href.split("/")[-1]
                addresses.append(address)
                if not LOG_LEVELS['reduce spam'] and (LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['processing']):
                    print(f"-> found address {address}")
            except Exception as e:
                if LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['processing']:
                    print(f"address {i} is not a standard address -> skipped")
                if LOG_LEVELS['debug']:
                    print(f"error:\n{e}")
    except Exception as e:
        if LOG_LEVELS['debug']:
            print(f"error in get address from page:\n{e}")

    # returned outside of `finally` so that non-Exception errors
    # (e.g. KeyboardInterrupt) are no longer silently swallowed
    return addresses

def get_W_addresses_Selenium(url):
    """Finds and returns all the wallet addresses associated to a pool.

    The number of pages is read first. The pages are then split into
    contiguous, non-overlapping ranges, and each range is scraped by its own
    thread, with its own Selenium driver. Results are concatenated in page order.

    @params : url : url of the first page of addresses of a pool

    @return : all wallet addresses found for the pool
    """
    startT = time.time()
    if LOG_LEVELS['time']:
        print("get wallet addresses (with Selenium) started")

    driver = setup_selenium_driver()
    try:
        num_pages = get_number_of_pages(driver, url)
    finally:
        driver.quit()  # release the browser even if page counting fails

    addresses = []

    def process_pages(start_page, end_page):  # processes pages start_page..end_page (inclusive)
        local_driver = setup_selenium_driver()
        try:
            local_addresses = []
            for page in range(start_page, end_page + 1):
                local_addresses += get_addresses_from_page(local_driver, url, page)
            return local_addresses
        finally:
            local_driver.quit()

    if num_pages >= 1:  # 0 pages would mean a zero-worker pool and a division by zero
        # use at most MAX_THREAD_QUANTITY threads, never more than the pages
        max_t_quantity = min(MAX_THREAD_QUANTITY, num_pages)
        # ceil division so that every page falls in exactly one range
        pages_per_thread = (num_pages + max_t_quantity - 1) // max_t_quantity
        page_ranges = [
            (start, min(start + pages_per_thread - 1, num_pages))
            for start in range(1, num_pages + 1, pages_per_thread)
        ]

        with ThreadPoolExecutor(max_workers=max_t_quantity) as executor:
            futures = [executor.submit(process_pages, start, end) for start, end in page_ranges]
            for future in futures:  # submission order == page order
                addresses += future.result()

    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['results']:
        print(f"Processed {num_pages} pages and found {len(addresses)} addresses")

    if LOG_LEVELS['time']:
        print(f"get wallet addresses (with Selenium) ended in {time.time()-startT} seconds")
    return addresses
       
def getPools():
    """Scrape the wallet addresses of the 4 mining pools considered.

    The addresses pages of BTCGuild, Eligius, DeepBit and BitMinter on
    BASE_LINK are scraped, in that order, with the helper used for each pool.

    @params : no params

    @return : dataframe with one row per address and columns 'txHash' (the
              wallet address; named like the hash column of the map dataframe
              so the two can be merged) and 'pool' (pool name)
    """
    start_time = time.time()
    if LOG_LEVELS['time']:
        print("Get pools started")

    def addresses_link(wallet_name):
        return f"{BASE_LINK}/wallet/{wallet_name}/addresses"

    btc_guild_addresses = getWalletAddress_multiplePages(addresses_link('BTCGuild.com'))
    eligius_addresses = get_W_addresses_Selenium(addresses_link('Eligius.st'))
    deep_bit_addresses = getWalletAddresses(addresses_link('DeepBit.net'))
    bit_minter_addresses = get_W_addresses_Selenium(addresses_link('BitMinter.com'))

    addresses_by_pool = {
        'Eligius': eligius_addresses,
        'DeepBit': deep_bit_addresses,
        'BitMinter': bit_minter_addresses,
        'BTCGuild': btc_guild_addresses,
    }

    # flatten into (address, pool) rows
    rows = []
    for pool_name, pool_addresses in addresses_by_pool.items():
        rows.extend((address, pool_name) for address in pool_addresses)

    df_pools = pd.DataFrame(rows, columns=['txHash', 'pool'])

    if LOG_LEVELS['time']:
        print(f"get pools ended in {time.time()-start_time} seconds ")

    return df_pools
    

def getTxAsNode(txId):
    """Searches a single transaction by its hash on WalletExplorer,
    finds the transaction's inputs and outputs and returns the transaction as a node.

    The Eligius coinbase transaction (ELIGIUS_COINBASE_TX) is looked up through
    the search form on BASE_LINK. Every other transaction is opened at
    ``BASE_LINK/txid/<txId>``. For each input/output row, the linked txid is
    stored when the link points to a ``/txid/`` page; otherwise the link text
    is stored. For the coinbase transaction, the text of the first input cell
    is stored. Outputs without a link are skipped.

    A new Selenium driver is created for every call and is always quit before
    returning; otherwise every call would leave a browser process running.

    @params : txId : transaction hash

    @return : the node related to the transaction as a dictionary with txId,
              inputs and outputs keys. On error the fields not yet scraped keep
              their initial values; errors are printed only with debug logging.
    """
    tx_node = {
        'txId': txId,
        'inputs': [],
        'outputs': [],
    }

    driver = None
    try:
        driver = setup_selenium_driver()
        if txId == ELIGIUS_COINBASE_TX:
            driver.get(BASE_LINK)
            time.sleep(0.5)

            inputSpace = driver.find_element(By.XPATH, '/html/body/div[2]/form/p/label/input')
            submitButton = driver.find_element(By.XPATH, '/html/body/div[2]/form/p/input')

            inputSpace.send_keys(txId)
            submitButton.click()
        else:
            link = f'{BASE_LINK}/txid/{txId}'

            driver.get(link)
            time.sleep(0.5)

        infoTable = driver.find_element(By.CLASS_NAME, 'info')
        infoTableBody = infoTable.find_element(By.TAG_NAME, 'tbody')
        infoTableFirstTr = infoTableBody.find_elements(By.TAG_NAME, 'tr')[0]
        tx_node['txId'] = infoTableFirstTr.find_element(By.TAG_NAME, 'td').text

        txTable = driver.find_element(By.XPATH, '/html/body/div[2]/table[2]')
        txTableBody = txTable.find_element(By.TAG_NAME, 'tbody')
        txTableSecondTr = txTableBody.find_element(By.XPATH, '/html/body/div[2]/table[2]/tbody/tr[2]')

        raw_inputs = txTableSecondTr.find_element(By.XPATH, '/html/body/div[2]/table[2]/tbody/tr[2]/td[1]')
        raw_outputs = txTableSecondTr.find_element(By.XPATH, '/html/body/div[2]/table[2]/tbody/tr[2]/td[2]')
        raw_inputs_trs = raw_inputs.find_element(By.TAG_NAME, 'tbody').find_elements(By.TAG_NAME, 'tr')
        raw_outputs_trs = raw_outputs.find_element(By.TAG_NAME, 'tbody').find_elements(By.TAG_NAME, 'tr')

        inputs = []
        for tr in raw_inputs_trs:
            if txId == ELIGIUS_COINBASE_TX:
                firstTd = tr.find_elements(By.TAG_NAME, 'td')[0]
                inputs.append(firstTd.text)
            else:
                firstTd = tr.find_element(By.CLASS_NAME, 'small')
                a_tag = firstTd.find_element(By.TAG_NAME, 'a')
                href = a_tag.get_attribute('href')

                if 'txid' in href:
                    inputs.append(href.split('/txid/')[1])
                else:
                    inputs.append(a_tag.text)

        tx_node['inputs'] = inputs

        outputs = []
        for tr in raw_outputs_trs:
            firstTd = tr.find_element(By.CLASS_NAME, 'small')
            try:
                a_tag = firstTd.find_element(By.TAG_NAME, 'a')
                href = a_tag.get_attribute('href')

                if 'txid' in href:
                    outputs.append(href.split('/txid/')[1])
                else:
                    outputs.append(a_tag.text)

            except Exception as e:
                if 'unspent' in firstTd.text and (LOG_LEVELS['debug'] or LOG_LEVELS['all infos']):
                    print("unspent tx -> skip it")
                elif LOG_LEVELS['debug']:
                    print(f"error = {e}")

        tx_node['outputs'] = outputs

        if not LOG_LEVELS['reduce spam'] and (LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['processing']):
            print(f"-> found node:\n {tx_node}")
    except Exception as e:
        if LOG_LEVELS['debug']:
            print(f"error in get tx as node:\n{e}")
    finally:
        if driver is not None:
            try:
                driver.quit()
            except Exception as quit_error:  # best effort: never lose the scraped node
                if LOG_LEVELS['debug']:
                    print(f"error while closing the driver:\n{quit_error}")

    return tx_node

def getEligius_taint_analysis():
    """Carries out the taint analysis on the Eligius pool for k steps, obtaining the graph of the path of the Bitcoins.

    k is SETTINGS['ELIGIUS_ANALYSIS_STEPS']. Step 0 scrapes the Eligius
    coinbase transaction. Every following step scrapes, in parallel through
    getTxAsNode, the transactions referenced by the outputs collected in the
    previous step. The exploration stops early when there is nothing left to
    follow.

    @params : no params

    @return : list of nodes (dicts with txId, inputs and outputs keys) of the
              graph related to the Eligius pool taint analysis

    @raise : ValueError if ELIGIUS_ANALYSIS_STEPS is negative
    """
    startT = time.time()
    if LOG_LEVELS['time']:
        print("Eligius taint analysis miners started")

    steps = SETTINGS['ELIGIUS_ANALYSIS_STEPS']
    if steps < 0:
        raise ValueError('ELIGIUS_ANALYSIS_STEPS must be greater than or equal to 0')

    show_progress = (not LOG_LEVELS['reduce spam'] or LOG_LEVELS['results']) and (
        LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['processing'])

    nodes = []
    last_outputs = []
    for step in range(steps):
        startLen = len(nodes)
        currentOutputs = []
        if step == 0:
            eligiusCoinbaseNode = getTxAsNode(ELIGIUS_COINBASE_TX)
            nodes.append(eligiusCoinbaseNode)
            currentOutputs += eligiusCoinbaseNode['outputs']
        else:
            if not last_outputs:
                # nothing left to follow (and ThreadPoolExecutor(max_workers=0) would raise)
                if show_progress:
                    print(f"no transactions to proceed at step {step}/{steps} -> stop")
                break

            if show_progress:
                print(f"going to proceed {len(last_outputs)} transactions")

            # at most 7 threads: every getTxAsNode call starts its own browser
            max_t_quantity = min(MAX_THREAD_QUANTITY, len(last_outputs), 7)
            with ThreadPoolExecutor(max_workers=max_t_quantity) as executor:
                futures = [executor.submit(getTxAsNode, tx_id) for tx_id in last_outputs]
                for future in as_completed(futures):
                    node = future.result()
                    nodes.append(node)
                    currentOutputs += node['outputs']

        last_outputs = currentOutputs

        if show_progress:
            print(f"-> found {len(nodes)-startLen} nodes at step {step}/{steps}")

    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos'] or LOG_LEVELS['results']:
        print(f"Found {len(nodes)} nodes in {steps} steps")

    if LOG_LEVELS['time']:
        print(f"Eligius taint analysis miners ended in {time.time()-startT} seconds")

    return nodes

               
            
if __name__ == "__main__":  # manual test of this module
    os.system("cls" if os.name == "nt" else "clear")  # clear console

    scraped_pools = getPools()
    print(f"\npools = {scraped_pools}")

    eligius_nodes = getEligius_taint_analysis()
    print(f"nodesEligius_T_analisys:\n{eligius_nodes}")
    

ValueError: greenlet.greenlet size changed, may indicate binary incompatibility. Expected 152 from C header, got 40 from PyObject

## Main 

**Global variables and reading data from CSV files**

In questa prima parte di codice mi limito ad importare le librerie necessarie, a definire alcune variabili globali utili e le funzioni necessarie per leggere correttamente i files CSV.
A riguardo ho deciso di implementare un semplice meccanismo di threads (sfruttato dalla funzione takeCSV_data) al fine di velocizzare ulteriormente le operazioni di lettura dei files.
Ho anche differenziato la lettura di ogni file con una relativa funzione per poter sfruttare uno schema per i tipi di dati (in modo da ridurre quanto più possibile le dimensioni dei dati letti), per utilizzare unicamente le colonne necessarie e per leggere ogni file in chunk (la cui dimensione viene calcolata da calculate_chunk_size).
Con questi accorgimenti son riuscito a ridurre considerevolmente il tempo di lettura dei files fino ad arrivare a circa un minuto per i files transactions, inputs e outputs (assieme) e poco più di un minuto per il file map (che richiede maggior tempo dovendo leggere gli hash).

**Analisi dei dataframe: network congestion, fees e script types**

Una volta ottenuti i dataframes relativi ai dati dei files csv inizialmente necessari, inizia l'elaborazione delle transazioni per ottenere la network congestion e le fees per intervalli temporali di un mese, e per ottenere la quantità di ciascuno script type, sempre per ogni mese.
Per far ciò vengono considerate le transazioni non Coinbase che vengono processate sfruttando il modulo dataset analysis ottenendo un dataframe di mesi utilizzabile per la creazione dei grafici.

**Scraping e analisi delle mining pool**

Successivamente vengono considerate le transazioni Coinbase, viene creato il dataframe relativo a map.csv, vengono ottenuti gli indirizzi associati alle mining pools tramite il modulo di scraping, vengono filtrate le transazioni per ottenere quelle associate alle mining pools e quelle non associate.
In seguito vengono trovati i top quattro miners tra le transazioni Coinbase non associate alle mining pools: si raggruppa per addressId, si contano le transazioni (txHash) di ogni miner, si ordina in ordine decrescente di blocchi minati e si prendono i primi 4 (gli altri vengono inseriti in un dataframe a parte).
Vengono poi calcolate le statistiche richieste : total rewards e numero di blocchi minati per le pools sia globalmente che per intervalli temporali di due mesi e tali statistiche vengono utilizzate per la creazione dei relativi grafici.
Infine viene effettuata la taint analysis su Eligius sfruttando il modulo di scraping, creando un dataframe relativo al grafo e producendo il grafico che mostra il percorso dei bitcoin.

In [None]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import os
import time
from graphic import plot_creator
from utilities import calculate_chunk_size
from utilities import unix_to_date
from dataset_analysis import analizer
from scraping import scraper
from utilities import LOG_LEVELS, SETTINGS

pd.set_option("mode.copy_on_write", True)

# csv file paths :
# all CSV files live in <directory of this file>/dataset_analysis/datasetCSV
CURRENT_PATH = os.path.dirname(__file__)
DATASET_ANALYSIS_PATH = os.path.join(CURRENT_PATH, "dataset_analysis")
DATASET_PATH = os.path.join(DATASET_ANALYSIS_PATH, "datasetCSV")

INPUTS_CSV_PATH, MAP_CSV_PATH, OUTPUTS_CSV_PATH, TRANSACTIONS_CSV_PATH = (
    os.path.join(DATASET_PATH, csv_file_name)
    for csv_file_name in ("inputs.csv", "map.csv", "outputs.csv", "transactions.csv")
)

# thread pool size used by the CSV reading helpers below
MAX_THREAD_QUANTITY = SETTINGS['MAX_THREAD_QUANTITY']
# default number of rows per chunk for read_csv_chunk
CHUNK_SIZE = 10_000


# full column lists of the CSV files (each reader selects its own subset)
inputs_columns = [
    "txId", "prevTxId", "prevTxpos",
]
map_columns = [
    "hash", "addressId",
]
outputs_columns = [
    "txId", "position", "addressId", "amount", "scripttype",
]
transactions_columns = [
    "timestamp", "blockId", "txId", "isCoinbase", "fee",
]



# --- read datas by csv files (for calculation of network congestion and script type data) :
 
def read_csv_chunk(schema, columns, usecols, filePath, chunkSize = CHUNK_SIZE, parseDate = False):
    """Open a CSV file lazily and return an iterator over its chunks.
    @param schema : dict column name -> dtype (used to reduce memory usage)
    @param columns : names given to the selected columns
    @param usecols : indexes of the CSV columns to read
    @param filePath : path of the CSV file
    @param [optional] chunkSize : rows per chunk (default = CHUNK_SIZE)
    @param [optional] parseDate : value forwarded to pandas' parse_dates (default = False)
    @return iterator of DataFrame chunks (as returned by pd.read_csv with chunksize)
    """
    reader_options = dict(
        usecols=usecols,
        dtype=schema,
        names=columns,
        parse_dates=parseDate,
        chunksize=chunkSize,
    )
    return pd.read_csv(filePath, **reader_options)
   
def readInputs():
    """Read the txId column of the inputs csv chunk by chunk
    (the chunk size is computed from the number of rows in the csv file).
    Duplicated txIds are dropped, keeping the first occurrence.
    @no params
    @return inputs dataframe (single 'txId' column, int32)
    """
    started_at = time.time()
    print('\nStarted reading inputs csv')

    total_rows = 21378771
    _, chunk_size = calculate_chunk_size(total_rows, 200000)
    chunks = read_csv_chunk(
        {"txId": "int32"},  # schema
        ["txId"],           # column names
        [0],                # only the first csv column
        INPUTS_CSV_PATH,
        chunk_size,
    )
    inputs_df = pd.concat(chunks).drop_duplicates(subset=['txId'])

    elapsed = time.time() - started_at
    if LOG_LEVELS['time']:
        print(f"\nInputs csv read in {elapsed} seconds")
    return inputs_df

def readOutputs():
    """Read txId, addressId, amount and scriptType from the outputs csv chunk by chunk
    (the chunk size is computed from the number of rows in the csv file).
    Duplicated txIds are dropped, keeping the first occurrence.
    @no params
    @return outputs dataframe
    """
    started_at = time.time()
    print('\nStarted reading outputs csv ')

    dtypes = {
        "txId": "int32",
        "scriptType": "int8",
        "amount": "int64",
        "addressId": "int32",
    }
    total_rows = 24613799
    _, chunk_size = calculate_chunk_size(total_rows, 210000)
    chunks = read_csv_chunk(
        dtypes,
        ["txId", "addressId", "amount", "scriptType"],
        [0, 2, 3, 4],  # csv column 1 (position) is skipped
        OUTPUTS_CSV_PATH,
        chunk_size,
    )
    outputs_df = pd.concat(chunks).drop_duplicates(subset=['txId'])

    elapsed = time.time() - started_at
    if LOG_LEVELS['time']:
        print(f"\nOutputs csv read in {elapsed} seconds")
    return outputs_df

def readTransaction():
    """Read the transactions csv chunk by chunk
    (the chunk size is computed from the number of rows in the csv file).
    Duplicated txIds are dropped and the timestamp column is converted with
    unix_to_date and then pd.to_datetime(unit='s').
    @no params
    @return transactions dataframe (timestamp, blockId, txId, isCoinbase, fee)
    """
    started_at = time.time()
    print('\nStarted reading transactions csv')

    dtypes = {
        "timestamp": "int32",
        "blockId": "int32",
        "txId": "int32",
        "isCoinbase": "int8",
        "fee": "int32",
    }
    column_names = ["timestamp", 'blockId', "txId", "isCoinbase", "fee"]

    total_rows = 10572829
    _, chunk_size = calculate_chunk_size(total_rows, 200000)
    chunks = read_csv_chunk(dtypes, column_names, [0, 1, 2, 3, 4], TRANSACTIONS_CSV_PATH, chunk_size)
    tx_df = pd.concat(chunks).drop_duplicates(subset=['txId'])

    converted_timestamps = tx_df['timestamp'].apply(unix_to_date)
    tx_df['timestamp'] = converted_timestamps
    tx_df['timestamp'] = pd.to_datetime(tx_df['timestamp'], unit='s')

    elapsed = time.time() - started_at
    if LOG_LEVELS['time']:
        print(f"\nTransaction csv read in {elapsed} seconds")
    return tx_df

def takeCSV_data():
    """Take CSV data from the inputs, outputs and transactions csv and convert them to dataframes.

    The three readers are submitted to the same thread pool before any result
    is awaited, so the reads can overlap instead of running one after another.
    Note that peak memory grows accordingly, since all three files are being
    loaded at the same time.

    @no params
    @return inputs, outputs, transactions dataframes
    """
    with ThreadPoolExecutor(max_workers=MAX_THREAD_QUANTITY) as executor:
        # submit everything first: waiting on a future right after submitting it
        # would serialize the reads and make the thread pool useless
        inputs_future = executor.submit(readInputs)
        outputs_future = executor.submit(readOutputs)
        tx_future = executor.submit(readTransaction)

        inputs = inputs_future.result()
        outputs = outputs_future.result()
        tx = tx_future.result()

    return inputs, outputs, tx

# --- read datas by csv files (for scraping) :

def readMap():
    """Read the map csv (address hash <-> addressId) chunk by chunk
    (the chunk size is computed from the number of rows in the csv file).
    The hash column is loaded as 'txHash' with a categorical dtype.
    @no params
    @return map dataframe
    """
    started_at = time.time()
    print('\nStarted reading map csv')

    dtypes = {
        "txHash": 'category',
        "addressId": "int32",
    }
    total_rows = 8708821
    _, chunk_size = calculate_chunk_size(total_rows, 200000)
    chunks = read_csv_chunk(dtypes, ["txHash", "addressId"], [0, 1], MAP_CSV_PATH, chunk_size)
    map_df = pd.concat(chunks)

    elapsed = time.time() - started_at
    if LOG_LEVELS['time']:
        print(f"\nMap csv read in {elapsed} seconds")
    return map_df
        
def takeMapCSV_data():
    """Take map CSV data from the map csv and convert it to a dataframe.

    readMap is called directly. Running a single task through a thread pool
    (as done before) only added thread start-up overhead without any parallelism.

    @no params
    @return map dataframe
    """
    return readMap()

# --- main : 

def main():
    """Run the whole analysis pipeline.

    1. read the inputs, outputs and transactions csv files;
    2. from the non-coinbase transactions, compute and plot the monthly
       network congestion / fees and script type usage;
    3. join the coinbase transactions with their outputs and with the map csv
       (address hashes), scrape the addresses of the mining pools and split the
       coinbase transactions into pool-associated and not associated;
    4. find the top 4 miners among the not-associated ones and compute/plot
       global and bi-monthly statistics of the pools;
    5. run the Eligius taint analysis and plot the resulting graph.
    """
    main_execution_start_time = time.time()

    # --- read input, output, transactions csv and create DF
    input_dataframe, outputs_dataframe, transaction_dataframe = takeCSV_data()
    line = "------------------------"

    if LOG_LEVELS['time']:
        print(f"\nTime for read csv = {time.time()-main_execution_start_time} seconds")

    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f"\n{line}Dataframes:\n\ninputs for congestion:\n{input_dataframe}\n{line}\noutputs for congestion:\n{outputs_dataframe}\n{line}\ntransaction:\n{transaction_dataframe}\n{line}")

    # --- obtain datas of network congestion & script type per month
    notCoinbaseTX = transaction_dataframe.loc[transaction_dataframe['isCoinbase'] != 1]
    notCoinbaseTX = notCoinbaseTX.drop('blockId', axis=1)
    notCoinbaseTX.set_index('timestamp', inplace=True)
    outputsToCalculateCongestion = outputs_dataframe.drop('addressId', axis=1)
    month_data_DF = analizer.processTransactions(input_dataframe, outputsToCalculateCongestion, notCoinbaseTX)

    # --- create plots for stats (network congestion & fees | script type)
    plot_creator.plot_fees_vs_network_congestion(month_data_DF)
    plot_creator.plot_script_type_usage(month_data_DF)
    plot_creator.plot_annual_script_type_usage(month_data_DF)

    # --- scraping & mining pool analysis :
    coinbaseTX = transaction_dataframe.loc[transaction_dataframe['isCoinbase'] == 1]  # filter coinbase tx
    coinbaseTX = coinbaseTX.drop('fee', axis=1)  # delete fee column
    coinbaseTX = coinbaseTX.drop_duplicates(subset=['txId'])  # delete any duplicates
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f"coinbaseTX:{coinbaseTX}")

    outputsToMiningPoolsAnalisis = outputs_dataframe.drop('scriptType', axis=1)  # delete script type
    outputsToMiningPoolsAnalisis = outputsToMiningPoolsAnalisis.drop_duplicates(subset=['txId'])  # delete any duplicates

    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f"outputs To Mining Pools Analisis:\n{outputsToMiningPoolsAnalisis}")

    mapDF = takeMapCSV_data()  # take map dataframe
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f"mapDF:\n\n{mapDF}")

    parsedTxDF = pd.merge(coinbaseTX, outputsToMiningPoolsAnalisis, on='txId')  # merge coinbase tx with (parsed) outputs
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f"parsedTxDF (1):\n\n{parsedTxDF}")

    parsedTxDF = pd.merge(parsedTxDF, mapDF, on='addressId')  # merge previous result with map dataframe (to insert hash column)
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f"parsedTxDF (2):\n\n{parsedTxDF}")

    miningPoolAddressesDF = scraper.getPools()  # get dataframe with txHash<-->mining pool association
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f'\nminingPoolAddressesDF:\n{miningPoolAddressesDF}')

    # left merge with parsedTxDF to obtain all coinbase tx with their (possible) mining pool:
    # -> coinbase tx whose address belongs to none of the 4 mining pools get pool = NaN
    allCoinbaseTxWithPools = pd.merge(parsedTxDF, miningPoolAddressesDF, on='txHash', how='left')
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f'\nallCoinbaseTxWithPools:\n{allCoinbaseTxWithPools}')

    # filter transactions whose address is not associated to any mining pool
    coinbaseNotAssociated = allCoinbaseTxWithPools[allCoinbaseTxWithPools['pool'].isna()]
    # drop() returns a new dataframe: assign it, otherwise the call is a no-op
    coinbaseNotAssociated = coinbaseNotAssociated.drop('pool', axis=1)
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f'\ncoinbaseNotAssociated:\n{coinbaseNotAssociated}')

    # filter transactions whose address is associated to a mining pool
    coinbase_associated = allCoinbaseTxWithPools[~allCoinbaseTxWithPools['pool'].isna()]
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f'\ncoinbase_associated:\n{coinbase_associated}')

    # group by miner ('addressId') and count its coinbase transactions (txHash) = blocks mined
    miner_counts = coinbaseNotAssociated.groupby('addressId')['txHash'].count().reset_index(name='blocks_mined')

    # sort miners by blocks mined, most active first
    sorted_miners = miner_counts.sort_values(by='blocks_mined', ascending=False)

    # select top 4 miners
    top_4_miners = sorted_miners.head(4)
    if LOG_LEVELS.get('debug', False):
        print(f"Top 4 miners:\n{top_4_miners}")

    others_miners = sorted_miners.iloc[4:]
    if LOG_LEVELS.get('debug', False):
        print(f"others miners:\n{others_miners}")

    # Calculate global statistics
    global_blocks_mined, global_total_rewards = analizer.calculate_pool_statistics(coinbase_associated)
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f'\nglobal_blocks_mined:\n{global_blocks_mined}')
        print(f'\nglobal_total_rewards:\n{global_total_rewards}')

    # Calculate bi-monthly statistics
    bi_monthly_blocks_mined, bi_monthly_total_rewards = analizer.calculate_bi_monthly_statistics(coinbase_associated)
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f'\nbi_monthly_blocks_mined:\n{bi_monthly_blocks_mined}')
        print(f'\nbi_monthly_total_rewards:\n{bi_monthly_total_rewards}')

    # Plot statistics
    plot_creator.plot_blocks_mined_by_top_4_miners(top_4_miners)
    plot_creator.plot_total_blocks_mined(global_blocks_mined)
    plot_creator.plot_bi_monthly_blocks_mined(bi_monthly_blocks_mined)
    plot_creator.plot_total_rewards(global_total_rewards)
    plot_creator.plot_bi_monthly_rewards(bi_monthly_total_rewards)

    # Eligius taint analysis
    nodes = scraper.getEligius_taint_analysis()
    if LOG_LEVELS['debug'] or LOG_LEVELS['all infos']:
        print(f'\nnodes:\n{nodes}')

    graph_DF = pd.DataFrame(nodes)
    print(graph_DF)

    plot_creator.plot_Eligius_path(graph_DF)
    
        
def test_eligius_graph():
    """Manual check: run only the Eligius taint analysis, print its graph dataframe and plot it."""
    graph_DF = pd.DataFrame(scraper.getEligius_taint_analysis())
    print(graph_DF)
    plot_creator.plot_Eligius_path(graph_DF)
    


if __name__ == "__main__":
    os.system("cls" if os.name == "nt" else "clear")  # clear console
    execution_start_time = time.time()

    main()
    # test_eligius_graph()  # alternative entry point: only the Eligius taint analysis

    elapsed_time = time.time() - execution_start_time
    if LOG_LEVELS['time']:
        print(f"\nTotal execution time: {elapsed_time} seconds")


## Considerazioni finali 

Il progetto è stato consegnato in formato .pdf, tuttavia allego il link della 