## Dataset Pipeline

This code takes financial market data files and runs them through a processing pipeline. The following operations are carried out:

- Localise the time data to market time
- Merge with existing dataset based on datetime
- Crop data
- Resample data
- Normalise data 
- Encode category data
- Concatenate additional columns
- Shuffle data
- Split into Training/Validation/Test sets
- Visualise data
- Save the results to HDF5
- Save the results to CSV

The pipeline functions are included below; they are also part of the quantutils library provided in the GitHub repository where this notebook lives.

# Note: This notebook should be run on your local filesystem (it reads `datasets/config.json` from the working directory and writes its outputs to local disk)

In [None]:
## 
## Merge data
##

def merge(newData, existingData):
    """Combine newly loaded rows with the data already held.

    Uses ``existingData.combine_first(newData)``: where both frames have a
    value for the same index/column, the value from ``existingData`` is kept;
    gaps in ``existingData`` (missing rows, columns or NaNs) are filled from
    ``newData``.
    """
    print("Merging data...")
    combined = existingData.combine_first(newData)
    return combined

In [None]:
##
## Shuffle data
##
def shuffle(data, random_state=None):
    """Return the rows of ``data`` in random order with a fresh 0..n-1 index.

    Parameters
    ----------
    data : pandas.DataFrame or pandas.Series
        Rows to shuffle; the original index is discarded.
    random_state : int, numpy.random.RandomState or None, optional
        Forwarded to ``DataFrame.sample``. Pass a fixed seed to make the
        shuffle (and therefore any train/val/test split taken from it)
        reproducible. ``None`` (default) keeps the previous,
        non-deterministic behaviour.

    Returns
    -------
    Same type as ``data``, containing every row exactly once.
    """
    # frac=1 samples all rows without replacement, i.e. a permutation.
    return data.sample(frac=1, random_state=random_state).reset_index(drop=True)

In [None]:
##
## Concatenate Columns
##

def concat(data1, data2):
    """Place the columns of ``data2`` to the right of ``data1``.

    Rows are paired purely by position (the underlying ``.values`` arrays are
    joined), so both inputs must have the same number of rows. The result
    gets a fresh default index and integer column labels.
    """
    print("Concatenating features %s with classifications %s" % (data1.shape, data2.shape))
    stacked = numpy.concatenate((data1.values, data2.values), axis=1)
    return pandas.DataFrame(stacked)

In [None]:
##
## Crop
##

def cropDate(data, start, end):
    """Return ``data[start:end]``.

    For date strings on a DatetimeIndex this is pandas label-based slicing
    (presumably how the configured crop dates are used -- confirm against
    config).
    """
    return data[start:end]

def cropTime(data, start, end):
    """Keep rows whose time of day lies in the half-open window [start, end).

    ``data`` must have a DatetimeIndex. ``start`` and ``end`` are anything
    ``DataFrame.between_time`` accepts (e.g. "09:00").
    """
    try:
        # pandas >= 1.4 spelling; include_start/include_end were removed in 2.0.
        return data.between_time(start, end, inclusive="left")
    except TypeError:
        # Older pandas does not know ``inclusive``; use the legacy keywords.
        return data.between_time(start, end, include_start=True, include_end=False)


In [None]:
##
## Resample
##

def resample(data, sample_unit):
    """Aggregate OHLC rows into ``sample_unit`` periods (a pandas offset alias).

    Expects exactly the columns Open, High, Low and Close (in any order); the
    output keeps the input's column order. Open/Close use pandas' 'first' /
    'last' aggregations, while High/Low use ``skipna=False`` so that a NaN
    anywhere in a period makes that period's High/Low NaN. Periods with no
    rows at all come out as NaN.
    """
    print("Resampling to %s periods" % sample_unit)
    column_order = data.columns

    def _strict_max(values):
        return values.max(skipna=False)

    def _strict_min(values):
        return values.min(skipna=False)

    rules = {
        'Open': 'first',
        'High': _strict_max,
        'Low': _strict_min,
        'Close': 'last',
    }
    bars = data.resample(sample_unit).agg(rules)
    return bars[column_order]


