## Analisi delle performance di un inverted index distribuito (Elasticsearch)

### Installazione delle librerie

In [1]:

!pip install elasticsearch
!pip install tqdm



### Connessione ad Elasticsearch

In [2]:
# Library imports: standard library first, then third-party packages
import json
import os
from time import perf_counter

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from tqdm import tqdm

# Elasticsearch connection settings. Each value can be overridden with the
# environment variable of the same name, so credentials need not be committed
# with the notebook; the fallbacks are the local development defaults.
ELASTIC_ENDPOINT = os.environ.get("ELASTIC_ENDPOINT", "elasticsearch")
ELASTIC_ENDPOINT_PORT = int(os.environ.get("ELASTIC_ENDPOINT_PORT", "9200"))
ELASTIC_INDEX_NAME = os.environ.get("ELASTIC_INDEX_NAME", "comuni")
ELASTICSEARCH_USERNAME = os.environ.get("ELASTICSEARCH_USERNAME", "progettoArchi")
ELASTICSEARCH_PASSWORD = os.environ.get("ELASTICSEARCH_PASSWORD", "progettoArchi")

# Client shared by all the following cells; a long timeout plus retries keeps
# the bulk load and stress tests from failing on transient slowness.
client_elastic = Elasticsearch(
    hosts=f"http://{ELASTIC_ENDPOINT}:{ELASTIC_ENDPOINT_PORT}",
    request_timeout=60,
    max_retries=10,
    retry_on_timeout=True,
    basic_auth=(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD),
)

# Fail fast if the cluster does not answer the ping
if not client_elastic.ping():
    raise ValueError("Connection to Elasticsearch failed")
print("Connected to Elasticsearch")

Connected to Elasticsearch


### Creazione dell'indice dei comuni e caricamento dei dati

In [3]:
# Index definition: similarity/analysis settings and field mappings for the comuni index
settings = {
    "index": {
        "number_of_shards": 1,
        # One replica of the shard for fault tolerance. NOTE(review): Elasticsearch
        # never allocates a replica on the same node as its primary, so on a
        # single-node cluster this replica stays unassigned — confirm the intended
        # cluster size.
        "number_of_replicas": 1,
        # Information-based (IB) similarity. NOTE(review): no field in `mappings`
        # below sets "similarity": "my_similarity", so it is currently unused.
        "similarity": {
            "my_similarity": {
                "type": "IB",
                "distribution": "ll",
                "lambda": "df",
                "normalization": "h2",
                "normalization.h2.c": "1.0"
            }
        }
    },
    "analysis": {
        "filter": {
            # Splits every token into 3-character n-grams
            "trigrams_filter": {
                "type": "ngram",
                "min_gram": 3,
                "max_gram": 3
            }
        },
        "analyzer": {
            # NOTE(review): defined but not referenced by any field in `mappings`
            "trigrams": {
                "type": "custom",
                "tokenizer": "whitespace",
                "filter": [
                    "lowercase",
                    "trigrams_filter"
                ]
            },
            # Whitespace tokenization + lowercasing; used by the "nome" field
            "my_analyzer": {
                "type": "custom",
                "tokenizer": "whitespace",
                "filter": [
                    "lowercase"
                ]
            }
        }
    }
}

mappings = {
    "properties": {
        "nome": {"type": "text", "analyzer": "my_analyzer"},
        "codice": {"type": "keyword"},
        "zona": {
            "properties": {
                "codice": {"type": "keyword"},
                "nome": {"type": "text"}
            }
        },
        "regione": {
            "properties": {
                "codice": {"type": "keyword"},
                "nome": {"type": "text"}
            }
        },
        "provincia": {
            "properties": {
                "codice": {"type": "keyword"},
                "nome": {"type": "text"}
            }
        },
        "sigla": {"type": "keyword"},
        "codiceCatastale": {"type": "keyword"},
        "cap": {"type": "keyword"},
        "popolazione": {"type": "integer"}
    }
}

MAPPING = {
    "settings": settings,
    "mappings": mappings
}

# Create the index only when it is missing. Checking existence explicitly, rather
# than ignoring every HTTP 400 from the create call, lets genuine errors in the
# settings/mappings surface instead of being silently swallowed.
if not client_elastic.indices.exists(index=ELASTIC_INDEX_NAME):
    client_elastic.indices.create(index=ELASTIC_INDEX_NAME, body=MAPPING)

