In [None]:
import numpy as np
import math
import collections
import re
from scipy.optimize import minimize
from scipy.integrate import quad
from collections import defaultdict

In [None]:
import os
import re

def read_brown_corpus_files(directory="C:\\Users\\saidk\\Курсач Python\\brown"):
    if not os.path.exists(directory):
        print(f"Error: Directory '{directory}' not found.")
        return None

    files = [f for f in os.listdir(directory) if f.startswith("c")]
    corpus_texts = dict()
    for file in files:
        filepath = os.path.join(directory, file)
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                corpus_texts[file] = [f.read()]
        except (FileNotFoundError, UnicodeDecodeError) as e:
            print(f"Error reading file {filepath}: {e}")
    return corpus_texts

def parse_text_to_words(text):
    # Удаляет лишние знаки пунктуации и разделает текст на слова.
    text = re.sub(r'/\w+', '', text) 
    text = re.sub(r'[^\w\s]', '', text)
    words = text.lower().split()
    
    return words


In [None]:
corpus_texts = read_brown_corpus_files()
print(corpus_texts)
if corpus_texts:
    all_words = dict()
    for key in corpus_texts.keys():
        words = parse_text_to_words(*corpus_texts[key])
        all_words[key] = words

In [None]:
all_words["ca01"]

In [None]:
import numpy as np
from scipy.optimize import minimize
from collections import defaultdict

class ZipfMandelbrotMixtureEM:
    def __init__(self, data, num_components=2, ns=None, qs=None, ss=None):
        self.data = np.array(data)  # Преобразуем data в NumPy array
        self.num_data = len(self.data)
        self.num_components = num_components
        self.pdf_cache = defaultdict(dict) # Кэш для pdf

        # Инициализация параметров (без изменений)
        if ns is None:
            self.ns = np.random.randint(low=self.num_data, high=self.num_data*2, size=num_components)
        else:
            assert len(ns) == num_components
            self.ns = np.array(ns) # Преобразуем в numpy array
        if qs is None:
            self.qs = np.random.uniform(low=0.0, high=10.0, size=num_components)
            print("qs ", self.qs)
        else:
            assert len(qs) == num_components, "Number of initial qs must match number of components."
            self.qs = qs
        if ss is None:
            self.ss = np.random.uniform(low=0.0, high=10.0, size=num_components)
            print("ss ", self.ss)
        else:
            assert len(ss) == num_components, "Number of initial ss must match number of components."
            self.ss = ss
        self.weights = np.full(num_components, 1/num_components)


    def pdf(self, x, params):
        q, s, n = params
        if np.any(x <= 0) or np.any(x > n):
            return np.zeros_like(x)

        key = (n, q, s)
        if key in self.pdf_cache and q in self.pdf_cache[key]:
            denominator = self.pdf_cache[key][q]
        else:
            denominator = np.sum((np.arange(1, n + 1) + q)**(-s))
            self.pdf_cache[key][q] = denominator

        numerator = (x + q)**(-s)
        return numerator / denominator if denominator > 0 else 0

    def mix_pdf(self):
        return np.sum([self.weights[i] * self.pdf(np.arange(1, max(self.ns)+1, 1), [self.qs[i], self.ss[i], self.ns[i]]) for i in range(self.num_components)], axis = 0)
    
    def Estep(self):
        qs = np.array(self.qs)
        ss = np.array(self.ss)
        ns = np.array(self.ns)
        # print("ns: ", ns)
        weights = np.array(self.weights)

        pdf_values = np.zeros((max(self.ns), self.num_components))
        for i in range(self.num_components):
            pdf_values[:, i] = pdf_values[:, i] = np.concatenate((self.pdf(np.arange(1, self.ns[i] + 1), [qs[i], ss[i], ns[i]]) * weights[i], np.zeros(int(max(self.ns) - self.ns[i]))))
        print("wps_all: ", pdf_values)
        den = np.sum(pdf_values, axis=1, keepdims=True)
        # print("den: ", den)
        pdf_values /= den
        self.loglike = np.sum(np.log(den))
        # print("pdf_values: ", pdf_values)
        return pdf_values


    def Mstep(self, weights):
        distr_weights = weights
        print(distr_weights, distr_weights.shape)
        bounds = [(0, 10), (0, 10), (self.num_data, sum(self.data))]
        for i in range(self.num_components):
            initial_guess = [self.qs[i], self.ss[i], self.ns[i]]

            result = minimize(
                lambda params: -np.sum(distr_weights[:self.ns[i], i] @ np.log(self.pdf(np.arange(1, self.ns[i]+1).T, params)+ 1)),
                initial_guess,
                bounds=bounds,
                method='Powell',
                options={'maxiter': 1000, 'disp': True, 'return_all': False}
            )
            print(result.x)
            self.qs[i], self.ss[i], self.ns[i] = result.x
