In [4]:
from elasticsearch import Elasticsearch, helpers
import csv

# Connexion au cluster Elasticsearch
es = Elasticsearch(hosts="http://localhost:9200")

# Nom de l'index
index_name = "reviews"

# Mapping pour l'index
mapping = {
    "mappings": {
        "_meta": {
            "created_by": "yaya"
        },
        "properties": {
            "Company": {"type": "keyword"},
            "Customer": {"type": "keyword"},
            "Date_experience": {"type": "date", "format": "yyyy-MM-dd"},
            "Date_reply": {"type": "date", "format": "yyyy-MM-dd"},
            "Date_review": {"type": "date", "format": "yyyy-MM-dd"},
            "Response_time":{"type": "long"},
            "Experience": {"type": "keyword"},
            "Language": {"type": "keyword"},
            "Number_review": {"type": "long"},
            "Rating": {"type": "long"},
            "Reply": {"type": "keyword"},
            "Status": {"type": "keyword"},
            "Title": {"type": "keyword"},
            "document_id": {"type": "keyword"}
        }
    }
}

def create_elasticsearch_index():
    # Index des avis clients
    index_name = "reviews"
    # Créer l'index avec le mapping (you can skip this if the index already exists)
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, body=mapping)

    # Lecture du fichier CSV et correction des champs vides
    documents = read_csv("../data/processed/reviews.csv")

    # Insérer les documents en vrac dans Elasticsearch
    bulk_insert_documents(documents, index_name)

    # Vérification des documents insérés
    verify_inserted_documents(index_name)

    # Requête pour obtenir les 5 mots clés positifs
    positive_words_aggregation(index_name)

    # Recherche des mots les plus récurrents dans les commentaires négatifs
    negative_words_aggregation(index_name)

    # Requête d'agrégation pour obtenir des statistiques sur le champ "Rating"
    rating_aggregation(index_name)

    # Liste des entreprises avec le plus de commentaires négatifs
    negative_reviews_by_company(index_name)

    # Le nombre de commentaires non répondues par les entreprises
    no_reply_comments_by_company(index_name)

    # Classement des entreprises selon leur note moyenne
    companies_by_average_rating(index_name)

    # Statuts des commentateurs
    reviewer_statuses(index_name)

    # Taux de No Reply et délai de réponse moyen
    no_reply_percentage_and_average_response_time(index_name)

