## Тестировочный файл
В данном блокноте отрабатываются следующие направления:
- Подключение к SQLite базе(ам)
- Использование в качестве файла хранения и промежуточного запросного Parquet файла
- Разработка визуальных элементов (графиков, индикаторов, чаты?) и таблиц 
- Тестирование библиотек для работы в качестве отдельного приложения (Dash, Streamlit)
- Персонализация и частичный контроль доступа
- Разбивка на составляющие (ipynb, py) файлы
- Тестирование работы всего приложения (скорость, нагрузка на ЦП и ОЗУ, ошибки и пр.)

### Доработки

- Добавить запросы sql из sqlite +
- добавить загрузку в parquet ...>>>
- добавить проверку наличия бд на диске перед подключением
- Настроить через обратные вызовы связь sqlite > parquet > pandas query
- Начать добавлять визуализацию plotly
- Добавить таблицу и форматированные
- Доработать обработчик ошибок, чтобы краткая ошибка выходила на экран, остальное в Лог
- Посмотреть оболочку, которая будет выводить приложение в отдельную веб страницу или сделать через jupyter
- Карты, письмо от ФД, у них есть сервер с тайтлами!
- добавить ролевой доступ и проработать структуру
- добавить резервирование БД
- добавить файл со статусом, где будет: 1) дата изменения файла, которая будет сравниваться с той что на ресурсе 2) статус базы, желательно её имя т.е например чтобы загрузка из резервной подгружалась


## Набор методов для обращения к SQLite БД

In [None]:
# ПЕРЕНЕСТИ В МЕТОД SQLITE_CN! ФУНКЦИЯ ДЛЯ ПРОВЕРКИ НАЛИЧИЯ БД НА ДИСКЕ
def check_db(filename):
    return os.path.exists(filename)
#ТАКЖЕ ДОБАВИТЬ ФУНКЦИЮ, КОТОРАЯ БУДЕТ ПРОВЕРЯТЬ: 1) ВРЕМЯ ИЗМЕНЕНИЯ ФАЙЛА 2) ЗАНЯТОСТИ ФАЙЛА

In [1]:
#создаем класс, для обращения с БД SQLite
class sqlite_cn:
#загружаем обязательные модули
    import sqlite3 as sql3
    import pandas as pd
#при вызове сразу осуществляем подключение к БД и присоединенным базам
    def __init__(self, db_folder, db_basic, db_attached = []):
        try:
            global cn #объявляем cn глобальной переменной, чтобы она была видна в остальных методах
            err_msg = None
            #проверяем существует ли подключение сейчас, если да отключаем
            if 'cn' in vars(__builtins__):
                if (cn):
                    cn.close(); cn = None
            #подключение к основной БД
            full_path = db_folder + db_basic + '.db'
            print('Connecting to: ', full_path)
            cn = self.sql3.connect(r'' + full_path)
            #соединение баз, соединяем перебором (если список)
            if db_attached:
                for db in db_attached:
                    qr_att = r'ATTACH DATABASE "' + db_folder + db + '.db' + '" AS ' + db
                    cn.execute(qr_att)
                    cn.commit()
                    print('DB "' + db + '" attached')
        except self.sql3.Error as error:
            err_msg = error #если ошибка, выводим
            print("Error: ", err_msg)
        finally:
            if err_msg == None and (cn): #отписываемся по итогу
                print('Succesfully connected')
            else:
                print("Connection failed!")
#создаем функцию отключения, после выполнения отправки данных      
    def disconnect_db(self):
        try:
            err_msg = None
            cn.close() #закрываем соединение
        except self.sql3.Error as error:
            err_msg = error #если ошибка, выводим
            print("Error: ", err_msg)
        finally:
            if err_msg == None and (cn): #отписываемся по итогу
                print('Connection closed')
            else:
                print("Closing connection failed!")
#вызов отправки
    def send (self, file_folder, file_name,  sql_ifex, xlsx_sheets = [], dtypes = {}): #file_type - xlsx, csv; file_folder: местонахождение файла, file - файл, xlxs_sheets - если пусто, то загружаем всё
        try:
            err_msg = None
            full_path = file_folder + file_name #для удобства соединяем сразу путь к файлу
            file_type = full_path.split('.')[-1] #выделяем тип файла
            if file_type == 'xlsx': #открываем файл и загружаем его в df или в переменную таблицы
                with self.pd.ExcelFile(open(r'' + full_path,'rb')) as wb: #открываем эксельный файл и нам оттуда нужны будут только наименования листов
                    for sh in wb.sheet_names:
                        if xlsx_sheets:
                            if not sh in xlsx_sheets:
                                continue
                        print ('Sending sheet:', sh)
                        self.pd.read_excel(wb,sheet_name = sh).to_sql(sh, 
                                                                       cn, 
                                                                       if_exists = sql_ifex, 
                                                                       index = False,
                                                                       dtype = dtypes
                                                                      )
                shs = None #очищаем переменную
            elif file_type == 'csv':
                #читаем и отправляем
                self.pd.read_csv(open(r'' + full_path,'r'), sep=';', decimal='.').to_sql(file_name, #!!! для CSV формата, наименование таблицы будет исходить из названия файла
                                                                                    cn, 
                                                                                    if_exists = ifex, 
                                                                                    index = False,
                                                                                    dtype = dtypes)
        except self.sql3.Error as error:
            err_msg = error
            print("Error: ", err_msg)
        finally:
            if err_msg == None and (cn):
                print("Data successfully sent!")
            else:
                print("Sending failed!")
            self.disconnect_db()
