# Работа с данными в Clickhouse. Кластеризация

Для удобства уберем лимит столбцов, отображающихся при выводе на экран DataFrame.

In [None]:
import pandas as pd
pd.options.display.max_columns = None

In [None]:
from google.colab.data_table import DataTable
DataTable.max_columns = 1000

In [None]:
from google.colab import drive
drive.mount('drive')

Drive already mounted at drive; to attempt to forcibly remount, call drive.mount("drive", force_remount=True).


Установим необходимые зависимости.

In [None]:
!pip install clickhouse-driver -q
!pip install dask[dataframe] -q
!pip install dask_ml -q

In [None]:
import dask.dataframe as dd

Создадим клиент, с помощью которого мы будем взаимодействовать с Clickhouse.


DaskClient — созданный мной класс наследник класса Client из clickhouse-driver. Он содержит методы, обеспечивающие интеграцию Dask и clickhouse-driver.

In [None]:
import sys
sys.path.append("/content/drive/MyDrive/Интеллектуальный анализ данных в бизнесе 2 Гурьянов 11-909/Задача 2")
from dfclient import DfClient, connection_parameters_from_json
from daskclient import DaskClient

#client = DfClient('2.tcp.eu.ngrok.io', port='13172', user='default', password='default', database='default', dask=True)
#client1 = DfClient('2.tcp.eu.ngrok.io', port='13172', user='default', password='default', database='default')
client = DaskClient(**connection_parameters_from_json("/content/drive/MyDrive/Интеллектуальный анализ данных в бизнесе 2 Гурьянов 11-909/Задача 2/ClickhouseConnectionParameters.json"))

Чтобы не изменять исходную таблицу с данными, создадим новую таблицу.

In [None]:
client.execute('DROP TABLE IF EXISTS fns')

[]

In [None]:
# TODO for real launch remove limit
client.execute("""
CREATE TABLE IF NOT EXISTS fns
    ENGINE = MergeTree()
    ORDER BY totalSum
    AS SELECT * FROM fns_fiscal
""")

[]

Получим первые 10 элементов таблицы.

In [None]:
client.select_all_columns('fns_fiscal', 'LIMIT 10').head(10)

Unnamed: 0_level_0,protocolVersion,operationType,shiftNumber,totalSum,ecashTotalSum,nds10,nds18,nds20,dateTime,requestNumber,taxationType,receiptCode,internetSign,senderAddress,buyerAddress,userProperty_key,userProperty_value,sellerAddress,fiscalDocumentFormatVer,paymentAgentType,propertiesUser_propertyName,retailAddress,code,propertiesUser_key,propertiesUser_value
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1
0,object,object,int64,int64,int64,int64,int64,int64,object,int64,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object
9,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Получим список столбцов, содержащихся в таблице.

In [None]:
client.columns('fns')

['protocolVersion',
 'operationType',
 'shiftNumber',
 'totalSum',
 'ecashTotalSum',
 'nds10',
 'nds18',
 'nds20',
 'dateTime',
 'requestNumber',
 'taxationType',
 'receiptCode',
 'internetSign',
 'senderAddress',
 'buyerAddress',
 'userProperty_key',
 'userProperty_value',
 'sellerAddress',
 'fiscalDocumentFormatVer',
 'paymentAgentType',
 'propertiesUser_propertyName',
 'retailAddress',
 'code',
 'propertiesUser_key',
 'propertiesUser_value']

In [None]:
#client.execute('ALTER TABLE fns DROP COLUMN IF EXISTS ndsNoFraction')

[]

Создадим новые признаки.

Создаваемые столбцы имеют тип ALIAS. Значения в этих столбцах не хранятся в базе данных физически, а вычисляются на основе заданного выражения.

Столбец cashTotalSum будет содержать сумму, оплаченную наличными. Этот столбец ранее, на этапе предобработки, удален.

In [None]:
#client.execute('ALTER TABLE fns ADD COLUMN сashTotalSum Int32 ALIAS totalSum - ecashTotalSum')
#client.execute('ALTER TABLE fns ADD COLUMN cashTotalSum UInt32 ALIAS totalSum - ecashTotalSum')
client.create_column('fns', 'cashTotalSum', 'UInt32', 'totalSum - ecashTotalSum')

Этот столбец будет означать, был ли в транзакции НДС или нет.

In [None]:
#client.execute('ALTER TABLE fns ADD COLUMN transactionWithoutNds ALIAS if(nds10 + nds18 + nds20 = 0, 1, 0)')
client.create_column('fns', 'transactionWithoutNds', 'UInt8', 'if(nds10 + nds18 + nds20 = 0, 1, 0)')

Следующие столбцы будут содержать долю транзакции, облагаемую, соответственно, 10, 18 и 20-процентным НДС, и долю, не облагаемую НДС.

