In [None]:
import pandas as pd
import os
import pyodbc
import shutil
from datetime import datetime
from tqdm import tqdm

In [None]:
encd = 'utf-8'
delim = '\t'
table_name = 'schema.table_name'

In [None]:
#Блок для просмотра фрагмента файла-образца
print(f'in/{os.listdir("in")[0]}')
with open(f'in/{os.listdir("in")[0]}', encoding = encd) as sample:
    i = 5
    for line in sample:
        print(line)
        i-=1
        if not i:
            break

In [None]:
ceiling = 2**63-1

def convert_int(x):
    if pd.isnull(x):
        return 0
    if x[0] == '0' and len(x)>1:
        raise ValueError
    if abs(int(x)) >= ceiling:
        raise ValueError
    return int(x)
    
def convert_float(x):
    if pd.isnull(x):
        return None
    if len(x) > 1:
        if x[0] == 0 and x[1] != '.': 
            #два условия стоят раздельно, чтобы не вызывать исключений
            raise ValueError
    return float(x)
    
def convert_date(x):
    if pd.isnull(x):
        return None
    return datetime.strptime(x,'%Y-%m-%d %H:%M:%S')

In [None]:
class SampleTable:
    def __init__(self):
        self.df = None #DataFrame-образец
        self.dtypes = dict() #типы данных Python
        self.dt_cols = list() #список колонок с датами
        self.sql_types = dict() #типы данных SQL
    #----------#
    def take_sample_df(self,file):
        self.df = pd.read_csv(f'in/{file}', encoding = encd, sep = delim, dtype = str)
        self.convert_columns() #метод для анализа типов Python
        self.compile_sql_types() #метод для анализа типов SQL
        self.col_names = self.df.columns.tolist()
        #после обработки файла-образца перемещаем его в папку out
        shutil.move(f'in/{file}',f'out/{file}') 
    #----------#
    def convert_columns(self):
        for column in self.df.columns:
            #если колонка пустая, то помечаем ее как строчный тип
            if not any(self.df[column]): 
                self.dtypes[column]=str
                continue
            #Сначала пробуем конвертировать значения в даты
            try:
                self.df[column] = self.df[column].apply(convert_date)
                self.dtypes[column]=datetime
                self.sql_types[column]='datetime'
            except ValueError:
                pass
            else:
                continue
            #Если в колонке не даты, пробуем конвертировать в числа
            try:
                self.df[column] = self.df[column].apply(convert_int)
                self.dtypes[column]=int
            except ValueError:
                pass
            else:
                continue
            try:
                self.df[column] = self.df[column].apply(convert_float)
                self.dtypes[column]=float
                self.sql_types[column]='float'
            except ValueError:
                #Если в колонке не даты и не числа, то оставляем значения в строках 
                self.dtypes[column]=str
    #----------#
    def compile_sql_types(self):
        for column in self.df.columns:
            #Если колонка содержит целые числа
            if self.dtypes[column] == int:
                #Находим длину самого большого числа в битах
                max_num = max(self.df[column])
                if max_num.bit_length() <=32:
                    self.sql_types[column]='int'
                else:
                    self.sql_types[column]='bigint'
                continue
            #Если колонка содержит строки и не содержит дат
            if self.dtypes[column] == str and column not in self.dt_cols:
                #Находим самую большую длину среди строк в байтах
                lens = self.df[column].apply(lambda x: 0 if pd.isnull(x) else len(x.encode('utf-8'))).tolist()
                max_len = max(lens)
                if max_len <= 255:
                    self.sql_types[column] = f'varchar(255)'
                else:
                    self.sql_types[column] = f'varchar(max)'
    #----------#
    def sql_create_table(self, conn):
        cols = []
        #Собираем пары вида "название_колонки тип"
        for column in self.df.columns:
            sql_col_type = self.sql_types[column]
            cols.append(f'{column} {sql_col_type}')
        #Объединяем пары через запятую для построения запроса
        cols = ','.join(cols)
        query = f"if not exists (select * from sys.tables where name='{table_name}') create table {table_name} ({cols})"
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            cursor.commit()
        except Exception as err:
            #При неудаче откатываем операцию
            print(err)
            cursor.rollback()
        finally:
            cursor.close()
    #----------#
    def sql_load_sample(self, conn):
        #Шаблон запроса для pyodbc имеет вид "insert into table values (?,?,?,?,...)"
        #Количество вопросительных знаков должно соответствовать числу колонок в таблице
        query = 'insert into {} values ({})'.format(table_name,','.join(['?']*len(self.df.columns)))
        try:
            cursor = conn.cursor()
            #Опция cursor.fast_executemany доступна в pyodbc версии 4.0.19 и выше
            cursor.fast_executemany = True
            cursor.executemany(query, self.df.values.tolist())
            cursor.commit()
        except Exception as e:
            #При неудаче откатываем операцию
            print(e)
            cursor.rollback()
        finally:
            cursor.close()
    #----------#
    def sql_load_file(self, file, conn):
        df = pd.read_csv(f'in/{file}', encoding = encd, sep = delim, dtype = str)
        for column in df.columns:
            if self.dtypes[column]==datetime:
                df[column] = df[column].apply(convert_date)
            elif self.dtypes[column]==int:
                df[column] = df[column].apply(convert_int)
            elif self.dtypes[column]==float:
                df[column] = df[column].apply(convert_float)
        query = 'insert into {} values ({})'.format(table_name,','.join(['?']*len(df.columns)))
        try:
            cursor = conn.cursor()
            cursor.fast_executemany = True
            cursor.executemany(query, df.values.tolist())
            cursor.commit()
            shutil.move(f'in/{file}',f'out/{file}')
        except Exception as e:
            print(e)
            print(file)
            cursor.rollback()
        finally:
            cursor.close()

In [None]:
sample_table = SampleTable()
sample_table.take_sample_df(os.listdir('in')[0])
sample_table.df

In [None]:
sample_table.dtypes

In [None]:
sample_table.sql_types

In [None]:
conn = pyodbc.connect("Driver={SQL Server native Client 11.0};"
                      "Server=data_warehouse;"
                      "Port=0000;"
                      "Database=sample_db;"
                      "Trusted_Connection=yes", autocommit=False)

In [None]:
sample_table.sql_create_table(conn)
sample_table.sql_load_sample(conn)

In [None]:
for file in tqdm(os.listdir('in')):
    sample_table.sql_load_file(file,conn)

In [None]:
conn.close()