#             print(-np.sum(distr_weights[:, i].reshape(1, self.num_data) * np.log(self.pdf(np.arange(1, self.num_data+1), [self.qs[i], self.ss[i], self.ns[i]])+ 1)) + 1)
            self.weights[i] = np.sum(weights[:, i]) / max(self.ns)
        # print(sum(self.weights), self.weights)

    def iterate(self, max_iter=100, tol=1e-6, verbose=False):
        prev_loglikelihood = -np.inf
        for i in range(max_iter):
            weights = self.Estep()
            self.Mstep(weights)
            loglikelihood = self.compute_loglikelihood()
            if verbose:
                print(f'Iteration {i+1}: Log Likelihood = {loglikelihood:.4f}')
            # if abs(loglikelihood - prev_loglikelihood) < tol:
            #     print("Stop")
            #     break
            prev_loglikelihood = loglikelihood

    def compute_loglikelihood(self):
        loglikelihood = 0 
        component_probs = [weight * self.pdf(np.arange(1, len(self.data)+1), [q, s, n]) for weight, n, q, s in zip(self.weights, self.ns, self.qs, self.ss)]
        sum_probs = sum(component_probs)
        for probs in range(len(sum_probs)):
            if  sum_probs[probs] == 0:
                sum_probs[probs] = 1
        loglikelihood += np.sum(np.log(sum_probs))
        return loglikelihood

In [None]:
#Альтернативное разделение данных по клаасам литературы
import pandas as pd
import re
import seaborn as sns
from collections import Counter
from itertools import chain
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
from scipy.stats import chisquare
np.random.seed(49)

incorr_data = ["cr09", "cj21", "cc14"]

results = pd.DataFrame(columns=['Text category', "chi2", 'p-value'])
for i, cats in enumerate(all_words):
    np.random.seed(47) #43, 45, 48
    data_cat = np.array(all_words[cats])
    data_count = Counter(data_cat)
    sorted_data_cat = dict(sorted(data_count.items(), key=lambda item: item[1]))
    data_list = np.array(sorted(data_count.values(), reverse=True))
    print(cats, data_list)
    em_model = ZipfMandelbrotMixtureEM(data_list, num_components=2) #ss = [5.96850158, 4.45832753], qs = [1.8343479, 7.79691], ns = [64295, 49360]
    em_model.iterate(max_iter=2, verbose=True)
#     lr_model = LinearRegression()
#     lr_model.fit(np.array(data_list).reshape(len(data_list), 1), data_list)
#     l_result = lr_model.predict(np.array(data_list).reshape(len(data_list), 1))
    result = em_model.mix_pdf()
    print("sum_result: ", sum(result))
    result = np.round(result/sum(result), 10)
    print(result.shape, data_list.shape)
#     print(sum(np.round(data_list, sum(np.round(result/sum(result), 10)), np.round(data_list/sum(data_list), 8)[0], np.round(result, 10)[0])
#     chi1 = chi(data_list, result*sum(data_list))
    chi2, p = chisquare(result*sum(data_list), data_list)
    print("What ", chi2, p)
    new_row = pd.DataFrame({'Text category': [cats], "chi2": [chi2], 'p-value': [p]})
    results = pd.concat([results, new_row], ignore_index=True)
    plt.figure(i)
    sns.ecdfplot(data_list[:500])
    sns.ecdfplot((np.array(result[:500])*sum(data_list)).astype(int))
    plt.title("Эмпирическая плотность распределения")
    plt.ylabel("Значение эмпирической плотности распределения")
    plt.legend(["Истинная ЭПР", "Рассчитанная ЭПР"])
    plt.show()
    plt.figure(i)
    plt.bar(range(1, len(data_list[:250])+1), data_list[:250]/sum(data_list))
#     for i in range(em_model.num_components):
#         z_component = em_model.pdf(np.arange(1, len(data_list[:250])+1), [em_model.qs[i], em_model.ss[i], em_model.ns[0]])
#         plt.plot(range(1, len(data_list[:250])+1), z_component, label=f"Zipf-Mandelbrot {i} component")
    plt.plot(range(len(data_list[:250])), result[:250], label="Zipf-Mandelbrot mixture")
#     plt.plot(range(1, len(data_list[:250])+1), l_result[:250]/sum(data_list), label="Linear Regression")
    plt.xlabel("Ранги слов")
    plt.ylabel("Плотность вероятности")
    plt.legend()
    plt.show()

In [None]:
results.sort_values(by=['p-value', "chi2"], ascending=False)

In [None]:
results[results["p-value"] > 0.05].count()