# Solution - László Pogány

I chose a __Jupyter/IPython Notebook__ over a plain Python project because a notebook documents itself.
Although a solution implemented this way cannot be integrated directly into existing operational systems without further work, it makes it much easier to demonstrate the whole process and the implemented functionality step by step.

This solution installs the external libraries, downloads the required historical data automatically and computes the relevant metrics.

## Installing prerequisites

In [None]:
!python --version
# Python 3.6.3 :: Anaconda, Inc.

The __pandas__ library is used for storing the data in dataframes.
<br />
The __beautifulsoup4__ library is used for scraping data from the web.
<br />
The __pandas-datareader__ library is used for downloading __Yahoo Finance__ and __Google Finance__ datasets.<br />
Further information: https://github.com/pydata/pandas-datareader

In [None]:
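# 'datetime' ships with the Python standard library; the PyPI package of that name is Zope's DateTime and is not needed
!pip install requests
!pip install numpy
!pip install scipy
!pip install pandas
!pip install pandas-datareader
!pip install intervaltree
# 'bs4' on PyPI is only a dummy wrapper that installs beautifulsoup4
!pip install beautifulsoup4
# parser backend used by BeautifulSoup(html, 'lxml')
!pip install lxml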

My local configuration is shown below, because the difficulties I experienced were probably related to differing package versions.

In [None]:
!pip show datetime | grep -i "Version:"
!pip show bs4 | grep -i "Version:"
!pip show requests | grep -i "Version:"
!pip show numpy | grep -i "Version:"
!pip show scipy | grep -i "Version:"
!pip show pandas | grep -i "Version:"
!pip show pandas-datareader | grep -i "Version:"
!pip show intervaltree | grep -i "Version:"
!pip show beautifulsoup4 | grep -i "Version:"
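# datetime:           Version: 4.2
# bs4:                Version: 0.0.1
# requests:           Version: 2.18.4
# numpy:              Version: 1.13.3
# scipy:              Version: 0.19.1
# pandas:             Version: 0.20.3
# pandas-datareader:  Version: 0.5.0
# intervaltree:       Version: 2.1.0
# beautifulsoup4:     Version: 4.6.0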

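On Python 3.8 or newer (the notebook itself was run on 3.6.3), the same version listing could be produced from Python with the standard `importlib.metadata` module instead of piping `pip show` through `grep`, which also works on Windows. A minimal sketch; `installedVersions` is a hypothetical helper name, not part of the original solution.

```python
# Python 3.8+: query installed distribution versions without shelling out to pip
from importlib.metadata import version, PackageNotFoundError


def installedVersions(packageNames):
    """Returns {package name: version string, or None if the package is not installed}."""
    versions = {}
    for name in packageNames:
        try:
            versions[name] = version(name)
        except PackageNotFoundError:
            versions[name] = None
    return versions


for name, ver in installedVersions(['requests', 'numpy', 'scipy', 'pandas', 'pandas-datareader',
                                    'intervaltree', 'beautifulsoup4', 'lxml']).items():
    print("%-20s %s" % (name, ver))
```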
## Downloader and preprocessor

Predefined directory names for the application.
The downloads and the calculated outputs are placed into dedicated directories inside the repository.

In [None]:
import os

# constants and directory structure for the application
OUTPUT_DIRECTORY_PATH       = os.path.join('..', 'out')    # portable version of r'..\out'
LOG_DIRECTORY_PATH          = os.path.join(OUTPUT_DIRECTORY_PATH, "logs")
DOWNLOAD_DIRECTORY_NAME     = os.path.join(OUTPUT_DIRECTORY_PATH, "data")
DATABASE_DIRECTORY_NAME     = os.path.join(OUTPUT_DIRECTORY_PATH, "meta")
FINGERPRINTS_DIRECTORY_NAME = os.path.join(OUTPUT_DIRECTORY_PATH, "calc")

DATABASE_FILE_NAME          = 'metadata.db'

def createDirectory(path):
    # exist_ok avoids the race between checking for and creating the directory
    os.makedirs(path, exist_ok=True)
        
createDirectory(OUTPUT_DIRECTORY_PATH)
createDirectory(LOG_DIRECTORY_PATH)
createDirectory(DOWNLOAD_DIRECTORY_NAME)
createDirectory(DATABASE_DIRECTORY_NAME)
createDirectory(FINGERPRINTS_DIRECTORY_NAME)


The __PlatformLogger__ function provides logging services for the modules/classes. <br />
The __Utils__ and __InputPreprocessor__ classes implement common data-manipulation functionality as static helper methods.

In [None]:
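from datetime import datetime, timezone
import os
import time
import doctest
import logging
import logging.handlers
import numpy as np
import pandas as pd

# runs the doctests of the objects already defined in __main__, i.e. those of previously executed cells
doctest.testmod()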


loggers = {}
def PlatformLogger(moduleName):
    """ Provides logging functionalities.
    
    The function configures the log handlers for the given module name and returns a logger object.
    The logger is created only once per module name; later calls return the cached instance.
    """

    if loggers.get(moduleName):
        
        return loggers.get(moduleName)
    
    else:
        
        # create logger with moduleName parameter
        logger = logging.getLogger(moduleName)
        logger.setLevel(logging.DEBUG)

        # create console handler with a higher log level
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

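        # error.log receives warnings and errors only, event.log receives every message (including debug)
        fh = logging.FileHandler(os.path.join(LOG_DIRECTORY_PATH, 'error.log'))
        # without maxBytes a RotatingFileHandler never rotates; 10 MB x 5 backups is an arbitrary choice
        fr = logging.handlers.RotatingFileHandler(os.path.join(LOG_DIRECTORY_PATH, 'event.log'),
                                                  maxBytes=10 * 1024 * 1024, backupCount=5)
        fh.setLevel(logging.WARNING)
        fr.setLevel(logging.DEBUG)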

        # create formatter and add it to the handlers
        formatter = logging.Formatter('[%(asctime)s] - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        fr.setFormatter(formatter)
        ch.setFormatter(formatter)

        # add the handlers to the logger
        logger.addHandler(fh)
        logger.addHandler(fr)
        logger.addHandler(ch)
        
        # updating loggers
        loggers.update({moduleName: logger})

        return logger



class Utils(object):
    """ The class contains a collection of useful functions in relation to minor data manipulations """
    
    
    @staticmethod
    def checkDatetimeParam(dt, paramName):
        assert (type(dt) is datetime), \
            paramName + " parameter is not datetime, it is a(n) %s" % type(dt)
    
    
    @staticmethod
    def checkStringParam(string, paramName):
        assert (type(string) is str), \
            paramName + " parameter is not str, it is a(n) %s" % type(string)
    
    
    @staticmethod
    def convertDateTimeToUnixTimeStamp(dt, useLocalTime=False):
        if (useLocalTime):
            return int(time.mktime(dt.timetuple()))
        else:
            return int(dt.replace(tzinfo=timezone.utc).timestamp())
    
    
    @staticmethod
    def convertUnixTimeStampToDateTime(ts, useLocalTime=False):
        if (useLocalTime):
            return datetime.fromtimestamp(ts)
        else:
            return datetime.utcfromtimestamp(ts)
    

    @staticmethod
    def convertDatetimeStrFormat(dtString, fromPattern, toPattern):
        """ Converts a datetime given as a formatted string into a string with another format
        
        >>> Utils.convertDatetimeStrFormat('1987-08-14', "%Y-%m-%d", "%d/%m/%Y")
        '14/08/1987'
        >>> Utils.convertDatetimeStrFormat('5/1/1990', "%d/%m/%Y", "%Y-%m-%d")
        '1990-01-05'
        """
        
        return datetime.strptime(dtString, fromPattern).strftime(toPattern)
    
    
    @staticmethod
    def convertDateTimeToString(dt, datetime_pattern = "%Y%m%d"):
        """ Converter for datetime types
        
        In case of datetime parameter returns a formatted datetime string, otherwise returns the original
        
        >>> Utils.convertDateTimeToString(datetime(2016,1,2,3,4,5))
        '20160102'
        >>> Utils.convertDateTimeToString(datetime(2016,1,2,3,4,5), "%Y%m%d-%H%M%S")
        '20160102-030405'
        >>> Utils.convertDateTimeToString('2016-01-01')
        '2016-01-01'
        >>> Utils.convertDateTimeToString(2016)
        Traceback (most recent call last):
          ...
        AssertionError: dt parameter is not string nor datetime, it is a(n) <class 'int'>
        """
        
        # checking parameter type
        assert (type(dt) is datetime or type(dt) is str), \
            "dt parameter is not string nor datetime, it is a(n) %s" % type(dt)
        
        # formatting datetime to string if needed
        dtStr = dt
        if(type(dt) is datetime):
            dtStr = dt.strftime(datetime_pattern)
            
        return dtStr



class InputPreprocessor(object):
    """ The class contains a collection of functions in relation to preprocessing of the historical data """
    
    
    @staticmethod
    def cleanDataframe(dataframe, uniqueColNameList = None):
        """ Removes duplicates, then sorts and reindexes the dataframe in place """
        
        if (uniqueColNameList is None):
            uniqueColNameList = dataframe.columns[0]
        
        # dropping duplicates, sorting and reindexing the table
        dataframe.drop_duplicates(subset=uniqueColNameList, inplace=True)
        dataframe.sort_values(uniqueColNameList, inplace=True)
        dataframe.reset_index(drop=True, inplace=True)
    
    
    
    @staticmethod
    def filterOutliers(candles, SIGMA = 3):
        """ Filters the outliers from the dataset according to the N-sigma rule
        
        A candle is filtered if any of its prices lies farther than SIGMA standard deviations from the mean
        of all prices, if its volume lies farther than SIGMA standard deviations from the mean volume,
        or if any of these values is not positive.
        Args:
            candles: the original candle dataset with columns: 'Open', 'Close', 'High', 'Low', 'Volume'.
            SIGMA: the number of standard deviations used as threshold.
        Returns:
            valid: dataframe of correct candles with a new index.
            filtered: dataframe of filtered candles with the original index.
            valid: dataframe of correct candles with the original index.
        """
        
        # abbreviating columns
        o = candles["Open"]
        h = candles["High"]
        l = candles["Low"]
        c = candles["Close"]
        v = candles["Volume"]
        
        # concatenating price columns
        aggr_prices = pd.concat([o, h, l, c])
        
        # calculating scalars for outlier detection
        price_mean = aggr_prices.mean()
        price_std  = aggr_prices.std()
        vols_mean  = v.mean()
        vols_std   = v.std()
        
        def isOutlier(series, mean, std):
            # outside of the N-sigma band (the negated '<=' also flags NaN values) or not positive
            return ~(np.abs(series - mean) <= (SIGMA * std)) | (series <= 0.0)
        
        # a candle is filtered if any of its prices or its volume is an outlier
        mask = (isOutlier(o, price_mean, price_std) | isOutlier(h, price_mean, price_std) |
                isOutlier(l, price_mean, price_std) | isOutlier(c, price_mean, price_std) |
                isOutlier(v, vols_mean, vols_std))
        
        # splitting the candles into filtered and valid rows (both keep the original index);
        # boolean masks replace DataFrame.append, which was removed in pandas 2.0
        filtered = candles[mask]
        valid = candles[~mask]
        
        # returns: valid data with new index, filtered data with old index, valid data with old index
        return valid.reset_index(drop=True), filtered, valid

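The N-sigma rule used by __InputPreprocessor.filterOutliers__ can be illustrated on a plain list. The sketch below is a dependency-free, single-series version (the original pools all four price columns for one mean and standard deviation and filters a whole candle if any of its values is an outlier); `nSigmaOutliers` is a hypothetical helper.

```python
import statistics


def nSigmaOutliers(values, sigma=3):
    """Returns the indices of the values outside mean +/- sigma * std, or not positive."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)   # sample standard deviation (ddof=1), like pandas' Series.std() default
    return [i for i, x in enumerate(values)
            if not abs(x - mean) <= sigma * std or x <= 0.0]


prices = [10.0] * 20 + [100.0]
print(nSigmaOutliers(prices))                              # → [20]
print(nSigmaOutliers([10.0, 10.0, 10.0, 10.0, 100.0]))     # → []
```

The second call shows a limitation worth keeping in mind: by Samuelson's inequality, with the sample standard deviation no value can lie farther than (n-1)/√n standard deviations from the mean, so a 3-sigma filter can only flag anything when there are at least 11 values.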

The download of the datasets is implemented with __DownloadAdapter__ instances.
<br />
The __YahooFinanceDownloadAdapter__ and __GoogleFinanceDownloadAdapter__ classes implement the datasource-specific download logic.

In [None]:
from abc import ABC, abstractmethod
from datetime import datetime
import requests
import pandas as pd
import pandas_datareader as pdr
import io
import doctest

doctest.testmod()


class DownloadAdapter(ABC):
    """ Base class for download adapters
    
    Subclasses must implement the abstract 'downloaderFunction' method,
    which specifies the actual download process.
    """
    
    def __init__(self, datasourceName):
        super().__init__()
        self.datasourceName = datasourceName
    
    
    @abstractmethod
    def downloaderFunction(self, symbolCode, startDateTime, endDateTime):
        """ This method will be called by the data manager """
        pass



class YahooFinanceDownloadAdapter(DownloadAdapter):
    """ Specifies Yahoo Finance downloading functionalities by implementing DownloadAdapter """
    
    def __init__(self):
        super().__init__('yahoo')
    
    
    def downloaderFunction(self, symbolCode, startDateTime, endDateTime):
        """ This method downloads the historical data from Yahoo Finance
        """
        
        # checking parameter type
        Utils.checkDatetimeParam(startDateTime, 'startDateTime')
        Utils.checkDatetimeParam(endDateTime, 'endDateTime')
        
        # downloads the historical data
        data = pdr.get_data_yahoo(symbolCode, start=startDateTime, end=endDateTime)
        
        # drops an unnecessary column
        del data['Adj Close']
        
        # sorting table
        data.sort_index(inplace=True)
        
        return data
        


class GoogleFinanceDownloadAdapter(DownloadAdapter):
    """ Specifies Google Finance downloading functionalities by implementing DownloadAdapter 
    
    This class implements a URL-based solution. The 'pandas_datareader' library is not used here because
    it did not seem to work for older data; in my experiments a templated direct URL could download much more
    historical data.
    """
    
    def __init__(self):
        super().__init__('google')
    
    
    # note: this pattern (with an 'exchange:symbol' query) is currently not used by downloaderFunction
    DOWNLOAD_URL_PATTERN = "http://finance.google.ca/finance/historical?q=%s:%s&startdate=%s&enddate=%s&output=csv"
    DATE_QUERY_PATTERN   = "%m+%d+%Y"
    
    
    def downloaderFunction(self, symbolCode, startDateTime, endDateTime):
        """ This method downloads the historical data from Google Finance
        
        >>> gfda = GoogleFinanceDownloadAdapter()
        >>> resultSet = gfda.downloaderFunction("GOOGL", datetime(2016,1,1), datetime(2017,1,1))
        >>> len(resultSet)
        252
        >>> resultSet = gfda.downloaderFunction("GOOGL", datetime(1990,1,1), datetime(2017,1,1))
        >>> len(resultSet)
        3114
        """
        
        # checking parameter type
        Utils.checkDatetimeParam(startDateTime, 'startDateTime')
        Utils.checkDatetimeParam(endDateTime, 'endDateTime')
        
        # getting the data 
        startdate = startDateTime.strftime(self.DATE_QUERY_PATTERN)
        enddate = endDateTime.strftime(self.DATE_QUERY_PATTERN)
        
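        stock_url = "http://finance.google.ca/finance/historical?q=" + \
                    symbolCode + "&startdate=" + startdate + "&enddate=" + enddate + "&output=csv"
        response = requests.get(stock_url)
        response.raise_for_status()    # fail early on HTTP errors so the caller's retry logic handles them
        raw_response = response.content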
        
        # reading data into pandas dataframe
        data = pd.read_csv(io.StringIO(raw_response.decode('utf-8')))
                
        # identifying the name of the first column
        keyColumnName = data.columns[0]    # according to the downloaded CSV this should be a column named 'Date'

        # formatting Date column
        def changeDateStr(string):
            dt = datetime.strptime(string, '%d-%b-%y')
            return dt
        data[keyColumnName] = data[keyColumnName].apply(changeDateStr)

        # use Date column as key
        data.set_index(keyColumnName, inplace=True)

        # sorting table
        data.sort_index(inplace=True)

        return data

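Because __DataManager__ only uses an adapter's __datasourceName__ attribute and its __downloaderFunction__ method, an offline adapter that generates synthetic candles could be handy for exercising the caching logic without network access. The sketch below is hypothetical and not part of the original solution: it keeps the adapter interface but does not subclass __DownloadAdapter__, so that it runs on its own. Note that `pd.bdate_range` only skips weekends, not exchange holidays.

```python
from datetime import datetime
import pandas as pd


class SyntheticDownloadAdapter(object):
    """Offline stand-in for a DownloadAdapter: deterministic OHLCV candles for every business day."""

    def __init__(self, datasourceName='synthetic'):
        self.datasourceName = datasourceName

    def downloaderFunction(self, symbolCode, startDateTime, endDateTime):
        # one row per business day in the inclusive [startDateTime, endDateTime] range
        dates = pd.bdate_range(startDateTime, endDateTime, name='Date')
        base = [100.0 + i for i in range(len(dates))]   # a simple rising price path
        data = pd.DataFrame({
            'Open':   base,
            'High':   [p + 1.0 for p in base],
            'Low':    [p - 1.0 for p in base],
            'Close':  [p + 0.5 for p in base],
            'Volume': [1000 * (i + 1) for i in range(len(dates))],
        }, index=dates, columns=['Open', 'High', 'Low', 'Close', 'Volume'])
        return data.sort_index()


df = SyntheticDownloadAdapter().downloaderFunction('TEST', datetime(2016, 1, 4), datetime(2016, 1, 8))
print(df)
```

The returned frame has the 'Date' index and the five value columns that `__saveDataframeToCsv` expects, so in the notebook it could presumably be passed as `DataManager([SyntheticDownloadAdapter()])`.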

The global function __getSP400ListFromWikipedia()__ scrapes the current list of S&P 400 companies from the web.

A web scraper is used because the constituents of the S&P 400 change over time and the script always needs the current list.
<br />
The current list can be found here: <https://en.wikipedia.org/wiki/List_of_S%26P_400_companies>

In [None]:
# a fresh list of 400 companies (it changes over time, so it is scraped at runtime)
URL_WIKIPEDIA_SP400 = "https://en.wikipedia.org/wiki/List_of_S%26P_400_companies"

In [None]:
from bs4 import BeautifulSoup
import urllib.request
import doctest

doctest.testmod()


def getSP400ListFromWikipedia():
    """ Returns current SP400 symbol codes from Wikipedia
    
    Downloads HTML content, selects the first table and returns the values from the first column of the selected table.
    
    >>> len(getSP400ListFromWikipedia())
    400
    """
    
    TABLE_NUMBER = 0   # the first table's content should be downloaded from wikipedia
    
    with urllib.request.urlopen(URL_WIKIPEDIA_SP400) as response:
        
        # downloading html content
        html = response.read()
        
        # parsing html content
        htmlSoup = BeautifulSoup(html, 'lxml')
        if htmlSoup is None:
            return None
        
        # finding the table in the html code (no need to parse the table's html a second time)
        tableHtml = htmlSoup.find_all('table', class_='wikitable sortable')[TABLE_NUMBER]
        
        # filtering relevant values from the table (the ticker is the link text in the first cell of each row)
        return [x.td.a.text for x in tableHtml.find_all('tr') if x.td]
        

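The scraper above depends on __beautifulsoup4__ and the __lxml__ parser. The same extraction rule (the text of the first link in the first `<td>` of every row of the first `wikitable sortable` table) can be sketched with the standard library's `html.parser` alone. `FirstColumnLinkParser` is a hypothetical name; unlike `x.td.a.text` it strips surrounding whitespace, and it does not handle tables nested inside the target table.

```python
from html.parser import HTMLParser


class FirstColumnLinkParser(HTMLParser):
    """Collects the text of the first link in the first <td> of every row of the first matching table."""

    def __init__(self, tableClass='wikitable sortable'):
        super().__init__()
        self.tableClass = tableClass
        self.inTable = False      # inside the first table whose class attribute equals tableClass
        self.tableSeen = False    # that table has already been processed
        self.tdCount = 0          # number of <td> cells opened in the current row
        self.inFirstTd = False    # inside the first <td> of the current row
        self.inLink = False       # inside the first <a> of that cell
        self.linkTaken = False    # the current row's link has already been captured
        self.buffer = []
        self.symbols = []

    def handle_starttag(self, tag, attrs):
        if tag == 'table' and not self.tableSeen and dict(attrs).get('class') == self.tableClass:
            self.inTable = True
        elif not self.inTable:
            return
        elif tag == 'tr':
            self.tdCount, self.inFirstTd, self.linkTaken = 0, False, False
        elif tag == 'td':
            self.tdCount += 1
            self.inFirstTd = (self.tdCount == 1)
        elif tag == 'a' and self.inFirstTd and not self.linkTaken:
            self.inLink, self.buffer = True, []

    def handle_endtag(self, tag):
        if not self.inTable:
            return
        if tag == 'a' and self.inLink:
            self.symbols.append(''.join(self.buffer).strip())
            self.inLink, self.linkTaken = False, True
        elif tag == 'td':
            self.inFirstTd = False
        elif tag == 'table':
            self.inTable, self.tableSeen = False, True

    def handle_data(self, data):
        if self.inLink:
            self.buffer.append(data)


# usage (network access required):
# with urllib.request.urlopen(URL_WIKIPEDIA_SP400) as response:
#     parser = FirstColumnLinkParser()
#     parser.feed(response.read().decode('utf-8'))
#     print(len(parser.symbols))
```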
## Data layer

An instance of the __DataManager__ class downloads historical data by calling the __downloaderFunction__ method of its __DownloadAdapter__ instances.
<br />
The downloaded historical data are loaded into pandas dataframes, and the contents of the dataframes are saved into CSV files in the ___data___ directory.
<br />
Because the download process sometimes fails and not every ticker symbol can be found in both datasources, a retry logic is implemented: it tries to download each file several times and gives up after the last attempt.
<br />
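
The retry logic described above (implemented in `DataManager.__downloadSymbolData`) can be sketched in isolation. This is a simplified, hypothetical version: `runWithRetries` takes any callable instead of an adapter and, unlike the original, returns the tasks it gave up on.

```python
import random


def runWithRetries(tasks, action, maxRetryCnt=10, rng=random):
    """Runs action(task) for every task; a failing task is retried until it was tried maxRetryCnt times.

    The next task is picked at random, so a temporarily failing datasource does not block the others.
    Returns the tasks that still failed at their last attempt.
    """
    pending = {(task, maxRetryCnt) for task in tasks}   # (task, remaining attempts)
    givenUp = []
    while pending:
        entry = rng.choice(list(pending))   # random.choice needs a sequence, not a set
        pending.discard(entry)
        task, remaining = entry
        try:
            action(task)
        except Exception:
            if remaining > 1:
                pending.add((task, remaining - 1))   # reschedule with one attempt less
            else:
                givenUp.append(task)                 # no attempts left: give up
    return givenUp


# usage: a hypothetical datasource that fails twice before succeeding
attempts = {}

def flakyDownload(task):
    attempts[task] = attempts.get(task, 0) + 1
    if task[1] == 'google' and attempts[task] < 3:
        raise IOError('temporary failure')

print(runWithRetries([('AAPL', 'yahoo'), ('AAPL', 'google')], flakyDownload,
                     maxRetryCnt=5, rng=random.Random(42)))   # → []
```
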
According to the task's criteria each OHLCV record should be downloaded exactly once, therefore metadata about each download is kept in an __sqlite__ database. Only those parts of the requested interval are downloaded that have not been downloaded before for the given symbol and datasource. The downloaded CSV files are kept separately in the _data_ folder (the file names are keys built from the symbol, the interval and the datasource) until a new download process identifies their segmentation and merges them into a common file (if the segments overlap in time).

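The exactly-once bookkeeping described above can be sketched with the standard library alone. This is a simplification under stated assumptions, not the __DataManager__ code: it uses an in-memory database with a reduced METADATA schema, parameterized queries, integer UTC timestamps, and a hypothetical pure-Python `missingIntervals` function in place of the `IntervalTree.chop` call, treating both interval ends as inclusive days.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

ONE_DAY = timedelta(days=1)


def toUtcTs(dt):
    """Interprets a naive datetime as UTC and returns an integer Unix timestamp."""
    return int(dt.replace(tzinfo=timezone.utc).timestamp())


def fromUtcTs(ts):
    """Converts an integer Unix timestamp back to a naive UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)


def missingIntervals(planned, stored):
    """Returns the parts of the inclusive (start, end) interval 'planned' not covered by the
    inclusive 'stored' intervals, as a sorted list of inclusive (start, end) tuples."""
    start, end = planned
    gaps = []
    for s, e in sorted(stored):
        if e < start:                  # lies completely before the still uncovered part
            continue
        if s > end:                    # sorted input: this and all later intervals lie after 'planned'
            break
        if s > start:                  # uncovered days before this stored interval
            gaps.append((start, s - ONE_DAY))
        start = max(start, e + ONE_DAY)
    if start <= end:
        gaps.append((start, end))
    return gaps


conn = sqlite3.connect(':memory:')
conn.execute('''create table METADATA
                (ID                 integer    primary key,
                 SYMBOL_CODE        text       not null,
                 DATASOURCE         text       not null,
                 FILE_NAME          text       not null,
                 INTERVAL_START_TS  integer    not null,
                 INTERVAL_END_TS    integer    not null)''')


def recordDownload(conn, symbol, datasource, fileName, start, end):
    # '?' placeholders let sqlite3 quote the values safely
    conn.execute("insert into METADATA (SYMBOL_CODE, DATASOURCE, FILE_NAME, INTERVAL_START_TS, INTERVAL_END_TS) "
                 "values (?, ?, ?, ?, ?)", (symbol, datasource, fileName, toUtcTs(start), toUtcTs(end)))
    conn.commit()


def storedIntervals(conn, symbol, datasource):
    rows = conn.execute("select INTERVAL_START_TS, INTERVAL_END_TS from METADATA "
                        "where SYMBOL_CODE = ? and DATASOURCE = ?", (symbol, datasource)).fetchall()
    return [(fromUtcTs(s), fromUtcTs(e)) for s, e in rows]


recordDownload(conn, 'AAPL', 'yahoo', 'AAPL.20160110_20160120.yahoo.csv',
               datetime(2016, 1, 10), datetime(2016, 1, 20))
print(missingIntervals((datetime(2016, 1, 1), datetime(2016, 1, 31)),
                       storedIntervals(conn, 'AAPL', 'yahoo')))
# two gaps remain: 2016-01-01 .. 2016-01-09 and 2016-01-21 .. 2016-01-31
```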
In [None]:
from datetime import datetime, timedelta
import itertools
from intervaltree import Interval, IntervalTree
import doctest
import random
import sqlite3
import os
doctest.testmod()


class DataManager:
    """ Downloads and manages historical OHLCV data
    
    Downloads each record exactly once.
    Stores the downloaded history as CSV files locally.
    Stores the index of the downloaded history (the metadata) in a local SQL database.
    Queries the metadata before downloading: already stored data is reused, missing data is downloaded.
    SQLite has no dedicated datetime column type, so TEXT or INTEGER columns are used for storing datetimes.
    
    TODOs:
    - Since INTEGER based UNIX timestamps are used in metadata database, TEXT based datetimes can be removed.
    """
    
    FILE_NAME_PATTERN = "%s.%s_%s.%s.csv"
    
    def __init__(self, downloadAdapterList = None):
        self.logger = PlatformLogger('DataManager')
        self.__checkAdapterUniqueness(downloadAdapterList)
        self.downloadAdapterList = downloadAdapterList
        self.dbPath = os.path.join(DATABASE_DIRECTORY_NAME, DATABASE_FILE_NAME)
        self.__initializeDataBase()
        
        
    
    def __initializeDataBase(self):
        """ Initializes the database for metadata if needed """
        
        conn = sqlite3.connect(self.dbPath)
        conn.execute('''create table if not exists METADATA
                (ID                 integer    primary key,
                 SYMBOL_CODE        char(10)   not null,
                 DATASOURCE         text       not null,
                 FILE_NAME          text       not null,
                 INTERVAL_START     text       not null,
                 INTERVAL_END       text       not null,
                 INTERVAL_START_TS  integer    not null,
                 INTERVAL_END_TS    integer    not null);''')
        conn.close()


        
    def __queryDatabase(self, queryString):
        """ Executes a query statement and returns the result set """
        
        conn = sqlite3.connect(self.dbPath)
        cursor = conn.execute(queryString)
        result = cursor.fetchall()
        conn.close()
        return result
    
    
    
    def __saveDataframeToCsv(self, symbol, dataframe, startDateTime, endDateTime, datasourceName, \
                             colNames = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']):
        """ Saves a pandas dataframe into a CSV file """
        
        Utils.checkStringParam(symbol, 'symbol')
        Utils.checkStringParam(datasourceName, 'datasourceName')
        Utils.checkDatetimeParam(startDateTime, 'startDateTime')
        Utils.checkDatetimeParam(endDateTime, 'endDateTime')
        
        # in downloaded dataframe the 'Date' column is used as key (due to the used API)
        assert(len(dataframe.columns) == len(colNames)-1), \
            "dataframe's column count (%s) does not match the expected number of value columns (%s)" % \
            (len(dataframe.columns), len(colNames)-1)
            
        dataframe.index.name = colNames[0]
        dataframe.columns = colNames[1:]
        
        fileName = DataManager.FILE_NAME_PATTERN % (symbol, \
                                        Utils.convertDateTimeToString(startDateTime), \
                                        Utils.convertDateTimeToString(endDateTime), \
                                        datasourceName)
        filePath = os.path.join(DOWNLOAD_DIRECTORY_NAME, fileName)
        dataframe.to_csv(filePath, sep=',', encoding='utf-8')
        return filePath
    
    
    
    def __saveMetadataToDatabase(self, symbol, datasourceName, fileName,
                                 intervalStart, intervalEnd, intervalStartTs, intervalEndTs):
        """ Inserts a row into METADATA table in metadata.db"""
        
        Utils.checkStringParam(symbol, 'symbol')
        Utils.checkStringParam(datasourceName, 'datasourceName')
        Utils.checkStringParam(fileName, 'fileName')
        Utils.checkStringParam(intervalStart, 'intervalStart')
        Utils.checkStringParam(intervalEnd, 'intervalEnd')
        Utils.checkStringParam(intervalStartTs, 'intervalStartTs')
        Utils.checkStringParam(intervalEndTs, 'intervalEndTs')
        
        conn = sqlite3.connect(self.dbPath)
        # placeholders let sqlite3 quote the values, so quotes in symbols or paths cannot break the statement
        conn.execute("INSERT INTO METADATA (SYMBOL_CODE, DATASOURCE, FILE_NAME, "
                     "INTERVAL_START, INTERVAL_END, INTERVAL_START_TS, INTERVAL_END_TS) "
                     "VALUES (?, ?, ?, ?, ?, ?, ?)",
                     (symbol, datasourceName, fileName, intervalStart, intervalEnd, intervalStartTs, intervalEndTs))
        conn.commit()
        conn.close()
    
    
    
    def __calculateIntervalsToDownload(self, plannedIntervalToDownload, alreadyStoredIntervals):
        """ Generates the time intervals which were not downloaded before
        
        Gets an interval Tuple(fromDatetime, toDatetime), checks the already downloaded intervals
        List[Tuple(fromDatetime, toDatetime)] and computes the intervals which were never downloaded before.
        """
        
        # a one-day offset is needed because the borders of the already downloaded intervals are inclusive
        deltatime = timedelta(days=1)
        
        # the planned interval will be chopped by the already stored intervals
        ti = IntervalTree([Interval(plannedIntervalToDownload[0], plannedIntervalToDownload[1])])
        ti.merge_overlaps()
        
        # chopping all previously downloaded intervals
        for storedInterval in alreadyStoredIntervals:
            ti.chop(storedInterval[0] - deltatime, storedInterval[1] + deltatime)
            
        return sorted(ti)
    
    
    
    def __checkAdapterUniqueness(self, adapterList):
        """ Checks whether the adapters are uniquely named
        
        Unique adapter names ensure distinct values of the 'datasource' property in the database.
        If the naming is unique, returns the names.
        """
        
        if (adapterList is not None):
            adapterNames = list(map(lambda x: x.datasourceName, adapterList))

            # if the adapters are not uniquely named raise error
            if (len(adapterList) != len(set(adapterNames))):
                raise ValueError('The adapters are not unique.')
            
            return adapterNames
        
        return
    
    
    
    def __downloadSymbolData(self, listOfSymbols, adapterList, intervalStart, intervalEnd, maxRetryCnt = 10):
        """ Tries to download data for ticker symbols
        
        The function generates download tasks for each (symbol, adapter, max_retry) triplet and, according to
        the generated task list, tries to download the historical data. The downloaded data is saved into
        the 'data' directory, the metadata (e.g. interval borders, file name etc.) is saved into the database.
        """
        
        if(len(adapterList) == 0):
            raise ValueError('There is no adapter to download the data with.')
             
        adapterNames = self.__checkAdapterUniqueness(adapterList)
        
        # generate the download tasks
        dataToDownload = set(itertools.product(listOfSymbols, adapterNames, [maxRetryCnt]))
        
        # executing the download tasks, where a download task looks like:
        # (symbolCode, datasource, cntOfRemainingRetries)    e.g. ('AAPL', 'google', 5)
        while (0 < len(dataToDownload)):

            # selecting a random task (random.sample() no longer accepts sets since Python 3.11)
            selectedTask = random.choice(list(dataToDownload))
            symbol, datasource, retryCnt = selectedTask
            dataToDownload.discard(selectedTask)
            
            try:
                for adapter in adapterList:
                    
                    # execute the download process with the proper adapter and save file and metadata
                    if (adapter.datasourceName == datasource):
                        data = adapter.downloaderFunction(symbol, intervalStart, intervalEnd)                    
                        filePath = self.__saveDataframeToCsv(symbol, data, intervalStart, intervalEnd, datasource)
                        self.__saveMetadataToDatabase(symbol, datasource, filePath, \
                                        Utils.convertDateTimeToString(intervalStart), \
                                        Utils.convertDateTimeToString(intervalEnd), \
                                        str(Utils.convertDateTimeToUnixTimeStamp(intervalStart)), \
                                        str(Utils.convertDateTimeToUnixTimeStamp(intervalEnd)))
                        self.logger.info("downloaded: " + str((symbol, datasource)) + "\t" + filePath)
                        break
            
            except Exception as e:
                
                # if the selected download task failed, it is rescheduled with a decremented retry count
                if (retryCnt > 1):
                    dataToDownload.add((symbol, datasource, retryCnt-1))
                counter = maxRetryCnt - retryCnt + 1
                self.logger.warning("failed to download [" + str(counter) + "/" + str(maxRetryCnt) + "]: " \
                               + str((symbol, datasource)) + " - " + repr(e))
    
    
    
    def templatedSelectQuery(self, selectQuery):
        """ Returns a templated function with two parameters to fill """
        
        def concreteSelectQuery(selectColumnList, whereClauseParameterTuple):
            """ Executes the query with two parameters
            
            The first parameter is a list of columns to select, which will be joined into a string inside a 1-tuple.
            The second parameter is an N-tuple containing the parameter values for the given selectQuery.
            Eventually 1+N parameters will be substituted into the query string (1 from the first parameter, N from the second).
            """
            
            params = (", ".join(selectColumnList), ) + whereClauseParameterTuple           
            query = selectQuery % params
            return self.__queryDatabase(query)
        
        return concreteSelectQuery
    
    
    
    def __readMultipleCSVsIntoSingleDataframe(self, listOfCSVs):
        """ Concatenates multiple dataframes into one big dataframe """
        
        # concatenating dataframes
        dataframes = (pd.read_csv(path) for path in listOfCSVs)
        retDf = pd.concat(dataframes)
        
        # converting column type
        dateColName = retDf.columns[0]
        retDf[dateColName] = pd.to_datetime(retDf[dateColName], format="%Y-%m-%d")
        return retDf
    
    
    
    def __dropIntervalInplace(self, df, intervalStart, intervalEnd, \
                       inclusiveFilterStart = False, inclusiveFilterEnd = False):
        """ Drops unnecessary rows from dataframe """
        
        dateColName = df.columns[0]
        
        # filtering the rows in case of CSV's contents are covering a wider interval
        if (inclusiveFilterStart):
            maskStart = df[dateColName] <= intervalStart
        else:
            maskStart = df[dateColName] < intervalStart
        if (inclusiveFilterEnd):
            maskEnd = df[dateColName] >= intervalEnd
        else:
            maskEnd = df[dateColName] > intervalEnd
        mask = maskStart | maskEnd
        df.drop(df[mask].index, inplace=True)
    
    
    
    def getData(self, symbolOrSymbolList, interval, downloadAdapterList = None, maxRetryCnt = 10, \
                startInclusiveFilter = False, endInclusiveFilter = False):
        """ Returns dataframes for the requested time interval
        
        According to the information stored in the metadata, determines the intervals which were never downloaded before.
        Downloads the unseen data. Combines the relevant (already stored and downloaded) data into a pandas dataframe.
        
        The returned dictionaries can be addressed by the following ways:
        returnMultiIndex[_]                    # data can be accessed by any addressing of the following solutions
        returnTupleDict0[('ANN', 'google')]    # in case of both parameters are known
        returnMultiDict1['ANN']['google']      # in case of symbol parameter is known
        returnMultiDict2['google']['ANN']      # in case of datasource parameter is known
        """
        
        # checking the input parameters
        if (isinstance(symbolOrSymbolList, str)):
            symbolOrSymbolList = [symbolOrSymbolList]
            
        assert (isinstance(symbolOrSymbolList, (list, tuple))), \
            "symbolOrSymbolList parameter is not list nor tuple, it is a(n) %s" % type(symbolOrSymbolList)
        assert (len(interval) == 2), \
            "interval parameter is not a tuple with size of 2, it's size is %s" % len(interval)
            
        Utils.checkDatetimeParam(interval[0], "interval[0]")
        Utils.checkDatetimeParam(interval[1], "interval[1]")
                
        if (downloadAdapterList is None or len(downloadAdapterList) == 0):
            downloadAdapterList = self.downloadAdapterList
        
        # logging the valid request for getting data
        adapterNames = self.__checkAdapterUniqueness(downloadAdapterList)
        self.logger.info("getting data: " + str(adapterNames) + "\t" \
                         + str(symbolOrSymbolList) + "\t" + str(interval))
        
        # getting a templated query function for multiple execution
        selectTemplate = self.templatedSelectQuery("select %s from METADATA where SYMBOL_CODE='%s' and DATASOURCE='%s';")
        columnsToQuery1 = ['INTERVAL_START_TS', 'INTERVAL_END_TS']
        columnsToQuery2 = ['FILE_NAME']
        
        # dictionaries to return
        returnMultiIndex = {}
        returnTupleDict0 = {}
        returnMultiDict1 = {}
        returnMultiDict2 = {}
        
        # determines the unseen intervals for (symbol, adapter) pairs and downloads the data, then constructs the 
        # return object
        for symbol in symbolOrSymbolList:
            for adapter in downloadAdapterList:
                
                # queries the database for the already stored CSV files that match the given (symbol, adapter) pair
                timestampRecords = selectTemplate(columnsToQuery1, (symbol, adapter.datasourceName))
                
                # creates inputs for interval calculation and determines the missing intervals
                alreadyStoredIntervals = [(Utils.convertUnixTimeStampToDateTime(x[0]), \
                                           Utils.convertUnixTimeStampToDateTime(x[1])) \
                                          for x in timestampRecords]
                unknownIntervalsToDownload = self.__calculateIntervalsToDownload(interval, alreadyStoredIntervals)
                
                # if necessary, downloads the missing intervals
                if unknownIntervalsToDownload:
                    for intervalToDownload in unknownIntervalsToDownload:
                        self.__downloadSymbolData([symbol], [adapter], \
                                                  intervalToDownload[0], intervalToDownload[1], maxRetryCnt)
                
                # re-queries the database for the full (possibly extended) set of matching CSV files
                csvPathRecords = selectTemplate(columnsToQuery2, (symbol, adapter.datasourceName))
                
                # generates a single dataframe of all relevant data for the given symbol and datasource
                csvPathList = [x[0] for x in csvPathRecords]
                jdf = self.__readMultipleCSVsIntoSingleDataframe(csvPathList)
                
                # cleaning and filtering data
                InputPreprocessor.cleanDataframe(jdf)
                self.__dropIntervalInplace(jdf, interval[0], interval[1], startInclusiveFilter, endInclusiveFilter)
                InputPreprocessor.cleanDataframe(jdf)
                
                # returns the dataframes addressed by four dictionaries for convenient handling
                returnMultiDict1.setdefault(symbol, {})[adapter.datasourceName] = jdf
                returnMultiDict2.setdefault(adapter.datasourceName, {})[symbol] = jdf
                returnTupleDict0[(symbol, adapter.datasourceName)] = jdf
                returnMultiIndex.setdefault(symbol, {})[adapter.datasourceName] = jdf
                returnMultiIndex.setdefault(adapter.datasourceName, {})[symbol] = jdf
                returnMultiIndex[(symbol, adapter.datasourceName)] = jdf
                
        return returnMultiIndex, returnTupleDict0, returnMultiDict1, returnMultiDict2
    
    
    
    def getConsolidatedData(self, symbolOrSymbolList, interval, maxRetryCnt = 10, intervalExtension = 10):
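        """ Returns the consolidated OHLCV data of both datasources.
        
        In its current form it downloads the data over the interval extended by intervalExtension days
        on both sides, prints the per-datasource dataframes of each symbol and returns (None, None, None, None).
        """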
        
        deltatime = timedelta(days = abs(intervalExtension))
        extendedInterval = (interval[0]-deltatime, interval[1]+deltatime)
        
        self.logger.info("getting consolidated data: " \
                         + str(symbolOrSymbolList) + "\t" + str(interval) + "\t" + str(intervalExtension))
        
        gfda = GoogleFinanceDownloadAdapter()
        yfda = YahooFinanceDownloadAdapter()
        
        _, _, md1, _ = self.getData(symbolOrSymbolList, extendedInterval, \
                                         downloadAdapterList=[gfda, yfda], maxRetryCnt=maxRetryCnt)
        
        # dictionaries to return
        returnMultiIndex = {}
        returnTupleDict0 = {}
        returnMultiDict1 = {}
        returnMultiDict2 = {}
        
        # iterating over the symbols and taking the dataframes of the two datasources for each of them
        for symbol in md1.keys():
            
            keys = list(md1[symbol].keys())
            df1 = md1[symbol][keys[0]]
            df2 = md1[symbol][keys[1]]
            
            print(symbol, keys)
            print(df1)
            print(df2)
            
            
        
        return None, None, None, None
        

Although each record is downloaded only once, repeated download requests can fragment the data and metadata into many small segments, and this fragmentation is hidden from the user. The fragments could be compacted into coherent segments, but solving this problem was out of the scope of this task.
<br />
A further possible improvement to the DataManager is to store the SP400 symbols in a database and refresh them from Wikipedia only when needed. This is out of the current scope too.
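The central step of `getData`, finding the parts of the requested interval that are not yet covered by stored CSV files, and the compaction mentioned above can both be illustrated with plain interval arithmetic. The sketch below is a hypothetical stand-alone version: `merge_intervals` and `missing_intervals` are illustrative names, not the private `__calculateIntervalsToDownload` method (whose body is not shown here), and it assumes `(start, end)` tuples of `datetime` values where touching intervals count as contiguous.

```python
from datetime import datetime


def merge_intervals(intervals):
    """Compacts overlapping or touching (start, end) intervals into sorted, disjoint ones."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            # overlaps or touches the previous interval: extend it
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def missing_intervals(requested, stored):
    """Returns the parts of the requested (start, end) interval not covered by the stored intervals."""
    req_start, req_end = requested
    gaps = []
    cursor = req_start  # everything before the cursor is already covered or reported
    for start, end in merge_intervals(stored):
        if end <= cursor or start >= req_end:
            continue  # stored interval lies outside the still uncovered part
        if start > cursor:
            gaps.append((cursor, start))
        cursor = max(cursor, end)
    if cursor < req_end:
        gaps.append((cursor, req_end))
    return gaps


# usage: a year requested, three already stored fragments (two of them touching)
year = (datetime(2016, 1, 1), datetime(2016, 12, 1))
stored = [(datetime(2016, 2, 1), datetime(2016, 3, 1)),
          (datetime(2016, 3, 1), datetime(2016, 4, 1)),
          (datetime(2016, 6, 1), datetime(2016, 7, 1))]
to_download = missing_intervals(year, stored)
```

Merging first means a single pass over the sorted intervals is enough to find the gaps; the same `merge_intervals` step is what a metadata compaction would apply to the stored intervals of one (symbol, datasource) pair.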

## Downloading data

In [None]:
# interval border constants
DATA_START          = datetime(2016,1,1)
DATA_END            = datetime(2017,1,1)
VARIANCE_START      = datetime(2016,2,11)
VARIANCE_END        = datetime(2016,11,8)
MINMAX_START        = datetime(2016,1,18)    # inclusive
MINMAX_END          = datetime(2016,10,18)   # exclusive
STD_START           = datetime(2016,4,17)
STD_END             = datetime(2016,12,5)
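
The comments on `MINMAX_START` and `MINMAX_END` describe a half-open interval (start kept, end dropped). The date filtering performed inside `getData` is not shown in this section, so the following is only a hypothetical illustration of how such boundary flags can be applied to a dataframe with a `Date` column; `filter_by_interval` and its parameter names are assumptions.

```python
import pandas as pd
from datetime import datetime


def filter_by_interval(df, start, end, start_inclusive=True, end_inclusive=False):
    """Keeps the rows whose 'Date' falls into the interval; the flags decide whether
    the boundary days themselves are kept (default: half-open [start, end))."""
    dates = pd.to_datetime(df["Date"])
    lower = (dates >= start) if start_inclusive else (dates > start)
    upper = (dates <= end) if end_inclusive else (dates < end)
    return df[lower & upper]


# usage on a few made-up rows around the MINMAX boundaries
sample = pd.DataFrame({
    "Date": ["2016-01-17", "2016-01-18", "2016-10-17", "2016-10-18", "2016-10-19"],
    "Close": [1, 2, 3, 4, 5],
})
half_open = filter_by_interval(sample, datetime(2016, 1, 18), datetime(2016, 10, 18))
closed = filter_by_interval(sample, datetime(2016, 1, 18), datetime(2016, 10, 18), end_inclusive=True)
```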


In [None]:
listOfSymbols = getSP400ListFromWikipedia()
truncList = listOfSymbols[3:6]

gfda = GoogleFinanceDownloadAdapter()
yfda = YahooFinanceDownloadAdapter()

dm = DataManager([gfda, yfda])
_, _, _, multiDict2 = dm.getData(truncList, (DATA_START, DATA_END))


## Calculating data fingerprints

There are several ways to calculate the data fingerprints.<br/>
- One solution is to iterate over the datasources, then over the possible days, query the symbol tables for matching records, calculate the (5-dimensional) value vector (the __Open__, __High__, __Low__, __Close__, __Volume__ aggregates) and save each calculated value into a new dataframe/CSV file.
- Another way is to concatenate the symbol tables, add new columns indicating the __Symbol__ and __Datasource__ values, then calculate the aggregations with a __group-by(Datasource, Symbol, Date)__ grouping.

Since the tables contain very few records (one year of daily data), the CSV files are typically smaller than 100 kB, so keeping approximately 2 × 400 × 100 kB ≈ 80 MB in memory should not be a problem for an average personal computer. The problem does not require big data architectures or algorithms, which is why a mixture of the above solutions was selected for implementation.
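A minimal sketch of the concatenate-then-group-by approach on toy data (the symbols and values are made up) shows why one aggregation pass is enough. Restricting the aggregation to the numeric columns keeps the added `Symbol` column out of it; the last line only spells out the memory estimate from the paragraph above.

```python
import pandas as pd

# toy per-symbol tables (hypothetical values) as they might come from one datasource
tables = {
    "AAA": pd.DataFrame({"Date": ["2016-01-04", "2016-01-05"], "Close": [10.0, 11.0], "Volume": [100, 200]}),
    "BBB": pd.DataFrame({"Date": ["2016-01-04", "2016-01-05"], "Close": [20.0, 23.0], "Volume": [300, 400]}),
}

# tag each table with its symbol, then stack them into one long table
long_table = pd.concat(
    [table.assign(Symbol=symbol) for symbol, table in tables.items()],
    ignore_index=True,
)

# one aggregation pass: the mean of each numeric column over all symbols for every day
daily_mean = long_table.groupby("Date")[["Close", "Volume"]].mean()

# rough memory bound: 2 datasources x 400 symbols x 100 kB per CSV file
estimated_kb = 2 * 400 * 100  # 80,000 kB, i.e. roughly 80 MB
```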

In [None]:
import numpy as np

# function for adding a new column to a dataframe
def addColumnToDataframe(dataframe, columnName, columnContent):
    dataframe[columnName] = columnContent

# adds a 'Symbol' column to the dataframe, filled with the symbol's name
def fillDataframeWithSymbolName(dataframe, symbol):
    addColumnToDataframe(dataframe, "Symbol", np.full(dataframe.shape[0], symbol, dtype=object))


# getting data for fingerprint calculation by using the intervals
_, _, _, dataSetForMeanCalculation   = dm.getData(truncList, (DATA_START, DATA_END))
_, _, _, dataSetForVarCalculation    = dm.getData(truncList, (VARIANCE_START, VARIANCE_END))
_, _, _, dataSetForMinMaxCalculation = dm.getData(truncList, (MINMAX_START, MINMAX_END),     endInclusiveFilter=True)
_, _, _, dataSetForStdCalculation    = dm.getData(truncList, (STD_START, STD_END),           endInclusiveFilter=True)


In [None]:
import functools
import os

import pandas as pd
from IPython.display import display


# decorator executing the same pre- and post-processing steps around each calculation type
def prepareDataAndProcessResults(fileNamePart):
    
    def decorator_func(calculationToExecute):
        
        @functools.wraps(calculationToExecute)
        def wrapper_func(datasource, datasetsBySource):

            # dictionary of {symbol: dataframe} belonging to the given datasource
            dataframesBySymbol = datasetsBySource[datasource]
            symbols = list(dataframesBySymbol.keys())
            
            # adding a 'Symbol' column to each dataframe (modifies them in place)
            for symbol in symbols:
                fillDataframeWithSymbolName(dataframesBySymbol[symbol], symbol)

            # creating one concatenated dataframe and removing duplicates by the (Date, Symbol) key
            dataframe = pd.concat([dataframesBySymbol[symbol] for symbol in symbols])
            InputPreprocessor.cleanDataframe(dataframe, uniqueColNameList = ["Date", "Symbol"])
            
            # calculating the result table
            retDf = calculationToExecute(dataframe)
            
            # writing the results into a CSV file and displaying them
            fileName = "%s__%s.csv" % (datasource, fileNamePart)
            filePath = os.path.join(FINGERPRINTS_DIRECTORY_NAME, fileName)
            retDf.to_csv(filePath, sep=',', encoding='utf-8')
            
            with pd.option_context('display.max_rows', None):
                print(fileName)
                display(retDf)
            
            return retDf
            
        return wrapper_func
    
    return decorator_func


# defining calculation types

@prepareDataAndProcessResults(fileNamePart = "mean")
def calculateMean(dataframe):
    return dataframe.groupby( [ "Date"] ).mean()


@prepareDataAndProcessResults(fileNamePart = "std")
def calculateStd(dataframe):
    return dataframe.groupby( [ "Date"] ).std()


@prepareDataAndProcessResults(fileNamePart = "lowest_highest")
def calculateMinMax(dataframe):
    df = dataframe.groupby( [ "Symbol"] )["Close"].agg(['min', 'max'])
    df.columns = ["lowest_close", "highest_close"]
    df.index.name = "ticker"
    df.sort_index(inplace=True)
    return df


@prepareDataAndProcessResults(fileNamePart = "var")
def calculateVar(dataframe):
    return dataframe.groupby( [ "Date"] ).var()


# iterating over the datasets returned by the data layer
def calculateFingerprints(dict_dataset, calculateFingerprint):

    # iterating over datasources
    for datasource in dict_dataset.keys():
        
        calculateFingerprint(datasource, dict_dataset)


# calculating the mean, standard deviation and variance over all ticker symbols for each day, and the lowest/highest Close price of each ticker
calculateFingerprints(dataSetForMeanCalculation,    calculateMean)
calculateFingerprints(dataSetForStdCalculation,     calculateStd)
calculateFingerprints(dataSetForMinMaxCalculation,  calculateMinMax)
calculateFingerprints(dataSetForVarCalculation,     calculateVar)


In the solution above, the code iterates over the datasources and adds a new column with the symbol names to each dataframe.<br />
Four aggregation methods were defined (for calculating the __mean__, __std__, __variance__ and __minmax__ values), each wrapped with a Python decorator that runs auxiliary operations before and after the concrete calculation. These supplementary operations are: creating one big concatenated table, cleaning it by using two keys (_Date_ and _Symbol_) and, after the computation, writing the results into CSV files.

At this point, as a side effect of the calculations above, each pandas dataframe (used below for computing the datasource difference) already has an extra column containing the symbol values.

In [None]:
def computeStdOfDatasourceDifference(df1, df2, fileName):
    """ Calculates the per-day standard deviation of the difference between two datasources
    and writes it into a CSV file """
    
    # concatenating the per-symbol dataframes of each datasource
    concatenated_dataframe_google = pd.concat([df1[symbol] for symbol in df1.keys()])
    concatenated_dataframe_yahoo  = pd.concat([df2[symbol] for symbol in df2.keys()])
    
    # cleaning data
    InputPreprocessor.cleanDataframe(concatenated_dataframe_google, uniqueColNameList = ["Date", "Symbol"])
    InputPreprocessor.cleanDataframe(concatenated_dataframe_yahoo,  uniqueColNameList = ["Date", "Symbol"])
    
    # composite (Date, Symbol) index, so that the subtraction aligns matching rows
    concatenated_dataframe_google.set_index(['Date', 'Symbol'], inplace=True)
    concatenated_dataframe_yahoo.set_index(['Date', 'Symbol'], inplace=True)
    
    # element-wise difference of the two tables (rows missing from either side become NaN)
    subtracted = concatenated_dataframe_google.subtract(concatenated_dataframe_yahoo)
    
    # compute the STD over the subtracted table
    retDf = subtracted.groupby( [ "Date"] ).std()
    
    # write the results into CSV files
    filePath = os.path.join(FINGERPRINTS_DIRECTORY_NAME, fileName)
    retDf.to_csv(filePath, sep=',', encoding='utf-8')
    
    with pd.option_context('display.max_rows', None):
        print(fileName)
        display(retDf)


In [None]:
# computing the deviation of the difference between the datasources
computeStdOfDatasourceDifference(dataSetForStdCalculation['google'], \
                                 dataSetForStdCalculation['yahoo'], \
                                 'google_yahoo__comparison.csv')


In the above solution, two big tables containing all OHLCV data of the two datasources were generated, cleaned and reindexed, and duplicates (identified by the key pair _Date_ and _Symbol_) were filtered out.<br />
A third table with the element-wise difference of the two tables was then calculated (keyed by the original _Date_ and _Symbol_ columns).<br />Finally, an aggregation (a __std__ calculation with a __group-by__ on the _Date_ column) was applied to this difference table.
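The alignment behaviour that the difference calculation relies on can be checked on a toy example (the values are hypothetical): `subtract` matches rows by the `(Date, Symbol)` index, a key present in only one datasource yields `NaN`, and `std` skips `NaN` values.

```python
import pandas as pd

# toy Close prices (hypothetical values) of two datasources for the same symbols
google = pd.DataFrame({
    "Date": ["2016-01-04", "2016-01-04", "2016-01-05", "2016-01-05"],
    "Symbol": ["AAA", "BBB", "AAA", "BBB"],
    "Close": [10.0, 20.0, 11.0, 21.0],
}).set_index(["Date", "Symbol"])
yahoo = pd.DataFrame({
    "Date": ["2016-01-04", "2016-01-04", "2016-01-05"],
    "Symbol": ["AAA", "BBB", "AAA"],
    "Close": [9.0, 21.0, 11.0],
}).set_index(["Date", "Symbol"])

# subtraction aligns rows on the (Date, Symbol) index; ("2016-01-05", "BBB") has no partner
difference = google.subtract(yahoo)

# per-day standard deviation of the differences; NaN entries are skipped
daily_std = difference.groupby(level="Date").std()
```

With pandas' default `ddof=1`, a day with only one non-missing difference produces `NaN` rather than `0`, which is worth keeping in mind when reading the comparison CSV.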

## Merging the data

There are several ways to consolidate the data of the two datasources, but almost every solution consists of these substeps:
- Identify missing values
- Identify outliers
- Calculate approximations for consolidated data (while considering the missing and outlier values)
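For the first substep, identifying missing values, an outer merge with pandas' `indicator=True` option is one simple way to see which dates are present in only one datasource. This is a sketch on made-up data, not necessarily how the notebook does it.

```python
import pandas as pd

# toy Close series (hypothetical values) of the same symbol from the two datasources
google = pd.DataFrame({"Date": ["2016-01-04", "2016-01-05", "2016-01-06"], "Close": [10.0, 10.5, 10.2]})
yahoo = pd.DataFrame({"Date": ["2016-01-04", "2016-01-06", "2016-01-07"], "Close": [10.1, 10.3, 10.4]})

# an outer merge keeps every date seen in either source; '_merge' records where each row came from
merged = google.merge(yahoo, on="Date", how="outer", suffixes=("_google", "_yahoo"), indicator=True)

# dates present in only one of the datasources
missing_in_yahoo = merged.loc[merged["_merge"] == "left_only", "Date"].tolist()
missing_in_google = merged.loc[merged["_merge"] == "right_only", "Date"].tolist()
```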

In [None]:
intervalExtension = 5
dm.getConsolidatedData(truncList, (DATA_START, DATA_END), intervalExtension = intervalExtension)


The implemented solution does the following:
- Flags the outlier records by using the __3-sigma rule__ on the pricing and volume data
  - Since each pricing column (_Open_, _High_, _Low_, _Close_) is derived from the same price time series by an aggregation operator, it is reasonable to extract and concatenate the values of all ___pricing data columns___ and the ___volume___ column and apply one-dimensional filtering to them, instead of filtering each of the _Open_, _High_, _Low_, _Close_ columns separately
    - Filtering in the 2-dimensional space of the _pricing_ and _volume_ data would probably be a better but more complex solution
  - Any other outlier detection mechanism could be used here instead of the 3-sigma rule. A clustering algorithm in the 2-dimensional space (_pricing data_ and _volume data_) would probably be more effective, but it would require much more experimentation and is probably out of the scope of this demonstrative application
- Creates a merged table from both datasources for the given symbol, containing every pricing and volume column of the original tables, indexed by _Date_
- Uses the __weighted K-Nearest Neighbors regression__ algorithm to approximate the real value
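A minimal sketch of the 3-sigma flagging described above. It is not the `InputPreprocessor.filterOutliers` implementation (which is not shown here); the function names are hypothetical, and it assumes that the pooled pricing values and the `Volume` column are filtered separately, because prices and volumes live on very different scales. A row is flagged if any of its values lies more than three (population) standard deviations from the mean of its pool.

```python
import numpy as np
import pandas as pd

PRICE_COLUMNS = ["Open", "High", "Low", "Close"]


def three_sigma_mask(values, n_sigma=3.0):
    """Boolean mask of values farther than n_sigma population standard deviations from the mean."""
    values = np.asarray(values, dtype=float)
    mean, std = np.nanmean(values), np.nanstd(values)
    if std == 0 or np.isnan(std):
        return np.zeros(values.shape, dtype=bool)  # constant or empty pool: nothing to flag
    return np.abs(values - mean) > n_sigma * std


def flag_outlier_rows(df, n_sigma=3.0):
    """Flags a row if one of its prices (judged against all pooled pricing values)
    or its Volume (judged against the Volume column alone) violates the rule.
    Assumption: pricing and volume pools are filtered separately."""
    pooled = df[PRICE_COLUMNS].values.ravel()  # row-major: 4 consecutive values per row
    price_flags = three_sigma_mask(pooled, n_sigma).reshape(len(df), len(PRICE_COLUMNS))
    volume_flags = three_sigma_mask(df["Volume"], n_sigma)
    return pd.Series(price_flags.any(axis=1) | volume_flags, index=df.index)


# usage on made-up data: one price spike and one volume spike
n = 30
sample = pd.DataFrame({c: np.full(n, 10.0) for c in PRICE_COLUMNS})
sample["Volume"] = np.full(n, 1000.0)
sample.loc[10, "Close"] = 100.0
sample.loc[5, "Volume"] = 100000.0
flags = flag_outlier_rows(sample)
```

One property of the rule worth knowing: with 10 or fewer values in a pool, no value can lie more than three population standard deviations from the mean, so short series never produce flags.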

In [None]:
df = dataSetForMinMaxCalculation['google']['ACIW']
r, f, o = InputPreprocessor.filterOutliers(df)
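
After the outliers are removed, the last step in the list above, weighted K-Nearest Neighbors regression, can be sketched in one dimension: the position (for example the day number) is the only feature, the trusted observations of both datasources are pooled, and the k nearest ones are averaged with inverse-distance weights. The function name, the choice of feature and the default `k` are assumptions for illustration, not the notebook's implementation.

```python
import numpy as np


def weighted_knn_estimate(x_known, y_known, x_query, k=4):
    """Distance-weighted k-nearest-neighbour regression in one dimension.

    x_known: positions of the trusted observations (e.g. day numbers, both datasources pooled)
    y_known: observed values at those positions (e.g. Close prices)
    x_query: position where the consolidated value is estimated
    """
    x_known = np.asarray(x_known, dtype=float)
    y_known = np.asarray(y_known, dtype=float)
    distances = np.abs(x_known - x_query)
    nearest = np.argsort(distances, kind="mergesort")[:k]  # stable sort keeps ties in input order
    d, y = distances[nearest], y_known[nearest]
    exact = d == 0
    if exact.any():
        # observations exactly at the query position dominate: average them
        return float(y[exact].mean())
    weights = 1.0 / d  # inverse-distance weights
    return float(np.sum(weights * y) / np.sum(weights))


# usage: estimate a missing day 2 from the neighbouring days
estimate = weighted_knn_estimate([0, 1, 3, 4], [10.0, 12.0, 16.0, 18.0], 2)
```

Handling zero distances separately avoids a division by zero and means that, when both datasources report a value for the queried day, their average is used directly.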