### **Блок импорта библиотек, подключения к GDrive**

In [None]:
import glob
import requests
import datetime
import pandas as pd
from io import StringIO 
from IPython.display import display  # Для удобного вывода df (вместо print)
from google.oauth2 import service_account

В рамках конкретной задачи нужно создать на своем Google Drive папку с файлами для проекта, где вы разместите датасеты, сертификаты для доступа к БД клиента, readme, другие инструкции и т.д. (в настоящей инструкции это папка **Colab_Notebooks**).

In [None]:
# Подключаем Google Drive к Google Сolab (при работе с файлами на собственном GDrive)
# Можно иначе, подробнее здесь: https://colab.research.google.com/notebooks/io.ipynb

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### **Блок формирования класса, где описываем набор функций для работы с CH**

In [None]:
class simple_ch_client():
    
    # Инициализация

    def __init__(self, CH_HOST, CH_USER, CH_PASS, cacert):
        self.CH_HOST = CH_HOST
        self.CH_USER = CH_USER
        self.CH_PASS = CH_PASS
        self.cacert = cacert

    # Проверка подключения к CH (в случае успеха выводит в результате используемую версию CH, например, 22.5.4.19)

    def get_version(self):
        url = '{host}/?database={db}&query={query}'.format(
                host=self.CH_HOST,
                db='default',
                query='SELECT version()')

        auth = {
                'X-ClickHouse-User': self.CH_USER,
                'X-ClickHouse-Key': self.CH_PASS,
            }

        rs = requests.get(url, headers=auth, verify=self.cacert)
        # 
        rs.raise_for_status()

        print(rs.text)

    # SQL-запрос в CH
    
    def get_clickhouse_data(self, query, connection_timeout = 1500):
        r = requests.post(self.CH_HOST, params = {'query': query, 'user': self.CH_USER, 'password':self.CH_PASS}, timeout = connection_timeout, verify=self.cacert)
        if r.status_code == 200:
            return r.text
        else:
            raise ValueError(r.text)

    # Выгрузка данных из CH в формате pandas dataframe

    def get_clickhouse_df(self, query, columns_name, connection_timeout = 1500):
        data = self.get_clickhouse_data(query, connection_timeout=connection_timeout) 
        df = pd.read_csv(StringIO(data), names = columns_name, index_col=False, sep = '\t')
        return df

    # Загрузка данных в CH клиента
    
    def upload(self, table, content, data_format='TabSeparatedWithNames'): #CSVWithNames TabSeparatedWithNames JSONEachRow
        content = content.encode('utf-8')
        query_dict = {
                'query': 'INSERT INTO {table} FORMAT {data_format} '.format(table=table, data_format=data_format),
                'user': self.CH_USER, 
                'password':self.CH_PASS,
                'input_format_import_nested_json': 1
            
            }
        r = requests.post(self.CH_HOST, data=content, params=query_dict, verify=self.cacert)
        result = r.text
        if r.status_code == 200:
            return result
        else:
            raise ValueError(r.text)

### **Блок ввода данных для доступа и проверки подключения к CH**

Данные для доступа (CH_HOST_NAME, CH_USER, CH_PASS, CH_DB_NAME) необходимо запросить у своего руководителя или аккаунт-менеджера 

In [None]:
Shard1a = 'example1.mdb.yandexcloud.net' # Имя хоста в CH, к которому хотим подключиться (вариант №1)
Shard1b = 'example2.mdb.yandexcloud.net' # Имя хоста в CH, к которому хотим подключиться (вариант №2)

#----------Вводим данные доступов в переменные CH_USER, CH_PASS, CH_DB_NAME--------------
CH_HOST_NAME =  Shard1b #выбираем имя хоста
CH_USER      = 'example****' #вводим имя пользователя
CH_PASS      = 'exa*******' #вводим пасс
CH_DB_NAME   = 'example*****' #вводим имя датасета
CH_HOST      = f'https://{CH_HOST_NAME}:8443'
CH_CASERT    = r'/content/drive/MyDrive/Colab_Notebooks/CA.pem' #пропишите путь к сертификату CA.pem (сам сертификат можно скачать на сайте ClickHouse либо запросить инфо у аккаунт-менеджера или руководителя )