# Load the comuni dataset. JSON is UTF-8; decoding accented names must not depend
# on the platform's default encoding.
with open("comuni.json", encoding="utf-8") as f:
    comuni = json.load(f)

# Documents per bulk request
BATCH = 100


def _comune_actions(records):
    """Yield one bulk "index" action per comune record.

    When a record has a ``codice`` it becomes the document ``_id``, so re-running
    this cell overwrites the same documents instead of appending duplicates.
    NOTE(review): assumes ``codice`` (the ISTAT code) is unique per comune — verify.
    """
    for comune in records:
        action = {"_index": ELASTIC_INDEX_NAME, "_source": comune}
        if comune.get("codice"):
            action["_id"] = str(comune["codice"])
        yield action


# helpers.bulk splits the action stream into requests of `chunk_size` documents,
# so no manual buffer is needed; tqdm advances as actions are consumed.
bulk(client_elastic, tqdm(_comune_actions(comuni), total=len(comuni)), chunk_size=BATCH)

print("Data indexed successfully")

100%|██████████| 7904/7904 [00:04<00:00, 1829.19it/s]

Data indexed successfully





### Test di scrittura e lettura

In [4]:
def parse_response(response):
    """Print the ``_source`` of every hit in an Elasticsearch search response.

    Prints nothing when the response has no ``hits`` -> ``hits`` section.
    """
    if 'hits' not in response or 'hits' not in response['hits']:
        return
    hit_list = response['hits']['hits']
    for hit in hit_list:
        print(hit['_source'])

In [5]:
# Sample search: match_all, returning the default page of hits
sample_query = {"query": {"match_all": {}}}
response = client_elastic.search(index=ELASTIC_INDEX_NAME, body=sample_query)

# Show the _source of every returned hit
parse_response(response)

