# Импортируем необходимые библиотеки

In [1]:
import json
import time
import requests
import yaml
import sql_handler
import postgresql_handler
import clickhouse_handler
import docker_service
from datetime import datetime, timezone
preprocessing_done = {}

# Опередляем объём импорта и необходимые объекты импорта

In [3]:
# with open('config.yaml', 'r') as file:
#     config = yaml.safe_load(file)

# database_import_mapper = {
#     'MySQL': sql_handler,
#     'PostgreSQL': postgresql_handler,
#     'ClickHouse': clickhouse_handler
# }
# database_type = config.get('database_type')
# print(database_type)
# if database_type in database_import_mapper:
#     handler = database_import_mapper[database_type]
# else:
#     print('Unsupported database type')
    
# handler.test_connection(**config['db'])

In [2]:
import warnings
has_errors = False

# MARK: MAPPERS
table_settings_mapper = {
    'deal': 'deal_fields',
    'contact': 'contact_fields',
    'company': 'company_fields',
    'lead': 'lead_fields',
}

database_import_mapper = {
    'MySQL': sql_handler,
    'PostgreSQL': postgresql_handler,
    'ClickHouse': clickhouse_handler
}

obligatory_fields = ['ID', 'DATE_CREATE']

# CONFIGURE
with open('config.yaml', 'r') as file:
    config = yaml.safe_load(file)
preprocessing_config = {}
list_of_errors = []
database_type = config.get('database_type')

if database_type in database_import_mapper:
    handler = database_import_mapper[database_type]
else:
    print('Unsupported database type')
    # docker_service.close()

# MARK: LAMBDAS
def smth_to_string(value):
    if value == None:
        return None
    elif value == "$$NULL$$" or not value:
        return ''
    else:
        return value
    
def void_to_nonnull_int(value):
    if value == "$$NULL$$" or not value:
        return 0
    else:
        return int(value)

def void_to_nonnull_float(value):
    if value == "$$NULL$$" or not value:
        return 0.0
    else:
        return float(value)
    
def void_to_nonnull_datetime(value):
    if value == "$$NULL$$" or not value:
        if database_type ==  "MySQL":
            return '0000-00-00 00:00:00'
        return 0
    else:
        
        dt = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S%z')
        if database_type ==  "MySQL":
            return dt.strftime('%Y-%m-%d %H:%M:%S')
        return int(dt.replace(tzinfo=timezone.utc).timestamp())

def smth_to_int(value):
    if value == None:
        return None
    elif value == "$$NULL$$" or not value:
        return 0
    else:
        return int(value)
    
def void_to_null(value):
    if not value:
        return None
    else:
        return value
    
def smth_to_float(value):
    if value == None:
        return None
    elif value == "$$NULL$$" or not value:
        return 0.0
    else:
        return float(value)
    
def string_to_datetime(value):
    if value == None:
        return None
    elif value == "$$NULL$$" or not value:
        return void_to_nonnull_datetime(value)
    else:
        dt = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S%z')
        if database_type ==  "MySQL":
            return dt.strftime('%Y-%m-%d %H:%M:%S')
        return int(dt.replace(tzinfo=timezone.utc).timestamp())

def safe_from_null(value):
    if value == None:
        return "$$NULL$$"
    else:
        return value



# MARK: CHECK CONNECTION
with open('config.yaml', 'r') as file:
    config = yaml.safe_load(file)
handler.test_connection(**config['db'])