my_client = simple_ch_client(CH_HOST, CH_USER, CH_PASS, CH_CASERT)
my_client.get_version()

22.5.4.19



### **Блок удаления/создания таблиц в CH**

In [None]:
# Удаление таблицы из CH
sql = f'drop table if exists example_costs.costs_YD_tmp_test_kr'
my_client.get_clickhouse_data(sql)


# Создание пустой таблицы в CH (формирование нейминга столбцов необходимо осуществить исходя из требований вашей задачи)
sql = '''
CREATE TABLE example_costs.costs_YD_tmp_test_kr
(
    `date` Date,
    `source` Nullable(String),
    `medium` Nullable(String),
    `campaign` Nullable(String),
    `CampaignType` Nullable(String),
    `CampaignId` Nullable(String),
    `AdGroupName` Nullable(String),
    `AdGroupId` Nullable(String),
    `AdId` Nullable(String),
    `CriterionType` Nullable(String),
    `Criterion` Nullable(String),
    `CriterionId` Nullable(String), 
    `views` Nullable(UInt32),
    `clicks` Nullable(UInt32),
    `cost` Nullable(Int32),
    `cost_nds` Nullable(Int32),
    `cost_with_nds_AK` Nullable(Int32),
    `product` Nullable(String)

)
ENGINE = MergeTree
PARTITION BY toYYYYMM(toDate(date))
ORDER BY toDate(date)
'''
my_client.get_clickhouse_data(sql)

''

### **Блок выгрузки таблиц из CH в формате pandas dataframe** 

In [None]:
# Формируем переменную с набором имен столбцов для датафрейма, который планируем выгружать из CH

cols_df = ['date','source', 'medium','campaign', 'CampaignType', 'CampaignId', 'AdGroupName', 'AdGroupId', 'AdId', 'CriterionType', 'Criterion', 'CriterionId', 'views', 'clicks', 'cost', 'cost_nds', 'cost_with_nds_AK', 'product']

# Пишем запрос, получаем данные из CH в формате pandas dataframe
query = """
SELECT*    
FROM example_costs.costs_YD_tmp_test_kr
"""
df_ch = my_client.get_clickhouse_df(query, columns_name=cols_df)

### **Блок выгрузки таблиц из BigQuery в формате pandas dataframe**

In [None]:
# Прописываем адрес к файлу с данными по сервисному аккаунту и получаем credentials для доступа к данным
credentials = service_account.Credentials.from_service_account_file(
    'example.json')

# Указываем project_id в BQ

project_id = 'example-152714'

In [None]:
# Пишем запрос, получаем данные из BQ в формате pandas dataframe
query = '''
SELECT *
FROM
    `example-152714`.`example_project`.`af_purchase_events`
'''

df_bq = pd.read_gbq(query, project_id=project_id, credentials=credentials)

### **Блок выгрузки таблиц (форматы .xls, .xlsx, .csv) из GDrive или локального диска с последующим преобразованием данных**  

*в настоящей инструкции:*

*- таблицы предварительно были загружены на GDrive*

*- в качестве примера приведены данные по расходам на Яндекс-Директ, банковский продукт РКО*


In [None]:
# Создаем переменную, которая содержит названия столбцов подгружаемой таблицы костов (xls, xlsx, csv)

cols = ['date',
        'source',
        'medium',
        'campaign',
        'CampaignType',
        'CampaignId',
        'AdGroupName',
        'AdGroupId',
        'AdId',
        'CriterionType',
        'Criterion',
        'CriterionId', 
        'views',
        'clicks',
        'cost',
        'cost_nds',
        'cost_with_nds_AK']

# Функция для открытия таблицы костов (.xls, .xlsx, .csv) в формате pandas dataframe 