{'nome': 'Druento', 'codice': '001099', 'zona': {'codice': '1', 'nome': 'Nord-ovest'}, 'regione': {'codice': '01', 'nome': 'Piemonte'}, 'provincia': {'codice': '001', 'nome': 'Torino'}, 'sigla': 'TO', 'codiceCatastale': 'D373', 'cap': ['10040'], 'popolazione': 8436}
{'nome': 'Cossato', 'codice': '096020', 'zona': {'codice': '1', 'nome': 'Nord-ovest'}, 'regione': {'codice': '01', 'nome': 'Piemonte'}, 'provincia': {'codice': '096', 'nome': 'Biella'}, 'sigla': 'BI', 'codiceCatastale': 'D094', 'cap': ['13836'], 'popolazione': 14810}
{'nome': 'Dualchi', 'codice': '091018', 'zona': {'codice': '5', 'nome': 'Isole'}, 'regione': {'codice': '20', 'nome': 'Sardegna'}, 'provincia': {'codice': '091', 'nome': 'Nuoro'}, 'sigla': 'NU', 'codiceCatastale': 'D376', 'cap': ['08010'], 'popolazione': 668}
{'nome': 'Cossignano', 'codice': '044016', 'zona': {'codice': '3', 'nome': 'Centro'}, 'regione': {'codice': '11', 'nome': 'Marche'}, 'provincia': {'codice': '044', 'nome': 'Ascoli Piceno'}, 'sigla': 'AP', 

### Scrittura e lettura massiva

In [6]:
# Stress test write (write massive)
import threading
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from tqdm import tqdm
import json

def index_data(client, data, batch_size=100, index_name=None):
    """Bulk-index every record of ``data`` as a new document.

    Records are sent through ``elasticsearch.helpers.bulk`` in requests of
    ``batch_size`` documents. No ``_id`` is set, so Elasticsearch assigns one and
    every call adds one new document per record (the write stress test relies on
    this to generate load).

    Args:
        client: Elasticsearch client used for the bulk requests.
        data: iterable of records; each one is stored as a document ``_source``.
        batch_size: documents per bulk request; values below 1 are treated as 1.
        index_name: target index; defaults to ``ELASTIC_INDEX_NAME``.
    """
    target_index = ELASTIC_INDEX_NAME if index_name is None else index_name
    actions = ({"_index": target_index, "_source": record} for record in data)
    # bulk() chunks the action stream itself, replacing the manual buffer/flush
    bulk(client, actions, chunk_size=max(1, batch_size))

def stress_test(client, data, repetitions=5, batch_size=100):
    """Index ``data`` from ``repetitions`` concurrent threads to generate write load.

    Each thread runs ``index_data(client, data, batch_size)`` once, so the whole
    dataset is sent ``repetitions`` times in parallel.

    Returns:
        Elapsed seconds from starting the first thread to joining the last one.

    Raises:
        RuntimeError: if any worker thread failed; the first failure is chained as
            the cause. Exceptions raised inside a plain Thread target never reach
            the caller, so without this a failed run would look successful.
    """
    failures = []

    def _worker():
        # Record the failure so it can be re-raised in the caller's thread
        try:
            index_data(client, data, batch_size)
        except Exception as exc:
            failures.append(exc)

    threads = [threading.Thread(target=_worker) for _ in range(repetitions)]
    start = perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = perf_counter() - start

    if failures:
        raise RuntimeError(
            f"{len(failures)} of {repetitions} indexing threads failed"
        ) from failures[0]
    return elapsed

# Make sure the cluster is reachable before generating load
if not client_elastic.ping():
    raise ValueError("Connection to Elasticsearch failed")
else:
    print("Connected to Elasticsearch")

# The index is created with MAPPING in the index-creation cell above, which is
# expected to run before this one; no second create call is issued here.

# Reload the dataset so this cell does not depend on `comuni` from earlier cells
with open("comuni.json", encoding="utf-8") as f:
    comuni = json.load(f)

# Write stress test: each of N_WRITE_THREADS threads bulk-indexes the whole dataset
N_WRITE_THREADS = 10
WRITE_BATCH_SIZE = 100
write_start = perf_counter()
stress_test(client_elastic, comuni, repetitions=N_WRITE_THREADS, batch_size=WRITE_BATCH_SIZE)
write_elapsed = perf_counter() - write_start

print("Stress test completed successfully")
print(f"Sent {N_WRITE_THREADS * len(comuni)} documents for indexing in {write_elapsed:.2f} seconds")

Connected to Elasticsearch
Stress test completed successfully


In [7]:
#Stress test read (read massive)
from time import time

def search_test(client, index, query, repetitions=1000):
    """Run the same search ``repetitions`` times sequentially and report the timing.

    Args:
        client: Elasticsearch client (anything exposing ``search(index=..., body=...)``).
        index: name of the index to query.
        query: request body passed to ``client.search``.
        repetitions: number of sequential search requests.

    Returns:
        Total elapsed time in seconds.
    """
    # perf_counter is monotonic and high-resolution, meant for measuring
    # intervals; time() can jump if the system clock is adjusted.
    start_time = perf_counter()
    for _ in range(repetitions):
        client.search(index=index, body=query)
    elapsed = perf_counter() - start_time
    print(f"Executed {repetitions} searches in {elapsed} seconds")
    # Guard both divisions (no requests, or an immeasurably short run)
    if repetitions > 0 and elapsed > 0:
        print(
            f"Average latency: {1000 * elapsed / repetitions:.2f} ms, "
            f"throughput: {repetitions / elapsed:.1f} searches/s"
        )
    return elapsed

# Read benchmark: repeat a match_all search (search_test's default repetition count)
query = {"query": {"match_all": {}}}
search_test(client_elastic, ELASTIC_INDEX_NAME, query);

Executed 1000 searches in 5.066514253616333 seconds


In [8]:
# Cluster information: health status plus node and shard allocation counts
health_api = client_elastic.cluster
cluster_health = health_api.health()
print(cluster_health)

{'cluster_name': 'docker-cluster', 'status': 'green', 'timed_out': False, 'number_of_nodes': 1, 'number_of_data_nodes': 1, 'active_primary_shards': 10, 'active_shards': 10, 'relocating_shards': 0, 'initializing_shards': 0, 'unassigned_shards': 0, 'delayed_unassigned_shards': 0, 'number_of_pending_tasks': 0, 'number_of_in_flight_fetch': 0, 'task_max_waiting_in_queue_millis': 0, 'active_shards_percent_as_number': 100.0}
