# Лабораторная работа №2 и №3
## Кластеризация текстовых данных и изображений

# Часть 1: Кластеризация текстовых данных

In [1]:
import json
import os

# Загрузка текстовых данных
text_messages_clean = []

with open(r"posts.json") as file:
    for line in file.readlines():
        wall = json.loads(line)
        parsed_response = json.JSONDecoder().decode(json.dumps(wall))
        nodes = []
        for key, value in parsed_response.items():
            nodes.append(value.get('items'))
        for node in nodes:
            for post in node:
                text = post.get("text")
                if text is not None:
                    text_messages_clean.append(text)

print(f"Загружено сообщений: {len(text_messages_clean)}")

Загружено сообщений: 486


In [2]:
# Подсчет слов
words = {}

for message in text_messages_clean:
    if message is None:
        continue
    for word in message.split():
        if word not in words:
            words[word] = 1
        else:
            words[word] += 1

words = dict(sorted(words.items(), key=lambda item: item[1], reverse=True))
print(f"Уникальных слов: {len(words)}")
print("Топ-20 слов:")
for i, (word, count) in enumerate(list(words.items())[:20]):
    print(f"{i+1}. {word}: {count}")

Уникальных слов: 7280
Топ-20 слов:
1. в: 390
2. и: 376
3. на: 213
4. с: 141
5. не: 139
6. —: 118
7. что: 85
8. -: 82
9. по: 80
10. В: 71
11. для: 55
12. это: 52
13. за: 52
14. из: 44
15. или: 44
16. а: 42
17. до: 41
18. к: 41
19. как: 40
20. –: 32


In [3]:
# Импорт библиотек для обработки текста
import numpy as np
import pandas as pd
import nltk
import re
import matplotlib.pyplot as plt

# Загрузка ресурсов NLTK
# nltk.download('punkt')
# nltk.download('stopwords')

from nltk.stem.snowball import SnowballStemmer
stemmer = SnowballStemmer("russian")

In [4]:
# Функции токенизации и стемминга
def token_and_stem(text):
    tokens = [word for sent in nltk.sent_tokenize(text) 
              for word in nltk.word_tokenize(sent)]
    
    filtered_tokens = []
    for token in tokens:
        if re.search('[а-яА-Я]', token):
            filtered_tokens.append(token)
    
    stems = [stemmer.stem(t) for t in filtered_tokens]
    return stems

def token_only(text):
    tokens = [word.lower() for sent in nltk.sent_tokenize(text) 
              for word in nltk.word_tokenize(sent)]
    
    filtered_tokens = []
    for token in tokens:
        if re.search('[а-яА-Я]', token):
            filtered_tokens.append(token)
    
    return filtered_tokens

In [5]:
# TF-IDF векторизация
from sklearn.feature_extraction.text import TfidfVectorizer

stopwords = nltk.corpus.stopwords.words('russian')
stopwords.extend(['что', 'это', 'так', 'вот', 'быть', 'как',
                  'в', 'к', 'на', 'а', 'от', 'о', 
                  'чут', 'даж', 'еще'])

tfidf_vectorizer = TfidfVectorizer(
    max_df=0.75,
    max_features=10000,
    min_df=0.01,
    stop_words=stopwords,
    use_idf=True,
    tokenizer=token_and_stem,
    ngram_range=(1,3)
)

tfidf_matrix = tfidf_vectorizer.fit_transform(text_messages_clean)

print(f"Размерность TF-IDF матрицы: {tfidf_matrix.shape}")



Размерность TF-IDF матрицы: (486, 321)


In [None]:
# Применение методов кластеризации
from sklearn.cluster import KMeans, MiniBatchKMeans, AgglomerativeClustering, DBSCAN

num_clusters = 5

km = KMeans(n_clusters=num_clusters)
km.fit(tfidf_matrix)
clusters_km = km.labels_.tolist()

mbk = MiniBatchKMeans(init='random', n_clusters=num_clusters, random_state=42)
mbk.fit(tfidf_matrix)
clusters_mbk = mbk.labels_.tolist()

db = DBSCAN(eps=0.3, min_samples=10).fit(tfidf_matrix)
clusters_dbscan = db.labels_

agglo = AgglomerativeClustering(n_clusters=num_clusters, metric='euclidean')
clusters_agglo = agglo.fit_predict(tfidf_matrix.toarray())

print(f"K-means: {pd.Series(clusters_km).value_counts().sort_index().to_dict()}")
print(f"MiniBatchKMeans: {pd.Series(clusters_mbk).value_counts().sort_index().to_dict()}")
print(f"DBSCAN: {pd.Series(clusters_dbscan).value_counts().sort_index().to_dict()}")
print(f"Аггломеративная: {pd.Series(clusters_agglo).value_counts().sort_index().to_dict()}")