In [None]:
##
## Remove Missing Data (NaN)
##

def removeNaNs(data):
    """Drop every row that contains at least one missing value; index is kept."""
    cleaned = data.dropna(axis=0, how="any")
    return cleaned

In [None]:
##
## Convert to Feature Matrix
##

def toFeatureSet(data, feature_periods):
    """Flatten every ``feature_periods`` consecutive rows into one wide row.

    Row i of the result is rows [i*feature_periods, (i+1)*feature_periods)
    of ``data`` laid end to end, so it has
    ``data.shape[1] * feature_periods`` columns. Trailing rows that do not
    fill a complete group are dropped.
    """
    n = data.values.shape[1] * feature_periods
    return pandas.DataFrame(reshape(data, n))

def reshape(ts, n):
    """Reshape ``ts.values`` into rows of length ``n``, dropping any incomplete tail.

    Floor division (``//``) is used throughout so the slice bound and target
    shape stay integers on Python 3 as well as Python 2 (where ``/`` on ints
    already floored).
    """
    values = ts.values
    rows = values.size // n
    # Number of source rows that make up `rows` complete output rows.
    usable = rows * n // values.shape[1]
    return numpy.reshape(values[0:usable], (rows, n))

In [None]:
##
## Encode classification
##

def encode(data, encoding):
    """Label each row by comparing its last column with its first column.

    Parameters
    ----------
    data : pandas.DataFrame
        One row per sample. Only the first and last columns are compared.
        (Presumably the first Open and last Close of a flattened window --
        confirm against the caller.)
    encoding : str
        "binary"  -> one column: 1.0 if last > first, else 0.0.
        "one-hot" -> two columns: [last > first, first > last] as floats.
        A flat row (last == first) encodes as [0, 0] in one-hot mode.

    Returns
    -------
    pandas.DataFrame with a fresh 0..n-1 index. Rows whose input contained
    any NaN are entirely NaN.

    Raises
    ------
    ValueError
        If ``encoding`` is not "binary" or "one-hot".
    """
    values = data.values
    # Plain boolean array so the mask is applied by position, not by index label
    # (the output frame below has a fresh RangeIndex unrelated to data.index).
    nanIndex = data.isnull().any(axis=1).values
    up = (values[:, -1] > values[:, 0]).astype(float)
    if encoding == "binary":
        df = pandas.DataFrame(up)
    elif encoding == "one-hot":
        down = (values[:, 0] > values[:, -1]).astype(float)
        df = pandas.DataFrame(numpy.column_stack([up, down]))
    else:
        raise ValueError("Unknown encoding %r; expected 'binary' or 'one-hot'" % (encoding,))
    df.iloc[nanIndex] = numpy.nan
    return df


In [None]:
##
## Convert to local time zone
##
import pytz
def localize(data, datasource, dataset, source_timezone="Europe/London"):
    """Interpret the naive index as ``source_timezone`` time and convert it.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame with a naive DatetimeIndex. Its index is replaced in place and
        the same object is returned.
    datasource : dict
        ``datasource["timezone"]`` is the zone the index is converted to.
    dataset : dict
        Only ``dataset["name"]`` is read (for the log line).
    source_timezone : str, optional
        Zone the raw timestamps are recorded in. Defaults to "Europe/London",
        the value that was previously hard-coded.

    NOTE(review): ``dataset["timezone"]`` is not consulted; confirm the target
    zone should be the datasource's rather than the dataset's.
    """
    print("Converting " + dataset["name"] + " from " + source_timezone
          + " to " + datasource["timezone"])
    # pandas accepts IANA zone names directly, so no pytz wrapper is needed.
    data.index = data.index.tz_localize(source_timezone).tz_convert(datasource["timezone"])
    return data