# MARK: CHECK ERRORS
for table_type in config['table_names'].keys():
    print(table_type)
    if config['table_names'][table_type]:
        preprocessing_config[table_type] = {}

        # MergeTree check for ClickHouse
        e,d = handler.check_tabletype_errors(config['table_names'][table_type], **config['db'])
        if e:
            list_of_errors.append(d)

        
        columns = handler.get_columns_and_types(
            config['table_names'][table_type], **config['db'])
        dict_of_columns = {column['name']: (
            column['type'], column['nullable']) for column in columns}
        for obligatory_field in obligatory_fields:
            if obligatory_field not in config[table_settings_mapper[table_type]].keys():
                if has_errors == False:
                    has_errors = True
                list_of_errors.append(
                    f"Column {obligatory_field} not found in config.yaml in {table_settings_mapper[table_type]} settings. \
It is obligatory field.")
        # set_of_columns = set([column['name'] for column in columns])
        # print(dict_of_columns)
        for column in config[table_settings_mapper[table_type]]:
            if config[table_settings_mapper[table_type]][column] not in dict_of_columns.keys():
                if has_errors == False:
                    has_errors = True
                list_of_errors.append(
                    f"Column {column} not found in table {table_settings_mapper[table_type]}. Check settings.")
            else:
                nullable = dict_of_columns[config[table_settings_mapper[table_type]][column]][1]
                target_data_type = str(dict_of_columns[config[table_settings_mapper[table_type]][column]][0])
                # print(target_data_type)
                current_preproc = preprocessing_config[table_type].get(
                    column, [])

                # TODO Подумать о других кейсах препроцессинга.
                if nullable:
                    current_preproc.append(void_to_null)
                else:
                    current_preproc.append(safe_from_null) 
                if ("String" in target_data_type) or ('VARCHAR' in target_data_type) or ('TEXT' in target_data_type) or ('CHAR' in target_data_type) or ('TINYTEXT' in target_data_type) or ('MEDIUMTEXT' in target_data_type) or ('LONGTEXT' in target_data_type):
                    current_preproc.append(smth_to_string)
                elif ("Int" in target_data_type) or ("INTEGER" in target_data_type) or ("BIGINT" in target_data_type) or ("TINYINT" in target_data_type) or ("SMALLINT" in target_data_type) or ("MEDIUMINT" in target_data_type) :
                    current_preproc.append(smth_to_int)
                elif ("Float" in target_data_type) or ("Decimal" in target_data_type) or ("DECIMAL" in target_data_type):
                    current_preproc.append(smth_to_float)
                elif ("DateTime" in target_data_type) or ("TIMESTAMP" in target_data_type) or ("DATE" in target_data_type) or ("TIME" in target_data_type) or ("YEAR" in target_data_type):
                    current_preproc.append(string_to_datetime)

                if current_preproc:
                    preprocessing_config[table_type][column] = current_preproc


if has_errors:
    print(*list_of_errors, sep='\n')
    print('Please, check your config.yaml file and database. After that, restart the container.')
    # docker_service.close()

# print(preprocessing_config)


Successfully connected to the database.
deal
lead
company
contact


## Выгружаем данные из Битрикс24

In [3]:
from tqdm import tqdm

basic_params = {}
if config['filter_date']['lower']:
    basic_params['filter[>=DATE_CREATE]'] = config['filter_date']['lower']
if config['filter_date']['upper']:
    basic_params['filter[<=DATE_CREATE]'] = config['filter_date']['upper']
URL = config['b24_key']
result = {}

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
for table_type in preprocessing_config.keys():
    print(f'{table_type.capitalize()} import started')
    method = f'crm.{table_type}.list.json'
    params = basic_params.copy()
    for num, param in enumerate(config[table_settings_mapper[table_type]]):
        params[f'select[{num}]'] = param
    # print(params)
    r = requests.get(URL + method, params=params).json()
    if 'result' in r.keys():
        result[table_type] = r['result'].copy()
        k = 1
        total = r['total']
        progress_bar = tqdm(total=total, position=0, leave=True)
        time.sleep(0.5)
        while 'next' in r.keys():
            k += 1
            params['start'] = r['next']
            r = requests.get(URL + method, params=params).json()
            result[table_type] += r['result']
            progress_bar.update(50)
    progress_bar.close()

Deal import started


 97%|█████████▋| 400/414 [00:03<00:00, 111.46it/s]


