In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from multiprocessing import Pool
import gc
from dask.distributed import Client
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker 
from collections import Counter
from dask_ml.model_selection import train_test_split
import statsmodels.stats.api as sms
from scipy import stats as st
import pymc as pm
import requests 
from urllib.parse import urlencode 



# Считывание и процессинг

In [2]:
# используем api 
base_url = 'https://cloud-api.yandex.net/v1/disk/public/resources/download?' 
public_key = 'https://disk.yandex.ru/d/iGDvY2cr0w8ZJA'

In [3]:
# получаем url
final_url = base_url + urlencode(dict(public_key=public_key)) 
response = requests.get(final_url) 
download_url = response.json()['href'] 

In [None]:
client = Client(memory_limit='4GB')
client

# Определение типов данных для столбцов
dtypes = {
    'id': 'int64',
    'name': 'object',
    'amount': 'float64',
    'date_time': 'object',  
    'approved': 'object',  
    'comment': 'object'
}

# чтение файла CSV в Dask DataFrame с указанными типами данных
try:
    ddf = dd.read_csv(download_url, dtype=dtypes, na_values = [''])
    print("Файл CSV успешно прочитан.")
    print(ddf.head())
    
    # удаление пустых/ na строк  -- нам можно оставить пустые строки только в комментариях 
    # в остальных столбцах их лучше убрать 
    
    ddf = ddf.dropna(how='all')
    ddf = ddf.dropna(subset=['id','name','amount','date_time','approved']).reset_index(drop=True)

    # Переведем approved в правильный тип
    ddf['approved'] = ddf['approved'].astype('bool')

    
    # удаление дублей
    ddf = ddf.drop_duplicates(split_out=ddf.npartitions)

    
    # преобразование столбца date_time в datetime формат
    ddf['date_time'] = dd.to_datetime(ddf['date_time'], format='%Y-%m-%d %H:%M:%S')

    # добавление столбца 
    ddf['hour'] = ddf['date_time'].apply(lambda x:x.hour, meta=('date_time', 'int64'))

    # удалить записи в промежутке от 1 до 3 часов ночи
    ddf = ddf[(ddf['hour']>3)|(ddf['hour']<1)]
    
    # сохранение результата 
    ddf.to_parquet('work_direct_1')
    

except Exception as e:
    print(f"Ошибка при обработке файла: {e}")

# Закрытие клиент
client.close()


Файл CSV успешно прочитан.
   id             name    amount            date_time approved  \
0   1    Carl Campbell  342525.0  2021-09-16 13:47:00     True   
1   2        Ann Noble  247819.0  2019-07-30 22:31:22     True   
2   3  Jennifer Cooper  878271.0  2022-06-10 09:48:36     True   
3   4     Tracy Murphy  495827.0  2022-07-03 02:30:00    False   
4   5   Zachary Morton  559102.0  2020-11-26 00:21:37     True   

                                          comment  
0               Throughout he weight stand least.  
1                       Theory live office great.  
2     Plan forward decision tonight officer such.  
3  Out ago daughter movement improve table spend.  
4               But book cold like popular spend.  


# Расчет метрик
Агрегация по времени, для каждого часа рассчитать

In [None]:
client = Client(memory_limit='4GB')
client

# чтение файлов parquet в Dask DataFrame 
try:
    df = dd.read_parquet('work_direct_1')
    print("Файлы успешно загружены.")
    
    # Агрегация по времени, для каждого часа рассчитать
    # кол-во уникальных string
    name_n = df.groupby(['hour']).name.nunique(split_out=df.npartitions).to_frame().reset_index()
    name_n.columns = ['hour', 'name_nuniq']
    comment_n = df.groupby(['hour']).comment.nunique(split_out=df.npartitions).to_frame().reset_index()
    comment_n.columns = ['hour', 'comment_nuniq']
    
    # среднее и медиану для numeric
    num_mean = df.groupby(['hour']).amount.mean(split_out=df.npartitions).to_frame().reset_index()
    num_mean.columns = ['hour', 'amount_mean']
    num_median = df.groupby(['hour']).amount.median(split_out=df.npartitions).to_frame().reset_index()
    num_median.columns = ['hour', 'amount_median']

    part_1 = name_n.merge(comment_n, on = 'hour', how = 'outer')
    part_2 = part_1.merge(num_mean, on = 'hour', how = 'outer') 
    metrics = part_2.merge(num_median, on = 'hour', how = 'outer').compute()