In [None]:
##
## Normalise (Candlesticks)
##
def normaliseCandlesticks(data):
    """Min-max scale each row independently and append its relative range.

    Every row is mapped so its minimum becomes 0 and its maximum 1. One extra
    final column holds that row's (max - min) divided by the largest such
    range across all rows (NaNs ignored for that maximum). A row containing
    NaN, or with max == min, yields NaN values (0/0).
    """
    raw = data.values
    row_hi = raw.max(axis=1, keepdims=True)
    row_lo = raw.min(axis=1, keepdims=True)
    spread = row_hi - row_lo
    scaled = (raw - row_lo) / spread
    relative_spread = spread / numpy.nanmax(spread)
    return pandas.DataFrame(numpy.concatenate((scaled, relative_spread), axis=1))

In [None]:
##
## Split (Train/Val/Test)
##
def split(data, train=.6, val=.2, test=.2):
    """Split ``data`` in row order into [train, validation, test] pieces.

    Row i goes to a piece according to its relative position i / len(data):
    below ``train`` -> train; below ``train + val`` -> validation; otherwise
    -> test. ``test`` is accepted for readability but not used: the test
    piece is simply whatever remains. No shuffling is done here.
    """
    total = len(data)
    position = numpy.arange(0, total) / float(total)
    val_end = train + val
    first = data[position < train]
    second = data[(position >= train) & (position < val_end)]
    third = data[position >= val_end]
    return [first, second, third]

In [None]:
##
## Merge New Data
##

def mergeNewData(data, datasource, dataset, SRC_path):
    """Load each raw file for ``dataset`` in ``SRC_path``, localise it and merge it into ``data``.

    Files are visited in sorted name order. ``merge`` keeps values already
    present in the accumulated frame, so on overlapping timestamps the
    previously stored data, then earlier-processed files, take precedence.
    Sorting makes that outcome independent of the filesystem's listing order.
    Files that ``loadRawData`` rejects (it returns None) are skipped.
    """
    for infile in sorted(os.listdir(SRC_path)):
        newData = loadRawData(datasource, dataset, SRC_path, infile)
        if newData is None:
            continue

        ### RAW PIPELINE #############################################
        newData = localize(newData, datasource, dataset)
        data = merge(newData, data)
        ##############################################################

    return data
    

def loadRawData(datasource, dataset, srcPath, infile):
    """Read ``infile`` from ``srcPath`` as CSV if it belongs to ``dataset``.

    A file belongs to the dataset when its name starts with
    ``dataset["name"]`` (case-insensitive). Otherwise None is returned.
    CSV parsing options come from ``datasource["index_col"]``,
    ``datasource["parse_dates"]`` and ``datasource["dayfirst"]``.
    """
    if not infile.lower().startswith(dataset["name"].lower()):
        return None

    print("Adding " + infile + " to " + dataset["name"] + " table")

    ## Load RAW data (assume CSV)
    # os.path.join works whether or not srcPath ends with a separator.
    return pandas.read_csv(os.path.join(srcPath, infile),
                           index_col=datasource["index_col"],
                           parse_dates=datasource["parse_dates"],
                           dayfirst=datasource["dayfirst"])


In [None]:
##
## Save data
##
def save_hdf(data, dataset, hdfStore):
    """Store ``data`` in ``hdfStore`` under key ``dataset["name"]`` (table format).

    Returns ``data`` unchanged so the call can sit inline in a pipeline.
    """
    key = dataset["name"]
    hdfStore.put(key, data, format='table')
    print("Saved data to HDFStore: /" + key)
    return data

def save_csv(data, filename):
    """Overwrite ``filename`` with ``data`` as CSV, without header row or index column.

    Returns ``data`` unchanged.
    """
    data.to_csv(filename, mode="w", header=False, index=False)
    print("Saved data to " + filename)
    return data

In [None]:
import plotly
import plotly.offline as py
from plotly.tools import FigureFactory as FF
#from plotly import tools as pt
from plotly.graph_objs import *