def read_files_YD(file_path, product, cols) -> pd.DataFrame:
    tmp = pd.read_excel(file_path, thousands=' ') # при считывании таблиц в формате .csv используем функцию pd.read_csv()   
    tmp = tmp.iloc[:, :17]
    tmp.columns = cols
    
    tmp['date'] = tmp['date'].astype('datetime64[ns]')
    tmp['views'] = tmp['views'].fillna(0).astype('int')
    tmp['clicks'] = tmp['clicks'].fillna(0).astype('int')    
    tmp[['source', 'medium', 'campaign', 'CampaignType', 'CampaignId', 'AdGroupName', 'AdGroupId', 'AdId', 'CriterionType', 'Criterion', 'CriterionId']] = tmp[['source', 'medium', 'campaign', 'CampaignType', 'CampaignId', 'AdGroupName', 'AdGroupId', 'AdId', 'CriterionType', 'Criterion', 'CriterionId']].fillna('')
    tmp['product'] = product
    
    return tmp

In [None]:
# Формируем перечень таблиц костов (файлов), которые предварительно были размещены в папке на GDrive (в настоящей инструкции это папка Colab_Notebooks) или локальном диске

file = glob.glob(r'/content/drive/MyDrive/Colab_Notebooks/*.xlsx') #прописать путь к папке с таблицами костов, указав нужный формат файлов
files = pd.DataFrame({'file': file})
files['product'] = None

# Указываем правило, по которому будем определять к какому банковскому продукту относится та или иная найденная таблица костов (файлы)

for i in range(len(files)):
    if 'Debit_Cards' in files['file'][i]:
        files['product'][i] = 'Debit_Cards'
        
    elif 'Invest' in files['file'][i]:
        files['product'][i] = 'Invest'
        
    elif 'Ipoteka' in files['file'][i]:
        files['product'][i] = 'Ipoteka'
        
    elif 'Credit_Cards' in files['file'][i]:
        files['product'][i] = 'Credit_Cards'

    elif 'Startup' in files['file'][i]:
        files['product'][i] = 'Startup'

    elif 'RegBiz' in files['file'][i]:
        files['product'][i] = 'RegBiz'

    elif 'PIL_knpz' in files['file'][i]:
        files['product'][i] = 'PIL_knpz' 
        
    elif 'PIL' in files['file'][i]:
        files['product'][i] = 'PIL'
        
    elif 'RKO_sbp' in files['file'][i]:
        files['product'][i] = 'RKO_sbp'
    
    elif 'RKO_acq' in files['file'][i]:
        files['product'][i] = 'RKO_acq'
        
    elif 'RKO_ved' in files['file'][i]:
        files['product'][i] = 'RKO_ved'        
        
    elif 'RKO_garant' in files['file'][i]:
        files['product'][i] = 'RKO_garant'     

    elif 'RKO_kdb' in files['file'][i]:
        files['product'][i] = 'RKO_kdb'  

    elif 'RKO' in files['file'][i]:
        files['product'][i] = 'RKO'                    

In [None]:
# Проверяем чтобы каждой таблице костов (файлу) соответствовал банковский продукт
files

In [None]:
# Обработка найденых таблиц костов (файлов) - пишем все данные в единый датафрейм
df = pd.DataFrame()

for i in range(len(files)):
    print(files['product'][i])
    tmp = read_files_YD(files['file'][i], files['product'][i], cols)
    
    
    df = df.append(tmp)
    #df = df.concat(tmp)

RKO


In [None]:
# Уточняем типы данных, содержащихся в датафрейме

df['cost'] = df['cost'].astype('float').round().astype('int64')
df['cost_nds'] = df['cost_nds'].astype('float').round().astype('int64')
df['cost_with_nds_AK'] = df['cost_with_nds_AK'].astype('float').round().astype('int64')

In [None]:
# Проверяем итоговый датафрейм

df.info()

### **Загрузка итогового датафрейма в CH клиента**

In [None]:
my_client.upload(
    f'example_costs.costs_YD_tmp_test_kr',
    df.to_csv(index = False, sep = '\t', line_terminator = '\n'))    

''