## Выгружаем получившися массивы в базу

In [4]:
def print_dict(dictionary):
    for key, value in dictionary.items():
        print(f'{key}: {value}', sep='\n')

print_dict(preprocessing_config)

preprocessing_done

for table_type in result.keys():

    # TODO У нас очень должго идеёт импорт данных в БД, из за проверки каждого идентификатора на присутствие в БД
    # TODO Нужно сделать проверку оптимальной

    if not preprocessing_done.get(table_type, False):
        print(f'Preprocessing {table_type} data')
        for row in result[table_type]:
            for column in preprocessing_config[table_type].keys():
                for func in preprocessing_config[table_type][column]:
                    row[column] = func(row[column])

    preprocessing_done[table_type] = True
    print(f'Loading {table_type} data to Database')
    handler.load_data_to_sql(result[table_type],
                            config['table_names'][table_type], 
                            config[f'{table_type}_fields'], 
                            **config['db'])

deal: {'ID': [<function safe_from_null at 0x7fdbcb0d9a60>, <function smth_to_int at 0x7fdbcb0d9820>], 'TITLE': [<function safe_from_null at 0x7fdbcb0d9a60>, <function smth_to_string at 0x7fdbc9dda310>], 'DATE_CREATE': [<function safe_from_null at 0x7fdbcb0d9a60>, <function string_to_datetime at 0x7fdbcb0d99d0>], 'OPPORTUNITY': [<function safe_from_null at 0x7fdbcb0d9a60>, <function smth_to_float at 0x7fdbcb0d9940>], 'UF_CRM_YMCLIENTID': [<function void_to_null at 0x7fdbcb0d98b0>, <function smth_to_int at 0x7fdbcb0d9820>]}
Preprocessing deal data
Loading deal data to Database


 97%|█████████▋| 400/414 [00:14<00:00, 27.00it/s]


## Полезное

In [14]:
columns = handler.get_columns_and_types_sql(
config['table_names']['deal'], **config['db'])
columns

