Поиск отзывов по сходству с запросом от пользователя

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
from nltk.corpus import stopwords
stop_words = stopwords.words('russian')

In [None]:
MAX_REVIEWS = 5000  #Берем выборку побольше для лучшего поиска

In [None]:
#Создаем класс для поиска отзывов
class ReviewSearcher:
    def __init__(self):
        self.vectorizer = TfidfVectorizer(
            max_features=10000,
            ngram_range=(1, 2),
            stop_words=stop_words
        )
        self.review_data = []
        self.tfidf_matrix = None
        
    #Загружаем отзывы из базы данных для анализа
    def load_data_from_db(self):
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        
        print(f"Загрузка до {MAX_REVIEWS} отзывов из БД...")
        cursor.execute(f"SELECT review_id, text FROM reviews LIMIT {MAX_REVIEWS}")
        self.review_data = cursor.fetchall()
        
        conn.close()
        
        if not self.review_data:
            raise ValueError("В базе данных нет отзывов")
            
        print(f"Загружено {len(self.review_data)} отзывов (ограничение: {MAX_REVIEWS})")

    #Создаем поисковой индекс и проодим TF-IDF анализ
    def prepare_search_index(self):
        if len(self.review_data) > MAX_REVIEWS:
            self.review_data = self.review_data[:MAX_REVIEWS]
            print(f"Ограничение количества отзывов до {MAX_REVIEWS}")
            
        texts = [text for _, text in self.review_data]
        print("Построение TF-IDF матрицы...")
        self.tfidf_matrix = self.vectorizer.fit_transform(texts)

    def find_similar_reviews(self, query_text, top_n=5):
        """Поиск похожих отзывов"""
        if self.tfidf_matrix is None:
            raise RuntimeError("Индекс не построен. Сначала вызовите prepare_search_index()")
            
        #Векторизация запроса
        query_vec = self.vectorizer.transform([query_text])
        
        #Вычисляем косинусовое сходство
        similarities = cosine_similarity(query_vec, self.tfidf_matrix).flatten()
        
        #Группируем отзывы по диапазонам сходства
        similarity_groups = defaultdict(list)
        
        for idx, similarity in enumerate(similarities):
            similarity_rounded = round(similarity, 1)
            review_id, text = self.review_data[idx]
            similarity_groups[similarity_rounded].append({
                'review_id': review_id,
                'text': text,
                'similarity': similarity
            })
        
        #Сортируем группы по убыванию сходства
        sorted_groups = sorted(similarity_groups.items(), key=lambda x: x[0], reverse=True)
        
        #Формируем результат с топом отзывов из каждой группы
        results = []
        for similarity, reviews in sorted_groups:
            reviews_sorted = sorted(reviews, key=lambda x: x['similarity'], reverse=True)
            
            group_info = {
                'similarity_range': f"{similarity:.1f}-{similarity+0.1:.1f}",
                'count': len(reviews),
                'top_reviews': reviews_sorted[:top_n]
            }
            results.append(group_info)
            
        return results

In [None]:
#Вызываем главную функцию
if __name__ == "__main__":
    try:
        searcher = ReviewSearcher()
        searcher.load_data_from_db()
        searcher.prepare_search_index()
        
        while True:
            print("\nВведите текст для поиска похожих отзывов (или 'exit' для выхода):") #Инпут для пользователя
            query = input().strip()
            
            if query.lower() == 'exit': #Условие остановки анализа - введение слова exit
                break
                
            if not query:
                continue
                
            results = searcher.find_similar_reviews(query)
            
            print(f"\nРезультаты поиска для запроса: '{query}'")
            for group in results:
                print(f"\nГруппа сходства {group['similarity_range']} - найдено отзывов: {group['count']}")
                print(f"Топ-5 отзывов из этой группы:")
                for i, review in enumerate(group['top_reviews'], 1):
                    print(f"  {i}. ID: {review['review_id']}, Сходство: {review['similarity']:.4f}")
                    print(f"     Текст: {review['text'][:100]}...")
                
    except Exception as e:
        print(f"Ошибка: {str(e)}")

Анализ эмоциональной окраски отзывов по категориям