def read_csv(csv_path):
    documents = []
    with open(csv_path, encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for idx, row in enumerate(reader):
            if "" in row:
                row["document_id"] = str(idx)  # Ajout de l'ID du document
                for key in list(row.keys()):
                    if key == "":
                        row.pop(key)  # Suppression des champs avec des clés vides
                documents.append(row)
    return documents

def bulk_insert_documents(documents, index_name):
    try:
        response = helpers.bulk(es, documents, index=index_name)
        print("Documents insérés avec succès :", response)
    except helpers.BulkIndexError as e:
        print("Erreur lors de l'insertion des documents :")
        for err in e.errors:
            print("Erreur :", err)

def verify_inserted_documents(index_name):
    result = es.search(index=index_name, size=5)
    for hit in result["hits"]["hits"]:
        print(hit["_source"])

def search_documents(index_name, query):
    result = es.search(index=index_name, body=query)
    for hit in result["hits"]["hits"]:
        print(hit["_source"])

def positive_words_aggregation(index_name):
    query = {
        "query": {
            "match_all": {}
        },
        "aggs": {
            "positive_words": {
                "terms": {
                    "field": "Experience",
                    "size": 5,
                    "min_doc_count": 5
                },
                "aggs": {
                    "rating_filter": {
                        "filter": {
                            "range": {
                                "Rating": {
                                    "gte": 5,
                                    "lte": 5
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    search_documents(index_name, query)

def negative_words_aggregation(index_name):
    query = {
        "query": {
            "match_all": {}
        },
        "aggs": {
            "negative_words": {
                "terms": {
                    "field": "Title",
                    "size": 3,
                    "min_doc_count": 10
                },
                "aggs": {
                    "rating_filter": {
                        "filter": {
                            "range": {
                                "Rating": {
                                    "gte": 1,
                                    "lte": 1
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    search_documents(index_name, query)

def rating_aggregation(index_name):
    query = {
        "size": 0,
        "aggs": {
            "value_count": {
                "value_count": {
                    "field": "Rating"
                }
            },
            "rating_stats": {
                "stats": {
                    "field": "Rating"
                }
            },
            "extended_ratings_stats": {
                "extended_stats": {
                    "field": "Rating"
                }
            }
        }
    }
    result = es.search(index=index_name, body=query)

    # Afficher les résultats de l'agrégation
    aggregations = result.get("aggregations", {})
    value_count = aggregations.get("value_count", {})
    rating_stats = aggregations.get("rating_stats", {})
    extended_ratings_stats = aggregations.get("extended_ratings_stats", {})

    print("Nombre total de valeurs de rating:", value_count.get("value", 0))
    print("Statistiques de rating:")
    print("    Minimum:", rating_stats.get("min", 0))
    print("    Maximum:", rating_stats.get("max", 0))
    print("    Moyenne:", rating_stats.get("avg", 0))
    print("    Somme:", rating_stats.get("sum", 0))
    print("    Écart-type:", extended_ratings_stats.get("std_deviation", 0))

def negative_reviews_by_company(index_name):
    query = {
        "query": {
            "match_all": {}
        },
        "sort": [
            {
                "Rating": {
                    "order": "asc"  # Tri ascendant (du plus bas au plus haut)
                }
            }
        ]
    }
    result = es.search(index=index_name, body=query)

    negative_reviews = []
    for hit in result["hits"]["hits"]:
        rating = int(hit["_source"]["Rating"])  # Convertir la note en entier
        if rating < 3:  # Supposons que 1 à 2 sont considérés comme des notes négatives
            negative_reviews.append(hit["_source"]["Company"])

    company_count = Counter(negative_reviews)
    sorted_companies = sorted(company_count.items(), key=lambda x: x[1], reverse=True)

    for company, count in sorted_companies[:6]:
        print(f"Entreprise : {company}, Nombre de commentaires négatifs : {count}")

def no_reply_comments_by_company(index_name):
    result = es.search(index=index_name, 
        body={
            "query": {
                "match": {
                    "Reply": {
                        "query": "No Reply"
                    }
                }
            },
            "aggs": {
                "companies": {
                    "terms": {
                        "field": "Company",
                        "size": 10
                    }
                }
            }
        })

    companies = result["aggregations"]["companies"]["buckets"]

    for company in companies:
        print(f"Entreprise : {company['key']} - Nombre de No Reply : {company['doc_count']}")

def companies_by_average_rating(index_name):
    query = {
        "aggs": {
            "companies": {
                "terms": {
                    "field": "Company"
                },
                "aggs": {
                    "average_rating": {
                        "avg": {
                            "field": "Rating"
                        }
                    }
                }
            }
        }
    }
    result = es.search(index=index_name, body=query)

    companies = result["aggregations"]["companies"]["buckets"]
    companies.sort(key=lambda company: company["average_rating"]["value"], reverse=False)

    for company in companies:
        average_ratings = round(company['average_rating']['value'], 2)
        print(f"Entreprise : {company['key']} - Note moyenne : {average_ratings}")

def reviewer_statuses(index_name):
    query = {
        "aggs": {
            "statuses": {
                "terms": {
                    "field": "Status"
                }
            }
        }
    }
    result = es.search(index=index_name, body=query)

    statuses = result["aggregations"]["statuses"]["buckets"]

    for status in statuses:
        print(f"Status : {status['key']} - Nombre de commentateurs : {status['doc_count']}")

def no_reply_percentage_and_average_response_time(index_name):
    query = {
        "query": {
            "match": {
                "Reply": "No Reply"
            }
        },
        "aggs": {
            "total_no_reply": {
                "value_count": {
                    "field": "Reply"
                }
            },
            "companies": {
                "terms": {
                    "field": "Company"
                },
                "aggs": {
                    "number_of_no_reply": {
                        "value_count": {
                            "field": "Reply"
                        }
                    }
                }
            }
        }
    }

    result = es.search(index=index_name, body=query)
    total_no_reply = result["aggregations"]["total_no_reply"]["value"]
    companies = result["aggregations"]["companies"]["buckets"]

    for company in companies:
        number_of_no_reply = company["number_of_no_reply"]["value"]
        percentage = (number_of_no_reply / total_no_reply) * 100
        print(f"Entreprise : {company['key']} - Nombre de No Reply : {number_of_no_reply} - Taux de No Reply : {percentage:.2f}%")

def average_response_time_by_company(index_name):
    query = {
        "aggs": {
            "companies": {
                "terms": {
                    "field": "Company"
                },
                "aggs": {
                    "average_day": {
                        "avg": {
                            "field": "Response_time"
                        }
                    }
                }
            }
        }
    }
    result = es.search(index=index_name, body=query)

    companies = result["aggregations"]["companies"]["buckets"]
    companies.sort(key=lambda company: company["average_day"]["value"], reverse=False)

    for company in companies:
        average_days = round(company['average_day']['value'], 2)
        print(f"Entreprise : {company['key']} - Délai de réponse moyen : {average_days}")

if __name__ == "__main__":
    create_elasticsearch_index()


Error during document insertion:
Error: {'index': {'_index': 'bdd_test', '_id': 'shemkosBi_rSG5hd_seP', 'status': 400, 'error': {'type': 'document_parsing_exception', 'reason': '[1:2] failed to parse: field name cannot be an empty string', 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'field name cannot be an empty string'}}, 'data': {'': '0', 'Company': 'Younited Credit', 'Customer': 'M françois GUYOT', 'Number_review': '2', 'Language': 'FR', 'Title': 'SUPER SOCIETE DE CREDITS TRES REACTIVE…', 'Date_review': '2023-07-28 06:28:34', 'Reply': 'No Reply', 'Date_reply': '2023-07-28 06:28:34', 'Rating': '5', 'Status': 'Verified', 'Experience': 'SUPER SOCIETE DE CREDITS TRES REACTIVE…SUPER SOCIETE DE CREDITS TRES REACTIVE sans problêmes merci pour votre pret accorde avec le moins d administratif possible', 'Date_experience': '2023-07-20 00:00:00', 'document_id': '0'}}}
Error: {'index': {'_index': 'bdd_test', '_id': 'sxemkosBi_rSG5hd_seP', 'status': 400, 'error': {'type': 'doc

In [None]:
# Define the aggregation query to find negative keywords
query = {
    "query": {
        "match_all": {}
    },
    "aggs": {
        "negative_words": {
            "terms": {
                "field": "Experience.keyword",  # Use '.keyword' for keyword fields
                "size": 5,
                "min_doc_count": 10
            },
            "sort": [
                {
                    "_count": {
                        "order": "desc"  # Sort in descending order (from highest to lowest)
                    }
                }
            ],
            "aggs": {
                "rating_filter": {
                    "filter": {
                        "range": {
                            "Rating": {
                                "gte": 1,
                                "lte": 1
                            }
                        }
                    }
                }
            }
        }
    }
}

# Execute the aggregation query
result = es.search(index=index_name, body=query)

# Retrieve and display the results
negative_words = result["aggregations"]["negative_words"]["buckets"]

for negative_word in negative_words:
    print(f"Negative Keyword: {negative_word['key']} - Occurrences: {negative_word['doc_count']}")