#вызов получения
    def get (self, qr, close_conn = True, **args):
        try:
            err_msg = None
            return self.pd.read_sql_query(qr, cn, **args)  #пилим датасет на части, чтобы экономить ОЗУ
        except self.sql3.Error as error:
            err_msg = error
            print("Error: ", err_msg)
        finally:
            if err_msg == None and (cn):
                print("Successfull get.")
            else:
                print("Getting failed!")
            #close conn
            if close_conn == True: self.disconnect_db() #по умолчанию закрываем соединение, но если мы знаем, что будет большой массив, то оставляем для итераций
#сжатие базы данных
    def vac (self):
        try:
            cn.execute("VACUUM") #сжатие
            cn.commit() #подтверждение
            print('Vacuum succesfull!')
        except:
            print('Vacuum failed!')
        finally:
            self.disconnect_db()

## Подключение к БД

In [None]:
#тестовое подключение
data_folder = r'*\sql_db\05_db_' #папка размещения БД-ек
x = sqlite_cn (data_folder, "basic", ['dics'])

## Отправка данных в SQLite из рабочих книг Excel

In [None]:
x.send(r"\\*\datasets\basic", "\\05_datasets_basic_10-20222.xlsx", 'replace','ds10')
#!!! доработать отправку определенных листов, если указываем ds10, то при переборе доступных листов, он также зацепить ds1 т.к. проходит по маске

## Получение данных из SQLite и конвертация в Parquet файл

In [None]:
#функция для формирования запросов
def sql_queries(type_query, q_vars):
    if type_query == 'main': #получение основного массива данных
        return f"""WITH data_v as (
        SELECT src,
        dic_fp.fp_id,
        dic_fp.fp_value,
        dic_fp.aur_kv,
        dic_fp.upr_kratko,
        dic_fp.upr_polnoe,
        dic_fp.gr_zatrat,
        dic_fp.podgr_zatrat,
        dic_fp.gr_1,
        dic_fp.gr_2,
        dic_fp.use_main as fp_use_main,
        dic_fp.use_re as fp_use_re,
        dic_mir.mir_id,
        dic_mir.mir_value,
        dic_mir.use_main as mir_use_main,
        dic_pfm.pfm_id,
        dic_pfm.pfm_value,
        dic_pfm.tip_seti,
        dic_pfm.go_filial,
        dic_pfm.filial,
        dic_pfm.pap,
        dic_pfm.region,
        dic_pfm.use_main as pfm_use_main,
        dic_proj.proj_id,
        dic_proj.proj_value,
        IFNULL(dic_proj.napravlenie_zatrat,'Текущая деятельность') as napravlenie_zatrat,
        IFNULL(dic_proj.use_main,'Да') as proj_use_main,
        IFNULL(dic_proj.use_re,'Да') as proj_use_re,
        date, 
        sum as value
        FROM (((((
        SELECT '{q_vars[4]}' AS src, * FROM {q_vars[0]} UNION ALL
        SELECT '{q_vars[5]}' AS src, * FROM {q_vars[1]} UNION ALL
        SELECT '{q_vars[6]}' AS src, * FROM {q_vars[2]} UNION ALL
        SELECT '{q_vars[7]}' AS src, * FROM {q_vars[3]}) d
        LEFT JOIN dic_fp ON d.fp_id = dic_fp.fp_id)
        LEFT JOIN dic_mir ON d.mir_id = dic_mir.mir_id)
        LEFT JOIN dic_pfm ON d.pfm_id = dic_pfm.pfm_id)
        LEFT JOIN dic_proj ON d.proj_id = dic_proj.proj_id))
        SELECT * FROM data_v"""