In [None]:
#Создаем класс для эмоционального анализа
class RussianEmotionAnalyzer:
    def __init__(self):
        self.review_data = []
        #Создаем структуру для хранения статистики
        self.emotion_stats = {
            'радость': {'count': 0, 'examples': []},
            'грусть': {'count': 0, 'examples': []},
            'ярость': {'count': 0, 'examples': []},
            'нейтральный': {'count': 0, 'examples': []}
        }
        self.total_reviews = 0

        
        #Создаем словари эмоций 
        self.emotion_keywords = {
            'радость': [
                'отличн', 'прекрасн', 'рекоменд', 'довол', 'рад', 'восхит', 
                'супер', 'замечат', 'хорош', 'потряс', 'восторг', 'любим', 
                'удовольств', 'счаст', 'благодар', 'восхищен', 'превосходн', 'шикарн'
            ],
            'грусть': [
                'плох', 'груст', 'разочар', 'жал', 'печал', 'неудов', 
                'сожален', 'тоск', 'скорб', 'несчаст', 'обид', 'тяжел', 
                'печаль', 'скучн', 'уныл', 'неприятн', 'отказ'
            ],
            'ярость': [
                'ужас', 'кошмар', 'злит', 'бесит', 'возмущ', 'отврат', 
                'ненавист', 'гнев', 'бешен', 'раздраж', 'мерзк', 'противн', 
                'худш', 'гадост', 'отвратительн', 'неприятн'
            ]
        }

    #Загрузка отзывов из базы данных, начиная с первого review_id (была проблема - брал для анализа записи не из анчала таблицы)
    def load_data_from_db(self, limit=10000):
        
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        
        #Получаем минимальный review_id
        cursor.execute("SELECT MIN(review_id) FROM reviews")
        min_id = cursor.fetchone()[0]
        
        print(f"Загрузка отзывов из БД начиная с ID {min_id} (лимит: {limit})...")
        cursor.execute("""
            SELECT review_id, text 
            FROM reviews 
            WHERE review_id >= %s
            ORDER BY review_id
            LIMIT %s
        """, (min_id, limit))
        
        self.review_data = cursor.fetchall()
        conn.close()
        
        if not self.review_data:
            raise ValueError("В базе данных нет отзывов")
            
        print(f"Загружено {len(self.review_data)} отзывов (первый ID: {self.review_data[0][0]}, последний ID: {self.review_data[-1][0]})")

    #Анализ эмоциональной окраски текста
    def analyze_emotion(self, text):
        
        text = text.lower()
        
        for emotion, keywords in self.emotion_keywords.items():
            if any(keyword in text for keyword in keywords):
                return emotion
        
        return 'нейтральный'

    #Обрабатываем отзывовы
    def process_reviews(self):
        print("Анализ эмоциональной окраски отзывов...")
        
        for review_id, text in self.review_data:
            try:
                emotion = self.analyze_emotion(text)
                
                #Обновляем счетчик
                self.emotion_stats[emotion]['count'] += 1
                self.total_reviews += 1
                
                #Сохраняем примеры (ограничение - не более 5)
                if len(self.emotion_stats[emotion]['examples']) < 5:
                    self.emotion_stats[emotion]['examples'].append({
                        'id': review_id,
                        'text': text[:200] + '...' if len(text) > 200 else text #Ограничиваем вывод текста отзыва, если он слишком большой
                    })
                    
            except Exception as e:
                print(f"Ошибка при анализе отзыва {review_id}: {str(e)}")
                continue

    #Выводим результаты
    def print_results(self):
        print("\nРезультаты анализа эмоциональной окраски отзывов:")
        print("=" * 60)
        print(f"Обработано отзывов: {self.total_reviews}")
        print(f"Диапазон ID: {self.review_data[0][0]} - {self.review_data[-1][0]}")
        print("=" * 60)
        
        for emotion, stats in self.emotion_stats.items():
            percentage = (stats['count'] / self.total_reviews * 100) if self.total_reviews > 0 else 0 #Рассчитываем процент от общего числа обработанных отзывов
            
            print(f"\n{emotion.upper():^60}")
            print("-" * 60)
            print(f"Количество: {stats['count']} ({percentage:.1f}%)") #Выводим количество и процент по каждой группе
            
            #Выводим примеры по каждой группе
            if stats['examples']:
                print("\nПримеры отзывов:")
                for i, example in enumerate(stats['examples'], 1):
                    print(f"{i}. ID: {example['id']}")
                    print(f"   Текст: {example['text']}\n")
            
            print("=" * 60)

Поиск аномалий в тексте (через питон)

In [None]:
import re
from collections import defaultdict
from difflib import SequenceMatcher