In [None]:
# Анализ косинусных расстояний
from sklearn.metrics.pairwise import cosine_similarity

dist = 1 - cosine_similarity(tfidf_matrix)

plt.figure(figsize=(10, 8))
plt.imshow(dist, cmap='coolwarm')
plt.colorbar()
plt.title('Косинусные расстояния между документами')
plt.xlabel('Документ')
plt.ylabel('Документ')
plt.show()

In [None]:
# PCA 2D визуализация
from sklearn.decomposition import IncrementalPCA

icpa_2d = IncrementalPCA(n_components=2, batch_size=16)
icpa_2d.fit(dist)
demo2d = icpa_2d.transform(dist)
xs_2d, ys_2d = demo2d[:, 0], demo2d[:, 1]

plt.figure(figsize=(8, 6))
plt.scatter(xs_2d, ys_2d, marker='.', s=5, alpha=0.6)
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.title('2D визуализация PCA')
plt.show()

In [None]:
# PCA 3D визуализация
from mpl_toolkits.mplot3d import Axes3D

icpa_3d = IncrementalPCA(n_components=3, batch_size=16)
icpa_3d.fit(dist)
demo3d = icpa_3d.transform(dist)
xs_3d, ys_3d, zs_3d = demo3d[:, 0], demo3d[:, 1], demo3d[:, 2]

fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(xs_3d, ys_3d, zs_3d, marker='.', s=5, alpha=0.6)
ax.set_xlabel('PC1')
ax.set_ylabel('PC2')
ax.set_zlabel('PC3')
ax.set_title('3D визуализация PCA')
plt.show()

# Часть 2: Кластеризация изображений

In [11]:
from dotenv import load_dotenv
# Импорт библиотек для работы с API ВКонтакте и загрузки изображений
from urllib.request import urlretrieve
import vk
import time
import math

# Токен доступа (необходимо заполнить)
load_dotenv()
secret_token = os.getenv("VK_ACCESS_TOKEN") # Измените на ваш токен
# Авторизация
try:
    vkapi = vk.API(access_token=secret_token, v="5.81")
    print("Авторизация выполнена успешно")
except Exception as e:
    print(f"Ошибка авторизации: {e}")
    print("Используйте действительный токен доступа")

Авторизация выполнена успешно


In [12]:
# Извлечение ID альбома и владельца из URL
url = "https://vk.com/album-97980111_248419714"

album_id = url.split('/')[-1].split('_')[1]
owner_id = url.split('/')[-1].split('_')[0].replace('album', '')

print(f"ID альбома: {album_id}")
print(f"ID владельца: {owner_id}")

# Получение информации об альбоме
try:
    photos_count = vkapi.photos.getAlbums(owner_id=owner_id, album_ids=album_id)['items'][0]['size']
    print(f"Всего фотографий в альбоме: {photos_count}")
except Exception as e:
    print(f"Ошибка при получении информации об альбоме: {e}")
    photos_count = 0

ID альбома: 248419714
ID владельца: -97980111
Всего фотографий в альбоме: 1679


In [None]:
# Создание директорий для сохранения фотографий
if not os.path.exists('saved'):
    os.mkdir('saved')

photo_folder = f'saved/album{owner_id}_{album_id}'

if not os.path.exists(photo_folder):
    os.mkdir(photo_folder)

print(f"Директория для сохранения: {photo_folder}")

In [None]:
# Загрузка фотографий из альбома (пропущена в целях демонстрации)
# Раскомментируйте код ниже если хотите загрузить реальные фотографии

# counter = 0
# prog = 0
# batch_size = 100
# time_now = time.time()
#
# for j in range(math.ceil(photos_count / batch_size)):
#     try:
#         photos = vkapi.photos.get(owner_id=owner_id, album_id=album_id,
#                                    count=batch_size, offset=j*batch_size, v=5.95)['items']
#     except:
#         continue
#     for photo in photos:
#         counter += 1
#         sizes = photo['sizes']
#         s = photo['sizes'][0]
#         value_x, value_y = 0, 0
#         for size in sizes:
#             if value_x < size['width']:
#                 value_x = size['width']
#                 s = size
#         url_ = s['url']
#         try:
#             filename = os.path.split(url_)[1].split('?')[0]
#             urlretrieve(url_, os.path.join(photo_folder, filename))
#         except:
#             pass

print("Загрузка фотографий требует действительный токен доступа")
print(f"Фотографии сохраняются в: {photo_folder}")

In [None]:
# Импорт библиотек для обработки изображений
import cv2
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from mpl_toolkits.mplot3d import Axes3D