except Exception as e:
    print(f"Ошибка при обработке файла: {e}")

# Закрытие клиент
client.close()


SQL запрос для выполнения подобных расчетов напрямую в базе данных. 

In [None]:
# sql
#
# SELECT EXTRACT(HOUR FROM CAST(b.date_time AS date)) AS HOUR,
#        COUNT(DISTINCT(b.name)) AS count_uniq_name,
#	     COUNT(DISTINCT(b.comment)) AS count_uniq_comment,
#        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY b.amount) OVER 
#        (PARTITION  BY EXTRACT(HOUR FROM CAST(b.date_time AS date))) AS median_amount,
#        AVG(amount) AS mean_amount
# FROM baza AS b
# GROUP BY EXTRACT(HOUR FROM CAST(b.date_time AS date));


# Мерж с метриками
К каждой строке в исходном датасете примержить метрики ближайшего часа рассчитанные в предыдущем шаге

In [None]:
client = Client(memory_limit='4GB')
client

try:
    
    # К каждой строке в исходном датасете примержить метрики 
    # ближайшего часа рассчитанные в предыдущем шаге
    dfm = df.merge(metrics, on='hour', how = 'left')
    dfm.to_parquet('work_direct_2')

except Exception as e:
    print(f"Ошибка при обработке файла: {e}")

# Закрытие клиент
client.close()


# Аналитические метрики
Для колонки numeric по полному датасету построить

• Гистограмму

In [None]:
client = Client(memory_limit='4GB')
client

fig, ax = plt.subplots(figsize=(8, 5))
plt.hist(df['amount'].compute(), bins=1000)
plt.ylim(84000,91000)
xlabels = ['{:,.2f}'.format(x) + 'K' for x in ax.get_xticks()/1000]
ax.set_xticklabels(xlabels)
plt.title('Гистограмма по полю amount')
plt.xlabel('Значения')
plt.ylabel('Количество')
plt.show()

client.close()

Нам известно, что для генеральной совокупности, которой в данном случае является наш датасет, 95% всех выборочных средних разместятся вокруг настоящего среднего ГС в диапазоне ± 1,96 стандартного отклонения.
Поэтому легко найти среднее и стандартное отклонение, и на основе них рассчитать 95% доверительный интревал.

In [None]:
client = Client(memory_limit='4GB')
client

std_deviation = df['amount'].std().compute()
ma = df['amount'].mean().compute()
min_board = ma - (1.96 * std_deviation)
max_board = ma + (1.96 * std_deviation)
di = min_board, max_board 

client.close()

print(f'95% доверительный интервал: {di}')

# Визуализация

Отрисовать график среднего значения numeric колонки (y) по месяцам (x).

In [None]:
client = Client(memory_limit='4GB')
client

df['month'] = df['date_time'].apply(lambda x:x.month, meta=('date_time', 'int64'))
gm = df.groupby('month').amount.mean().to_frame().reset_index().compute()

client.close()

In [None]:
month_map = {1: 'Январь', 2: 'Февраль',
             3: 'Март', 4:  'Апрель', 5: 'Май',
             6: 'Июнь', 7: 'Июль', 8: 'Август', 9: 'Сентябрь',
            10: 'Октябрь',11: 'Ноябрь',12: 'Декабрь'}

gm['month'] = gm['month'].map(month_map)

In [None]:
fig = plt.figure(figsize =(12, 7))
 
# Горизонтальный Bar Plot

plt.bar(gm['month'], gm['amount'], color = '#62639B')
plt.ylim(490000,505000) 

# Рисуем 

plt.title('Распределение среднего значения "amount" по месяцам',fontsize=18)
plt.ylabel('Значение "amount"')
plt.xlabel('Месяц')
plt.legend(['amount'])
ax = plt.gca()
ax.grid(True,color ='#d7d7d7')
ax.set_facecolor('#F8F8FF')
ax.set_xticks(gm['month'])
plt.show()

Heatmap по частотности символов в колонке string

In [None]:
client = Client(memory_limit='4GB')
client

