### Задание 2.1
На любом языке программирования реализовать реконсиляцию транзакций клиентов банка из двух источников (как минимум один - таблица БД, второй на выбор). 
В данных должен присутствовать уникальный ключ - uid/id на выбор;
Реконсиляция должна быть масштабируема и применима к Big Data

Дополнительно сделать реконсиляцию:

- для разных типов данных (дата, текст, числа)
- для числовых данных должна быть возможность сконфигурировать толеранс (допустимую погрешность в %)

In [None]:
import os
import random
import csv
import vertica_python
import configparser
import pandas as pd
from datetime import datetime, timedelta
from dateutil import parser

In [None]:
## Создадим тестовую таблицу с банковскими транзакциями

In [None]:
# Сформируем датасет с мастер-данными
start_date = datetime.now().date() - timedelta(days=1365)
end_date = datetime.now().date()
user_list = []
for i in range(1,200000):
    user_list.append(i)
df = []
while start_date != end_date:
    for i in range(0,1000):
        df.append([random.choice(user_list),start_date,round(random.uniform(1, 1000), 5)])
    start_date = start_date + timedelta(days=1)
%time

In [None]:
# Загрузим в БД

In [None]:
config = configparser.ConfigParser()
path = os.path.join(os.getcwd(), 'config.ini')
settings = config.read(path)
connection_info = {'host': config['CONNECTION']['host'],
                   'port': config['CONNECTION']['port'],
                   'user': config['CONNECTION']['user'],
                   'password': config['CONNECTION']['password'],
                   'database': config['CONNECTION']['database']
                   }