In [None]:
#client.execute('ALTER TABLE fns ADD COLUMN nds10Fraction ALIAS (nds10 / 0.1) / totalSum')
client.create_column('fns', 'nds10Fraction', 'Float32', '(nds10 / 0.1) / totalSum')

In [None]:
#client.execute('ALTER TABLE fns ADD COLUMN nds18Fraction ALIAS (nds18 / 0.18) / totalSum')
client.create_column('fns', 'nds18Fraction', 'Float32', '(nds18 / 0.18) / totalSum')

In [None]:
#client.execute('ALTER TABLE fns ADD COLUMN nds20Fraction ALIAS (nds20 / 0.2) / totalSum')
client.create_column('fns', 'nds20Fraction', 'Float32', '(nds20 / 0.2) / totalSum')

In [None]:
#client.execute('ALTER TABLE fns ADD COLUMN ndsNoFraction ALIAS (totalSum - nds10Fraction - nds18Fraction - nds20Fraction) / totalSum')
client.create_column('fns', 'ndsNoFraction', 'Float32', '1 - nds10Fraction - nds18Fraction - nds20Fraction')

In [None]:
#import math

#a = math
#a.ceil(5.3)

Загрузим данные из Clickhouse в Dask, чтобы провести кластеризацию.

Чтобы убедиться в том, что данные загрузились и столбцы были созданы, выведем на экран выборку загруженных данных.

In [None]:
data = client.select_all_columns('fns')
data.head(100)

Unnamed: 0,protocolVersion,operationType,shiftNumber,totalSum,ecashTotalSum,nds10,nds18,nds20,dateTime,requestNumber,taxationType,receiptCode,internetSign,senderAddress,buyerAddress,userProperty_key,userProperty_value,sellerAddress,fiscalDocumentFormatVer,paymentAgentType,propertiesUser_propertyName,retailAddress,code,propertiesUser_key,propertiesUser_value,cashTotalSum,transactionWithoutNds,nds10Fraction,nds18Fraction,nds20Fraction,ndsNoFraction
0,Missing,1,52,3395,813,153,117,243,2020-09-14,1135,8,3.0,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Маршрут,ВЛАДИМИР (ЮГО-ЗАПАДНАЯ) Ц,2582,0,0.450663,0.191458,0.357879,1.490116e-08
1,Missing,1,391,3518,3271,38,190,360,2020-09-16,487,1,3.0,1.0,Missing,+79951251890,Missing,Missing,support@go.yandex.com,2.0,64.0,trust_purchase_token,Missing,Missing,Missing,Missing,247,0,0.108016,0.300044,0.511654,8.028550e-02
2,Missing,1,251,3809,3351,33,305,276,2020-09-16,943,1,3.0,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,458,0,0.086637,0.444853,0.362300,1.062104e-01
3,2.0,1,482,4385,3943,13,584,202,2020-09-14,529,8,3.0,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,442,0,0.029647,0.739896,0.230331,1.266841e-04
4,Missing,1,22,4559,1438,30,249,575,2020-09-14,1037,8,3.0,Missing,Missing,Missing,Missing,Missing,Missing,2.0,Missing,Missing,Missing,Missing,Маршрут,ВЛАДИМИР (ЮГО-ЗАПАДНАЯ) Ц,3121,0,0.065804,0.303429,0.630621,1.461878e-04
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,Missing,1,51,8647,471,115,1010,377,2020-09-14,2453,1,3.0,Missing,Missing,Missing,Missing,Missing,Missing,2.0,Missing,Missing,Missing,Missing,Missing,Missing,8176,0,0.132994,0.648908,0.217995,1.027882e-04
96,Missing,1,46,8665,6250,265,154,801,2020-09-14,5513,1,3.0,Missing,noreply@ofd.ru,vpolivchuk@mail.ru,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,2415,0,0.305828,0.098737,0.462204,1.332307e-01
97,Missing,1,317,8687,7894,142,313,119,2020-09-14,223,8,3.0,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,Missing,793,0,0.163463,0.200171,0.068493,5.678728e-01
98,Missing,1,47,8697,1587,349,551,429,2020-09-14,19,1,3.0,Missing,Missing,Missing,Missing,Missing,Missing,2.0,Missing,Missing,Missing,Missing,Missing,Missing,7110,0,0.401288,0.351973,0.246637,1.021922e-04


Проведем One-Hot кодирование данных.

In [None]:
data1 = data.categorize()
df = dd.get_dummies(data1)

In [None]:
from dask_ml.cluster.spectral import SpectralClustering
from dask_ml.cluster import KMeans

sc = KMeans()
sc.fit(df.to_dask_array(lengths=True))
sc_result = sc.predict(df.to_dask_array(lengths=True))
np.array(sc_result)