Запрашиваем таблицу из SQLite с чанками и сохраняем их в 1 файл Паркет

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq
#запрашиваемые таблицы
src_vars = {'ds2':'Факт 22','ds3':'Факт 21','ds5':'Прогноз 22','ds6':'Бюджет 22',}
src_q = list(src_vars.keys()) + list(src_vars.values())
#получаем датасет и сохраняем его в паркет
chunked_sql_data = x.get(sql_queries('main', src_q), False, chunksize=30000)
#перебираем чанки и сохраняем
first_itr = True #признак первой итерации
for data_chunk in chunked_sql_data: #перебираем чанки
    #создаем файл паркет
    table = pa.Table.from_pandas(data_chunk)
    if first_itr == True: #если первая итерация то сохраняем схему и первые данные
        pqwriter = pq.ParquetWriter(r'\\*s\datasets\prqt_db\05_main.parquet', table.schema) 
        first_itr = False
    #запись с добавлением в существующий файл
    pqwriter.write_table(table) 
    #pa.parquet.write_to_dataset(table , root_path='sample.parquet')
if pqwriter: #закрываем файл записи
    pqwriter.close()
#закрываем соединение SQLite
sqlite_cn.disconnect_db(sqlite_cn)

#получить полную сразу
#not_chunked_sql_data = x.get(sql_queries('main', src_q))
#not_chunked_sql_data.to_parquet('csv_snap2.parquet',compression='snappy')

## Получение данных из Parquet файла до выполнения аггрегации

In [None]:
import pandas as pd
#получаем необходимый набор данных
df = pd.read_parquet(r'*\05_main.parquet', 
                     engine = 'pyarrow',
                     filters=[
                         [('upr_polnoe','==','Управление МТО'),('aur_kv','==','АУР'), ('filial', 'in', ['ГО','СЗФО']), ('napravlenie_zatrat', '=', 'Текущая деятельность')]
                     ],
                     columns = ['src','date','filial','value']
                    )
#обрабатываем столбец с датой, добавляем новый с кварталом
df['date_q'] = pd.to_datetime(df2.date).dt.to_period('Q').dt.strftime('Q%q').astype(str)
#делаем предварительную группировку по значениям
dt = df2.query('filial in ["ГО"]').groupby(['src','filial','date_q']).agg('sum','value').reset_index().sort_values('date_q')
#добавляем столбец по значениям в млн.р
dt['val_th'] = (dt['value']/1000000).round(2)
#группируем и переворачиваем столбец по источникам
dt_t = dt.groupby(['src','filial','date_q']).agg({'val_th':'sum'}).unstack('src').reset_index().droplevel(0, axis=1)
#переименовываем столбцы
dt_t.columns = ['filial','date','Бюджет 22','Прогноз 22','Факт 21','Факт 22']

## Построение графиков и индикаторов

In [None]:
#тест! Подготовить макеты под построение графиков, в случае типизации, вывести в класс с вызовом через методы
import plotly.graph_objects as go

layout = go.Layout(
    autosize=False,
    width=800,
    height=300,
    paper_bgcolor='rgb(255,255,255)',
#    plot_bgcolor='rgba(0,0,0,0)',
    margin=go.layout.Margin(
        l=10,
        r=10,
        b=5,
        t=25,
        pad = 1
    )
)

fig = go.Figure(layout = layout)

fig.add_trace(go.Scatter(x=dt_t['date'], y=dt_t['Факт 22'],
                         name='Факт 22',
                         line=dict(dash = 'solid', color = "#1e90ff", width = 3),
                         text=dt_t['Факт 22'],textposition="bottom center",
                         ))
fig.add_trace(go.Scatter(x=dt_t['date'], y=dt_t['Факт 21'],
                         name='Факт 21',
                         line=dict(dash = 'solid', color = "grey", width = 3),
                         text=dt_t['Факт 21'],textposition="bottom center",
                         ))
fig.add_trace(go.Scatter(x=dt_t['date'], y=dt_t['Бюджет 22'],
                         name='Бюджет', 
                         line=dict(dash = 'dot', color = '#1034a6', width = 2),
                         text=dt_t['Бюджет 22'],textposition="top center",
                         ))
fig.add_trace(go.Scatter(x=dt_t['date'], y=dt_t['Прогноз 22'],
                         name='Прогноз',
                         line=dict(dash = 'dash', color = '#fdbe02', width = 2),
                         text=list(dt_t['Прогноз 22']),textposition="bottom center",
                         ))

fig.update_yaxes(visible=False)
fig.update_layout()
fig.update_traces(mode='lines+markers+text', hoverinfo="y+z+name", selector=dict(type='scatter'), line_shape='spline')

fig.update_xaxes(
    showspikes=True,
    spikecolor="#1e90ff",
    spikesnap="data",
    spikemode="toaxis",
    spikedash="solid",
    spikethickness = 1
)
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='LightPink')
fig.update_yaxes(visible=True, showgrid=True, gridwidth=1, gridcolor='LightPink', showticklabels = False)

fig.show()