In [None]:
with open("test_df.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerows(df)
connection = vertica_python.connect(**connection_info)
cur = connection.cursor()
with open('test_df.csv', "rb") as fs:
    my_file = fs.read().decode('utf-8', 'ignore')
    cur.copy(
        """COPY DB_DEFAULT.tb3_transactions
        FROM STDIN PARSER fcsvparser(delimiter=',', header='true') 
        """,
        my_file)
    connection.commit()
cur.close()

In [None]:
#сформируем датасет с данными для проверки

In [None]:
connection = vertica_python.connect(**connection_info)
cur = connection.cursor()
cur.execute("""SELECT * FROM DB_DEFAULT.tb3_transactions WHERE transaction_date between '2019-11-10' and '2019-12-10' """)
df_test = cur.fetchall()
df_test = pd.DataFrame(df_test)

In [None]:
#сформируем csv для использования в процессе реконсиляции
df_test.to_csv('test_data.csv', sep=',', header=False, index=False, encoding='utf-8', na_rep=' ')

### Пояснение к решению
Перед разработкой решения было рассмотрено несколько вариантов реализации системы:</p>
- решение для транзакционной БД</p>
- решение для аналитической БД</p>
<p>Ключевое отличие заключается в размере батча реконсилируемых данных. В первом случае это был бы небольшой набор данных от 1 до нескольких записей, так как должно соблюдаться условие поддержки потока данных максимально приближенному к реальному времени. Во втором случае такое решение не подошло бы в силу большой нагрузки на СУБД. Оптимальным решением здесь выступает выбор большого набора данных.</p>
В силу характера предстоящей работы и возможных задач было принято решение в пользу второго варианта.</p>
Для оптимизации запроса к СУБД было принято решение о реализации механизма предварительного анализа данных и выделение ключевого условия для фильтра в зависимости от характера данных. Выделены следующие основные сценарии использования:</p>
- анализ определенного временного интервала</p>
- анализ транзакций определенного пользователя</p>
- анализ конкретных транзакций</p>

В зависимости от сценария формируется оптимальный запрос к БД с целью выбора наиболее релевантных данных и снижения требований к оперативной памяти, увеличения скорости выполнения операции реконсиляции.</p>
Возможные альтернативные варианты реализации - с использованием ресурсов СУБД. Тоже неплохой вариант с возможностью работы на больших объемах данных. Минус заключается в меньшей функциональности.</p>
В текущем решении рассмотрены следующие возможные варианты расхождения данных:</p>
- данные есть в БД, но отсутствуют в csv (учтено)</p>
- данные отсутсвуют в БД, данные есть в csv (не учитывалось)</p>
- данные есть в двух системах, но имеют разные значения (учтено)</p>
- задвоенные данные в csv (учтено)</p>
Вариант 2 не учитывался, так как БД была выбрана в качестве мастер-системы, с которой происходило сравнение. Данный вариант был выбран как  наиболее реалистичный.</p>
***
#### Масштабируемость решения обеспечивается:
- возможностью включения новых источников данных (расширения методами get_web_api_data, get_xml_data ..)</p>
- адаптивной логикой в зависимости от типов данных, что сглаживает нагрузку на систему в зависимости от характера реконсиляции</p>
***
#### Дополнительно:
- Помимо этого были рассмотрены варианты использования хэшей в механизме реконсиляции (в частности был проработан MD5). Но в силу того, что необходимо учитывать толеранс при сравнении данных, хэширование значительно усложняло логику, что в итоге увеличивало скорость работы системы</p>
- Были рассмотрены варианты встраивания моделей машинного обучения в логику сравнения. От данного решения пришлось отказаться, так как требуется максимально точное сравнение, тк речь идет о деньгах и банковских транзакциях. ML можно использовать в качестве рекомендательного механизма для решения задач в этой области.</p>

In [None]:
import os
import random
import csv
import vertica_python
import configparser
import pandas as pd
from datetime import datetime, timedelta
from dateutil import parser

class DataReconsilation:
    
    __config = configparser.ConfigParser()
    __path = os.path.join(os.getcwd(), 'config.ini')
    __settings = __config.read(__path)
    __connection_info = {'host': __config['CONNECTION']['host'],
                         'port': __config['CONNECTION']['port'],
                         'user': __config['CONNECTION']['user'],
                         'password': __config['CONNECTION']['password'],
                         'database': __config['CONNECTION']['database']
                         }
    
    def __init__(self, __base_system):
        self.__base_system = __base_system

    def _get_mastersystem_data(self,num_id,how='date'):
        """Выгрузка данных из мастер-системы. В данном случае используется СУБД Vertica
        """

        connection = vertica_python.connect(**self.__connection_info)
        cur = connection.cursor()
        sql = """SELECT * FROM DB_DEFAULT.tb3_transactions WHERE {how} in {num_id}"""
        if how=='date':
            cur.execute(sql.format(how='transaction_date', num_id=num_id))
        elif how=='client':
            cur.execute(sql.format(how='user_id', num_id=num_id))
        elif how=='transaction':
            cur.execute(sql.format(how='transaction_id', num_id=num_id))
        temp = cur.fetchall()
        cur.close()
        return pd.DataFrame(temp,columns=None)
    
    def get_csv_data(self, path):
        """ Загрузка данных из csv источника
        """
        df_csv = pd.read_csv(path,header=None)
        return df_csv
    
    def __get_web_api_data(self):
        """Возможный вариант масштабирования решения через подключения нового источника
        """
        return None
    
    def __get_xml_data(self):
        """Возможный вариант масштабирования решения через подключения нового источника
        """
        return None
    
    def _get_last_job(self,job_type,row_count):
        """ Выбор из технической таблицы id последнего запущенного job'а
        """
        connection = vertica_python.connect(**self.__connection_info)
        cur = connection.cursor()
        cur.execute("""SELECT max(job_id) FROM DB_DEFAULT.tb3_jobs""")
        temp = cur.fetchall()
        sql = """INSERT INTO DB_DEFAULT.tb3_jobs(job_id,match_type,start_date,row_count) 
                 VALUES ({job},'{job_type}',now(),{row_count});"""
        cur.execute(sql.format(job=temp[0][0]+1,job_type=job_type,row_count=row_count))
        connection.commit()
        cur.close()
        return int(temp[0][0])
    
    def __upload_new_df(self,df,table_name):
        """Метод для загрузки реконсилированных данных
        """
        connection = vertica_python.connect(**self.__connection_info)
        cur = connection.cursor()
        pd.DataFrame(df,columns=None).to_csv('temp_data.csv',
                                             sep=',', header=False, index=False,
                                             encoding='utf-8', na_rep=' ')
        with open('temp_data.csv', "rb") as fs:
            my_file = fs.read().decode('utf-8', 'ignore')
            sql = """COPY {table_name} FROM STDIN PARSER 
                     fcsvparser(delimiter=',', header='false')"""
            cur.copy(sql.format(table_name=table_name),my_file)
            connection.commit()
        cur.close()
    
    def _get_match_list(self,how,df_csv):
        """Выбор значений для выгрузки данных из БД, формирование sql-запроса
        """
        if how == 'transaction':
            j=0
        elif how == 'client':
            j=1
        elif how == 'date':
            j=2
            
        new_list = "('" + str(df_csv[j].unique()[0]) + "'"
        for i in range(1,len(df_csv[j].unique())):
            new_list = new_list + ',' + "'" + str(df_csv[j].unique()[i]) + "'"
        new_list = new_list + ')'
        
        return new_list
    
    def __find_duplicates(self,df_csv):
        """Поиск возможного дублирования в данных
        """
        duplicates_list = df_csv[df_csv.groupby(0)[0].transform('size') > 1][0].drop_duplicates().values.tolist()
        return duplicates_list
    
    def get_matches_num(self,how,df_csv,tolerance,match_type,job,rows,df_bad,df_good):
        """Реконсиляция числовых данных
        """
        for i in range(len(df_csv)):
            if df_csv[0][i] in rows[0].values:
                match = rows[rows[0] == df_csv[0][i]]
                if float(match[3])*(1-tolerance/100) <= float(df_csv[3][i]) <= float(match[3])*(1+tolerance/100) :
                    df_good.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])
                else:
                    df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])
            else:
                df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])

        return df_bad,df_good
    
    def get_matches_date(self,how,df_csv,match_type,job,rows,df_bad,df_good):
        """Реконсиляция данных даты-времени
        """
        for i in range(len(df_csv)):
            if df_csv[0][i] in rows[0].values:
                match = rows[rows[0] == df_csv[0][i]]
                if parser.parse(df_csv[2][i]).date() == match[2].values[0] :
                    df_good.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i],])
                else:
                    df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])
            else:
                df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])

        return df_bad,df_good
    
    def get_matches_string(self,how,df_csv,match_type,job,rows,df_bad,df_good):
        """Реконсиляция строковых данных
        """
        for i in range(len(df_csv)):
            if df_csv[0][i] in rows[0].values:
                match = rows[rows[0] == df_csv[0][i]]
                if str(df_csv[1][i]) == str(match[1].values[0]) :
                    df_good.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])
                else:
                    df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])
            else:
                df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])

        return df_bad,df_good
    
    def get_matches_all(self,how,df_csv,tolerance,match_type,job,rows,df_bad,df_good):
        """Реконсиляция полной строки
        """
        for i in range(len(df_csv)):
            if df_csv[0][i] in rows[0].values:
                match = rows[rows[0] == df_csv[0][i]]
                if ((str(df_csv[1][i]) == str(match[1].values[0])) and 
                    (parser.parse(df_csv[2][i]).date() == match[2].values[0]) and 
                    (float(match[3])*(1-tolerance/100) <= float(df_csv[3][i]) <= float(match[3])*(1+tolerance/100))):
                    df_good.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])
                else:
                    df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])
            else:
                df_bad.append([job,df_csv[0][i],df_csv[1][i],df_csv[2][i],df_csv[3][i]])

        return df_bad,df_good
    
    def select_match_type(self,how,df_csv,match_type,tolerance):
        """Выбор способа раконсиляции
        """
        job = self._get_last_job(match_type,len(df_csv))+1
        new_list = self._get_match_list(how,df_csv)
        rows = db._get_mastersystem_data(new_list,how=how)
        df_good =[]
        df_bad =[]
        
        if match_type == 'date':
            df_bad,df_good = self.get_matches_date(how,df_csv,match_type,job,rows,df_bad,df_good)
            
        elif match_type == 'numeric':
            df_bad,df_good = self.get_matches_num(how,df_csv,tolerance,match_type,job,rows,df_bad,df_good)
            
        elif match_type == 'string':
            df_bad,df_good = self.get_matches_string(how,df_csv,match_type,job,rows,df_bad,df_good)
        
        elif match_type == 'all':
            df_bad,df_good = self.get_matches_all(how,df_csv,tolerance,match_type,job,rows,df_bad,df_good)
        
        duplicates = self.__find_duplicates(df_csv)
        for i in duplicates:
            df_bad.append([job,i[0],i[1],i[2],i[3]])
        
        return df_bad,df_good
    
    def get_match(self,path,match_type,tolerance):
        """ Основной метод, запускающий реконсиляцию и загружающий реконсилированные данные в БД
            match_types = [date,numeric,string]
            tolerance is a number between 1 and 100 (%)
        """
        
        df_csv = self.get_csv_data(path)
        transaction_len = len(df_csv[0].unique())
        client_len = len(df_csv[1].unique())
        date_len = len(df_csv[2].unique())
        
        if transaction_len <= client_len and transaction_len <= date_len:
            df_bad,df_good = self.select_match_type('transaction',df_csv,match_type,tolerance)
                    
        elif client_len < transaction_len and client_len <= date_len:
            df_bad,df_good = self.select_match_type('client',df_csv,match_type,tolerance)
                    
        elif date_len <= client_len and date_len < transaction_len:
            df_bad,df_good = self.select_match_type('date',df_csv,match_type,tolerance)
            
        self.__upload_new_df(df_bad,'DB_DEFAULT.tb3_transactions_bad')
        self.__upload_new_df(df_good,'DB_DEFAULT.tb3_transactions_norm')

        return print('количество отличающихся записей,размер датасета -',len(df_bad),len(df_csv))