plotly.offline.init_notebook_mode() # run at the start of every ipython notebook

##
## Visualise
##
def visualise(data, periods, count):
    """Plot the candlesticks from the first ``count`` rows of ``data`` inline.

    Each row is read as ``periods`` consecutive groups of 4 values, taken from
    its first ``periods * 4`` columns and passed to the candlestick factory as
    (open, high, low, close). Any further columns are ignored. The rows'
    candles are drawn one after another in a single chart.
    """
    window = data.values[:count, :periods * 4]
    candles = window.ravel().reshape(-1, 4)
    opens, highs, lows, closes = candles.T
    fig = FF.create_candlestick(opens, highs, lows, closes)
    py.iplot(fig, filename='jupyter/simple-candlestick', validate=True)
    

In [None]:
## Set up the environment
import os
import json
import pickle
import pandas
import numpy

CONFIG_FILE = "datasets/config.json"

with open(CONFIG_FILE) as data_file:
    config = json.load(data_file)

DS = config["datasources"]
FEATURES = config["features"]
CLASS = config["class"]

## Loop over datasources...

for datasource in DS:

    DS_path = config["dataPath"] + datasource["name"] + "/"
    SRC_path = DS_path + "raw/"

    # Create folder structure
    if not os.path.exists(SRC_path):
        os.makedirs(SRC_path)

    # Get HDFStore
    hdfFile = DS_path + datasource["name"] + ".hdf"
    hdfStore = pandas.HDFStore(hdfFile)

    # try/finally so the HDF5 file handle is released even if a pipeline
    # step (loading, resampling, saving, plotting) raises.
    try:
        for dataset in datasource["datasets"]:

            # Load Dataframe from store
            if dataset["name"] in hdfStore:
                storedData = hdfStore[dataset["name"]]
            else:
                storedData = pandas.DataFrame()

            ### PIPELINE ###################

            ## Resample all to dataset sample unit (to introduce nans in all missing periods)
            ## Resample feature units
            ## Crop on time for feature
            ## Resample class units
            ## Crop on time for class
            ## Convert to Feature Sets
            ## Encode the class
            ## Concat the two

            storedData = mergeNewData(storedData, datasource, dataset, SRC_path)

            data = cropDate(storedData, dataset["crop"]["start"], dataset["crop"]["end"])

            ## Clean selected data set

            data = resample(data, dataset["sample_unit"])

            # Feature rows: window of FEATURES["periods"] bars, normalised per row.
            featureData = resample(data, FEATURES["sample_unit"])
            featureData = cropTime(featureData, FEATURES["start_time"], FEATURES["end_time"])
            featureData = toFeatureSet(featureData, FEATURES["periods"])
            featureData = normaliseCandlesticks(featureData)

            # Class rows: window of CLASS["periods"] bars, encoded as labels.
            classData = resample(data, CLASS["sample_unit"])
            classData = cropTime(classData, CLASS["start_time"], CLASS["end_time"])
            classData = toFeatureSet(classData, CLASS["periods"])
            classData = encode(classData, CLASS["encoding"])

            # Feature and class rows are paired by position.
            csvData = concat(featureData, classData)
            csvData = removeNaNs(csvData)
            csvData = shuffle(csvData)

            csvData_train, csvData_val, csvData_test = split(csvData, train=.6, val=.2, test=.2)

            visualise(csvData_train, FEATURES["periods"], 5)

            ########################################

            ## Save output
            save_hdf(storedData, dataset, hdfStore)
            save_csv(csvData, DS_path + dataset["name"] + ".csv")
            save_csv(csvData_train, DS_path + dataset["name"] + "_train.csv")
            save_csv(csvData_val, DS_path + dataset["name"] + "_val.csv")
            save_csv(csvData_test, DS_path + dataset["name"] + "_test.csv")
    finally:
        hdfStore.close()
