In [None]:
from typing import List
from abc import ABC, abstractmethod
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()

In [None]:
class TradeRecord(Base):
    __tablename__ = 'TradeRecord'
    id = Column(Integer, primary_key=True)
    source_currency = Column(String)
    dest_currency = Column(String)
    amount = Column(Integer)
    price = Column(Float)

    def __init__(self, source, dest, amount, price):
        self.source_currency = source
        self.dest_currency = dest
        self.amount = amount
        self.price = price
        
    def __str__(self):
        return f'{self.source_currency}, {self.dest_currency}, {self.amount}, {self.price}'

## Abstractions

In [None]:
class Logger(ABC):
    @abstractmethod
    def log(self, message):
        pass

In [None]:
class Mapper(ABC):
        @abstractmethod
        def map(self, fields: List[str]):
            pass

In [None]:
class Validator(ABC):
    def __init__(self, logger: Logger):
        self.logger = logger

    @abstractmethod
    def validate(self, fields: List[str], record_line: int) -> bool:
        pass

In [None]:
from abc import ABC, abstractmethod

class Parser(ABC):
    def __init__(self, validator: Validator, mapper: Mapper):
        self.validator = validator
        self.mapper = mapper

    def parse_trades(self, trade_data: List[str]):
        trades: T = []
        for index, record in enumerate(trade_data):
            fields = self.parse(record)
            if not self.validator.validate(fields, index + 1):
                continue
            trade = self.mapper.map(fields)
            trades.append(trade)
        return trades

    @abstractmethod
    def parse(self, record: str) -> List[str]:
        pass

In [None]:
class Repository(ABC):
    def __init__(self, logger: Logger):
        self.logger = logger

    @abstractmethod
    def store_records(self, records: List[TradeRecord]) -> None:
        pass

In [None]:
class Reader(ABC):
    @abstractmethod
    def read_data(self) -> List[str]:
        pass

## Implementations

In [None]:
class ConsoleLogger(Logger):
    def log(self, message):
        print(message)

In [None]:
class FileReader(Reader):
    def __init__(self, filename):
        self.filename = filename
        
    def read_data(self):
        trade_records: List[str] = []
        with open(self.filename) as data_source:
            for trade_record in data_source: 
                trade_records.append(trade_record.rstrip())
        return trade_records

In [None]:
class CommaParser(Parser):
    def __init__(self, validator: Validator, mapper: Mapper):
        Parser.__init__(self, validator, mapper)

    def parse(self, record: str)-> List[str]:
        return record.split(',')

In [None]:
class PostgresRepo(Repository):
    def __init__(self, logger: Logger, cnn_string: str):
        self.cnn_string = cnn_string
        Repository.__init__(self, logger)

    def store_records(self, records: List[TradeRecord]) -> None:
        engine = create_engine(self.cnn_string)
        Session = sessionmaker(bind=engine)
        Base.metadata.create_all(engine)
        session = Session()
        for trade in records:
            session.add(trade)
        session.commit()
        session.close()
        self.logger.log(f'{len(records)} records have saved')

In [None]:
class TradeRecordMapper(Mapper):
    def map(self, processed_record: List[str]) -> TradeRecord:
        in_curr = slice(0, 3);
        out_curr = slice(3, None)
        source_curr_code = processed_record[0][in_curr]
        dest_curr_code = processed_record[0][out_curr]
        trade_amount = int(processed_record[1])
        trade_price = float(processed_record[2])
        trade_record = TradeRecord(source_curr_code, dest_curr_code,trade_amount, trade_price)
        return trade_record

In [None]:
class SimpleValidator(Validator):
    def __init__(self, logger: Logger):
        Validator.__init__(self, logger) #dependecy
        
    def validate(self, record: List[str], index: int) -> bool:
        if len(record) != 3:
            self.logger.log(f'Line {index} malformed. Only {len(record)} field(s) found.')
            return False
        if len(record[0]) != 6:
            self.logger.log(f'Trade currencies on line {index} malformed: {record[0]}')
            return False
        try:
            trade_amount = float(record[1])
        except ValueError:
            self.logger.log(f"Trade amount on line {index} not a valid integer: '{record[1]}'")
            return False
        try:
            trade_price = float(record[2])
        except ValueError:
            self.logger.log(f'Trade price on line {index} not a valid decimal:{record[2]}')
            return False
        return True

In [None]:
class TradeProcessor:
    def __init__(self,reader: Reader, parser: Parser, repo: Repository) -> None:
        self.reader = reader
        self.parser = parser
        self.repo = repo

    def process_trades(self):
        lines = self.reader.read_data()
        trades = self.parser.parse_trades(lines)
        self.repo.store_records(trades)

Initialize TradeProcessor

In [None]:
logger = ConsoleLogger()
reader = FileReader('Data.txt')
mapper = TradeRecordMapper()

validator = SimpleValidator(logger)
parser = CommaParser(validator, mapper)
repo = PostgresRepo(logger, 'postgresql://postgres:u2402/501@localhost:5432/python')

trade_processor = TradeProcessor(reader, parser, repo)
trade_processor.process_trades()