# Функция для подсчета частот символов в одной партиции
def count_characters(partition):
    # Удаление пропущенных значений
    partition = partition.dropna()
    all_strings = ''.join(partition['comment'])
    
    frequency_dict = {}
    for char in all_strings:
        if char in frequency_dict:
            frequency_dict[char] += 1
        else:
            frequency_dict[char] = 1
    return pd.DataFrame(list(frequency_dict.items()), columns=['Символ', 'Частотность'])

# Применение функции ко всему DataFrame
char_counts = df.map_partitions(count_characters).compute()

# Объединение результатов подсчета из всех партиций
total_counts = char_counts.groupby('Символ')['Частотность'].sum().reset_index()

# Конвертация в DataFrame для heatmap
freq_df = total_counts.sort_values(by='Частотность', ascending=False)

client.close()

In [None]:
# Подготовка данных для heatmap
characters = freq_df['Символ'].values
frequencies = freq_df['Частотность'].values

# Создание heatmap
fig, ax = plt.subplots(figsize=(10, 8))
heatmap = ax.imshow(frequencies.reshape(-1, 1), cmap='viridis', aspect='auto')

# Настройка осей
ax.set_yticks(np.arange(len(characters)))
ax.set_yticklabels(characters)
ax.set_xticks([])

# Добавление цветовой шкалы
cbar = ax.figure.colorbar(heatmap, ax=ax)
cbar.ax.set_ylabel('Частотность', rotation=-90, va="bottom")

# Добавление подписей
for i in range(len(characters)):
    text = ax.text(0, i, frequencies[i], ha="center", va="center", color="w")

plt.title('Частотность символов Heatmap')
plt.show()

# Доп. задание
    1. Случайно поделить датасет на 3 части - в одной 25% записей, во второй 25% и 50% в третьей.
    2. Проверить на статистическую значимость различий для среднего по колонке numeric
    3. Оценить силу эффекта

In [None]:
df = dd.read_parquet('work_direct_1')
print("Файлы успешно загружены.")

In [None]:
# Делим датасет на 50% и 50%
df_50, df_temp = train_test_split(df, test_size=0.5, random_state=42, shuffle=True)

# Делим временный датасет на 25% и 25%
df_25_1, df_25_2 = train_test_split(df_temp, test_size=0.5, random_state=42, shuffle=True)

In [None]:
client = Client(memory_limit='4GB')
client

s_1 = df_25_1['amount'].compute()
s_2 = df_25_2['amount'].compute()
s_3 = df_50['amount'].compute()


client.close() 

In [None]:
alpha = 0.05 # уровень статистической значимости

print('Для df_25_1 и df_25_2:')

results = st.ttest_ind(s_1, s_2)

print('p-значение:', results.pvalue)

if results.pvalue < alpha:
    print('Отличия статистически значимы')
else:
    print('Отличия статистически не значимы')

print('Для df_25_1 и df_50:')

results_1 = st.ttest_ind(s_1, s_3)

print('p-значение:', results_1.pvalue)

if results.pvalue < alpha:
    print('Отличия статистически значимы')
else:
    print('Отличия статистически не значимы')

print('Для df_25_2 и df_50:')

results_1 = st.ttest_ind(s_2, s_3)

print('p-значение:', results_1.pvalue)

if results.pvalue < alpha:
    print('Отличия статистически значимы')
else:
    print('Отличия статистически не значимы')

In [None]:
client = Client(memory_limit='4GB')
client

size_1 = df_25_1.shape[0].compute()
size_2 = df_50.shape[0].compute()

sigma_1 = df_25_1['amount'].std().compute()
sigma_2 = df_50['amount'].std().compute()


client.close() 

In [None]:
# расчет мощности
effect_size = (s_1.mean() - s_2.mean()) / sigma_1
power_analysis = sms.TTestPower()
power = power_analysis.solve_power(effect_size=effect_size, nobs=size_1, alpha=alpha, alternative='two-sided')

print(f"Мощность теста для одинаковых выборок: {power:.4f}")

In [None]:
effect_size_2 = (s_1.mean() - s_3.mean())/np.sqrt(((size_1- 1) * sigma_1**2 + (size_2 - 1) * sigma_2**2) / (size_1 + size_2 - 2))

analysis = sms.TTestIndPower()
power = analysis.solve_power(effect_size=effect_size_2, nobs1=size_1, alpha=alpha, ratio=size_2/size_1, alternative='two-sided')

print(f'Мощность теста для маленькой и большой выборки: {power:.4f}')