In [None]:
db = DataReconsilation('csv')

In [None]:
db.get_match('test_data.csv',match_type='all',tolerance=1)
%%time

### Задание 2.2
На любом языке программирования реализовать сервис по сбору агрегатов из таблицы БД с банковскими транзакциями.
Агрегаты должны быть собраны из реконсилированных данных по каждому клиенту в разрезах дней, месяцев и общий итог.

Прошу обратить внимание на то, что для задачи 2 пункт 2 агрегаты должны быть построены на реконсилированных данных. В противном случае задание считается заваленным.

In [None]:
class AggregationCreating:
    
    __config = configparser.ConfigParser()
    __path = os.path.join(os.getcwd(), 'config.ini')
    __settings = __config.read(__path)
    __connection_info = {'host': __config['CONNECTION']['host'],
                         'port': __config['CONNECTION']['port'],
                         'user': __config['CONNECTION']['user'],
                         'password': __config['CONNECTION']['password'],
                         'database': __config['CONNECTION']['database']
                         }
    
    def __init__(self):
        return None

    def __get_raw_data(self):
        """ Выбор данных для построения агрегатов. 
            Данные берутся из реконсилированных данных таблицы tb3_transactions_norm.
        """
        connection = vertica_python.connect(**self.__connection_info)
        cur = connection.cursor()
        sql = """select * from DB_DEFAULT.tb3_transactions_norm t1
                 where t1.job_id in (select max(job_id) from DB_DEFAULT.tb3_jobs);"""
        cur.execute(sql)
        temp = cur.fetchall()
        cur.close()
        return pd.DataFrame(temp,columns=['job_id','transaction_id','user_id','transaction_date','amount_USD'])
    
    def get_day_agg(self):
        """ Агрегация данных по дням
        """
        df = self.__get_raw_data()
        final = pd.DataFrame(df.groupby(['user_id', 'transaction_date'])['amount_USD'].sum().reset_index())
        final.sort_values(by='user_id',inplace=True)
        return final
    
    def get_month_agg(self):
        """ Агрегация данных по месяцам
        """
        df = self.__get_raw_data()
        df['transaction_date2'] = df['transaction_date'].apply(lambda x: x.strftime('%Y-%m'))
        final = pd.DataFrame(df.groupby(['user_id', 'transaction_date2'])['amount_USD'].sum().reset_index())
        final.sort_values(by='user_id',inplace=True)
        return final
    
    def get_total_agg(self):
        """ Агрегация данных в тотале
        """
        df = self.__get_raw_data()
        final = pd.DataFrame(df.groupby(['user_id'])['amount_USD'].sum().reset_index())
        final.sort_values(by='user_id',inplace=True)
        return final

In [None]:
db2 = AggregationCreating()

In [None]:
db2.get_total_agg()