[{'name': 'ID',
  'type': INTEGER(display_width=10, unsigned=True),
  'default': None,
  'comment': None,
  'nullable': False,
  'autoincrement': False},
 {'name': 'TITLE',
  'type': VARCHAR(length=256),
  'default': None,
  'comment': None,
  'nullable': False},
 {'name': 'TYPE_ID',
  'type': VARCHAR(length=20),
  'default': None,
  'comment': None,
  'nullable': True},
 {'name': 'STAGE_ID',
  'type': VARCHAR(length=50),
  'default': None,
  'comment': None,
  'nullable': False},
 {'name': 'PROBABILITY',
  'type': VARCHAR(length=20),
  'default': None,
  'comment': None,
  'nullable': True},
 {'name': 'OPPORTUNITY',
  'type': DECIMAL(precision=20, scale=2),
  'default': None,
  'comment': None,
  'nullable': False},
 {'name': 'LEAD_ID',
  'type': VARCHAR(length=20),
  'default': None,
  'comment': None,
  'nullable': True},
 {'name': 'COMPANY_ID',
  'type': VARCHAR(length=20),
  'default': None,
  'comment': None,
  'nullable': True},
 {'name': 'CONTACT_ID',
  'type': VARCHAR(length=2

In [8]:
type(columns[0]['type'])

sqlalchemy.dialects.mysql.types.INTEGER

In [12]:
import sqlalchemy.dialects.mysql as mysql_dialect

types = dir(mysql_dialect.types)
print(*types, sep='\n')

BIGINT
BIT
CHAR
DATETIME
DECIMAL
DOUBLE
FLOAT
INTEGER
LONGBLOB
LONGTEXT
MEDIUMBLOB
MEDIUMINT
MEDIUMTEXT
NCHAR
NUMERIC
NVARCHAR
REAL
SMALLINT
TEXT
TIME
TIMESTAMP
TINYBLOB
TINYINT
TINYTEXT
VARCHAR
YEAR
_FloatType
_IntegerType
_MatchType
_NumericType
_StringType
__builtins__
__cached__
__doc__
__file__
__loader__
__name__
__package__
__spec__
datetime
exc
sqltypes
util


In [24]:
a = []
b = [1,2]
c = [3,4]
a += b
a += c
a

[1, 2, 3, 4]

In [71]:
import sqlalchemy.dialects.postgresql as postgresql_dialect
import sqlalchemy_clickhouse.base as clickhouse_dialect

types_pg = dir(postgresql_dialect)
types_ch = dir(clickhouse_dialect)

print("PostgreSQL types:")
print(*types_pg, sep='\n')

print("\nClickHouse types:")
print(*types_ch, sep='\n')

PostgreSQL types:
ARRAY
All
Any
BIGINT
BIT
BOOLEAN
BYTEA
CHAR
CIDR
CreateEnumType
DATE
DATERANGE
DOUBLE_PRECISION
DropEnumType
ENUM
ExcludeConstraint
FLOAT
HSTORE
INET
INT4RANGE
INT8RANGE
INTEGER
INTERVAL
Insert
JSON
JSONB
MACADDR
MONEY
NUMERIC
NUMRANGE
OID
REAL
REGCLASS
SMALLINT
TEXT
TIME
TIMESTAMP
TSRANGE
TSTZRANGE
TSVECTOR
UUID
VARCHAR
__all__
__builtins__
__cached__
__doc__
__file__
__loader__
__name__
__package__
__path__
__spec__
aggregate_order_by
array
array_agg
asyncpg
base
compat
dialect
dml
ext
hstore
insert
json
pg8000
psycopg2
psycopg2cffi
pygresql
pypostgresql
ranges

ClickHouse types:
ARRAY
BIGINT
BINARY
BOOLEAN
CHAR
ClickHouseCompiler
ClickHouseDialect
ClickHouseExecutionContext
ClickHouseIdentifierPreparer
ClickHouseTypeCompiler
DATE
DATETIME
DECIMAL
FLOAT
INTEGER
PGCompiler
PGIdentifierPreparer
REAL
SMALLINT
TIME
TIMESTAMP
VARCHAR
VERSION
__builtins__
__cached__
__doc__
__file__
__loader__
__name__
__package__
__spec__
colspecs
compiler
default
dialect
expression
isch

In [70]:
import sys
!{sys.executable} -m pip install sqlalchemy_clickhouse

Collecting sqlalchemy_clickhouse
  Downloading sqlalchemy-clickhouse-0.1.5.post0.tar.gz (13 kB)
Collecting infi.clickhouse_orm>=1.0.0
  Downloading infi.clickhouse_orm-2.1.3-py3-none-any.whl (43 kB)
[K     |████████████████████████████████| 43 kB 117 kB/s eta 0:00:011
Collecting iso8601>=0.1.12
  Downloading iso8601-2.0.0-py3-none-any.whl (7.5 kB)
Building wheels for collected packages: sqlalchemy-clickhouse
  Building wheel for sqlalchemy-clickhouse (setup.py) ... [?25ldone
[?25h  Created wheel for sqlalchemy-clickhouse: filename=sqlalchemy_clickhouse-0.1.5.post0-py3-none-any.whl size=18658 sha256=063d5b73c0a98e88b00f3cbdb85c79512e05f86cbac8436b9ee8baaaeb6d0edd
  Stored in directory: /Users/master/Library/Caches/pip/wheels/f9/8a/7e/b68fa72db2160e97c3338ff739577ac18fe8b6e9361f8d8dc6
Successfully built sqlalchemy-clickhouse
Installing collected packages: iso8601, infi.clickhouse-orm, sqlalchemy-clickhouse
Successfully installed infi.clickhouse-orm-2.1.3 iso8601-2.0.0 sqlalchemy-click