In [None]:
#Создаем класс для анализа выявления аномалий
class AnomalyDetector:
    def __init__(self):
        self.review_data = []
        self.anomalies = defaultdict(list)
        self.user_stats = defaultdict(dict)
        
        # Параметры для обнаружения аномалий
        self.params = {
            'max_caps_ratio': 0.3, #Максимальная доля заглавных букв
            'max_links': 1, #Максимальное количество ссылок
            'max_special_chars': 20, #Максимальное количество спецсимволов
            'min_unique_words': 3, #Минимальное количество уникальных слов
            'max_reviews_per_user': 3, #Максимальное количество отзывов от пользователя
            'min_similarity_ratio': 0.8 #Порог схожести отзывов
        }

    #Загрузка отзывов из базы данных с ограничением
    def load_data_from_db(self, limit=500):

        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        
        print(f"Загрузка отзывов из БД (лимит: {limit})...")
        query = """
        SELECT r.review_id, r.text, r.mark, r.user_id
        FROM reviews r
        LIMIT %s
        """
        cursor.execute(query, (limit,))
        self.review_data = cursor.fetchall()
        
        conn.close()
        
        if not self.review_data:
            raise ValueError("В базе данных нет отзывов")
            
        print(f"Загружено {len(self.review_data)} отзывов")

    #Вычисление коэффициента схожести двух текстов отзывов
    def calculate_similarity(self, text1, text2):
        return SequenceMatcher(None, text1.lower(), text2.lower()).ratio()

    #Функция обнаружение аномальных отзывов
    def detect_anomalies(self):
        print("Поиск аномальных отзывов...")
        
        user_reviews = defaultdict(list)
        
        #Собираем отзывы по пользователям
        for review in self.review_data:
            review_id, text, mark, user_id = review
            user_reviews[user_id].append((review_id, text, mark))
        
        #Анализируем каждого пользователя
        for user_id, reviews in user_reviews.items():
            total_reviews = len(reviews)
            
            #Сохраняем статистику по пользователю
            self.user_stats[user_id] = {
                'total_reviews': total_reviews,
                'similar_reviews': set()
            }
            
            #Прводим проверку на схожесть отзывов
            for i in range(len(reviews)):
                for j in range(i+1, len(reviews)):
                    review_id1, text1, mark1 = reviews[i]
                    review_id2, text2, mark2 = reviews[j]
                    
                    similarity = self.calculate_similarity(text1, text2)
                    if similarity >= self.params['min_similarity_ratio']:
                        self.user_stats[user_id]['similar_reviews'].update([review_id1, review_id2])
                        self._add_anomaly(
                            review_id1,
                            text1,
                            mark1,
                            user_id,
                            f"Найден похожий отзыв (схожесть: {similarity:.1%}, ID: {review_id2})" #Показываем процент схожести
                        )
                        self._add_anomaly(
                            review_id2,
                            text2,
                            mark2,
                            user_id,
                            f"Найден похожий отзыв (схожесть: {similarity:.1%}, ID: {review_id1})" 
                        )
            
            #Проверка на количество отзывов (добавляем только для одного отзыва пользователя, так как 1 пользователь мог оставить несколько сходих комментариев, и нам не надо, чтобы вывод дублировался каждый раз)
            if total_reviews > self.params['max_reviews_per_user']:
                first_review_id = reviews[0][0]
                self._add_anomaly(
                    first_review_id,
                    reviews[0][1],
                    reviews[0][2],
                    user_id,
                    f"Пользователь оставил {total_reviews} отзывов (показан 1 из них)"
                )
        
        #Проверка других аномалий для всех отзывов
        for review in self.review_data:
            review_id, text, mark, user_id = review
            #Пропускаем отзывы, которые уже добавлены как "похожие" или "много отзывов"
            if review_id in self.anomalies:
                continue
            self._check_basic_anomalies(review_id, text, mark, user_id)

    #Добавляем аномалии в результаты
    def _add_anomaly(self, review_id, text, mark, user_id, message):
        if review_id in self.anomalies:
            if isinstance(message, list):
                self.anomalies[review_id]['anomalies'].extend(message)
            else:
                self.anomalies[review_id]['anomalies'].append(message)
        else:
            self.anomalies[review_id] = {
                'text': text[:200] + '...' if len(text) > 200 else text,  #Опять ограничиваем текст отзыва
                'mark': mark,
                'user_id': user_id,
                'anomalies': [message] if not isinstance(message, list) else message
            }

    #Проверка базовых аномалий в отзыве
    def _check_basic_anomalies(self, review_id, text, mark, user_id):
        anomalies_found = []
        
        #Проверка на капс
        caps_count = sum(1 for c in text if c.isupper())
        caps_ratio = caps_count / len(text) if len(text) > 0 else 0
        if caps_ratio > self.params['max_caps_ratio']:
            anomalies_found.append(f"Много заглавных букв ({caps_ratio:.0%})")
        
        #Проверка на ссылки
        link_count = len(re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text))
        if link_count > self.params['max_links']:
            anomalies_found.append(f"Обнаружены ссылки ({link_count})")
        
        #Проверка спецсимволов
        special_chars = len(re.findall(r'[^\w\s]', text))
        if special_chars > self.params['max_special_chars']:
            anomalies_found.append(f"Много спецсимволов ({special_chars})")
        
        #Проверка уникальных слов
        words = re.findall(r'[а-яА-ЯёЁ]+', text.lower())
        unique_words = len(set(words))
        if unique_words < self.params['min_unique_words']:
            anomalies_found.append(f"Мало уникальных слов ({unique_words})")
        
        if anomalies_found:
            self._add_anomaly(review_id, text, mark, user_id, anomalies_found)

    #Выводим результаты
    def print_results(self):
        print(f"\nНайдено {len(self.anomalies)} аномальных отзывов:")
        print("=" * 100)
        
        for review_id, data in self.anomalies.items():
            print(f"\nID отзыва: {review_id}")
            print(f"Пользователь: {data['user_id']}")
            print(f"Оценка: {data['mark']}/5")
            print(f"Текст: {data['text']}")
            print("\nОбнаруженные аномалии:")
            for anomaly in data['anomalies']:
                print(f"- {anomaly}")
            
            print("-" * 100)


In [None]:
#Главная функция запуска анализа
if __name__ == "__main__":
    try:
        detector = AnomalyDetector()
        detector.load_data_from_db(limit=500)
        detector.detect_anomalies()
        detector.print_results()
        
    except Exception as e:
        print(f"Ошибка: {str(e)}")