# Класс для выделения доминирующих цветов
class DominantColors:
    def __init__(self, image_path, clusters):
        self.CLUSTERS = clusters
        self.IMAGE_PATH = image_path
        self.IMAGE = None
        self.COLORS = None
        self.LABELS = None
    
    def dominantColors(self):
        """Выделение доминирующих цветов при помощи K-means"""
        img = cv2.imread(self.IMAGE_PATH)
        if img is None:
            raise ValueError(f"Не удалось загрузить изображение: {self.IMAGE_PATH}")
        
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.reshape((img.shape[0] * img.shape[1], 3))
        self.IMAGE = img
        
        kmeans = KMeans(n_clusters=self.CLUSTERS, random_state=42)
        kmeans.fit(img)
        
        self.COLORS = kmeans.cluster_centers_
        self.LABELS = kmeans.labels_
        
        return self.COLORS.astype(int)
    
    def plotHistogram(self):
        """Отрисовка гистограммы доминирующих цветов"""
        numLabels = np.arange(0, self.CLUSTERS + 1)
        (hist, _) = np.histogram(self.LABELS, bins=numLabels)
        hist = hist.astype("float")
        hist /= hist.sum()
        
        colors = self.COLORS[(-hist).argsort()]
        hist = hist[(-hist).argsort()]
        
        chart = np.zeros((50, 500, 3), np.uint8)
        start = 0
        
        for i in range(self.CLUSTERS):
            end = start + hist[i] * 500
            r, g, b = int(colors[i][0]), int(colors[i][1]), int(colors[i][2])
            cv2.rectangle(chart, (int(start), 0), (int(end), 50), (b, g, r), -1)
            start = end
        
        fig = plt.figure(figsize=(12, 4))
        pic_box = fig.add_subplot(121)
        hist_box = fig.add_subplot(122)
        
        pic_box.axis("off")
        pic_box.set_title("Исходное изображение")
        try:
            pic_box.imshow(plt.imread(self.IMAGE_PATH))
        except:
            pic_box.text(0.5, 0.5, "Изображение не найдено", ha='center')
        
        hist_box.axis("off")
        hist_box.set_title("Доминирующие цвета")
        hist_box.imshow(chart)
        
        plt.show()

print("Класс DominantColors создан успешно")

In [None]:
# Пример использования класса DominantColors
# Замените путь на путь к вашему изображению

# img_path = 'saved/album-97980111_248419714/image.jpg'
# clusters = 5
# dc = DominantColors(img_path, clusters)
# colors = dc.dominantColors()
# print(f"Доминирующие цвета (RGB): {colors}")
# dc.plotHistogram()

print("Для использования на реальных изображениях укажите путь к файлу")
print("Пример: dc = DominantColors('saved/album.../image.jpg', 5)")
print("Затем вызовите: dc.dominantColors() и dc.plotHistogram()")


In [None]:
# Кластеризация всех изображений в папке
# directory = photo_folder
# filenames = []
# data = []
# files_total = len(os.listdir(directory))
#
# if files_total == 0:
#     print(f"В папке {directory} нет изображений")
# else:
#     for filename in os.listdir(directory):
#         if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')):
#             data_iter = []
#             filenames.append(filename)
#             img = os.path.join(directory, filename)
#             try:
#                 clusters = 3
#                 dc = DominantColors(img, clusters)
#                 colors = dc.dominantColors()
#                 for i in colors:
#                     for j in i:
#                         data_iter.append(j)
#                 data.append(data_iter)
#                 print(f"{len(filenames)}/{files_total}")
#             except Exception as e:
#                 print(f"Ошибка: {e}")

print("Кластеризация всех изображений пропущена")


In [None]:
# PCA анализ данных изображений
# from sklearn.decomposition import PCA
#
# if len(data) > 0:
#     np_data = np.asarray(data, dtype=np.float32)
#     pca = PCA(n_components=3)
#     XPCAreduced = pca.fit_transform(np_data)
#     print(f"Объяснённая дисперсия: {pca.explained_variance_ratio_}")
#     print(f"Сумма: {pca.explained_variance_ratio_.sum():.2%}")
#     xs, ys, zs = np_data[:, 0], np_data[:, 1], np_data[:, 2]
#     fig = plt.figure(figsize=(10, 8))
#     ax = fig.add_subplot(111, projection='3d')
#     ax.scatter(xs, ys, zs, c='blue', marker='o', s=50)
#     ax.set_xlabel('Компонента 1 (R)')
#     ax.set_ylabel('Компонента 2 (G)')
#     ax.set_zlabel('Компонента 3 (B)')
#     ax.set_title('3D визуализация цветов')
#     plt.show()
# else:
#     print("Нет данных для анализа PCA")

print("PCA анализ пропущен (требуются обработанные данные)")
