### Pipeline ID : marketdirection

### Input Description

Raw OHLC (open, high, low, close) market data.

### Output Features 

A dataset of normalised OHLC bars covering X time periods.

### Output Classes

A binary class label indicating whether the market moved cumulatively up (1) or down (0) over the Y time periods.

### Operations

This code takes a financial market data file and runs it through a processing pipeline. The following operations are carried out:

- Localise the time data to market time
- Merge with existing RAW data based on datetime
- Crop data
- Resample data
- Normalise data 
- Encode class data
- Concatenate additional columns
- Shuffle data (removed)
- Add Bias
- Split into Training/Validation/Test set (removed)
- Visualise data
- Save the resulting RAW data to HDF5
- Save the resulting datasets to CSV
- Save the resulting datasets to Cloud Object Storage
- Post the resulting datasets to the Market Insights API (todo)

In [1]:
%%writefile functions/marketdirection.py

import json
import pandas
import quantutils.dataset.pipeline as ppl

def executePipeline(args):
    """Turn raw OHLC market data into a labelled, time-sorted dataset.

    Pipeline: resample -> localise -> crop on time -> convert to feature
    sets, done once for the features (then normalised as candlesticks) and
    once for the labels (then encoded). The two are concatenated and
    indexed by the label timestamps.

    Parameters
    ----------
    args : dict
        ``args["data"]`` is a JSON-compatible object in pandas' ``split``
        orientation (as produced by ``DataFrame.to_json(orient='split')``).
        ``args["dataset"]`` is the pipeline description. It is read for
        ``timezone`` and for ``features`` / ``labels`` sub-dicts, each with
        ``sample_unit``, ``start_time``, ``end_time`` and ``periods``.
        ``labels`` additionally has ``encoding``.

    Returns
    -------
    dict
        The resulting DataFrame as a ``split``-oriented JSON object with
        ISO-formatted dates.
    """
    raw = pandas.read_json(json.dumps(args["data"]), orient='split')
    desc = args["dataset"]
    feat_spec = desc["features"]
    label_spec = desc["labels"]

    # Resampling to the dataset sample unit introduces NaNs for every missing
    # period; timestamps are then localised from UTC to the dataset timezone.
    feat = ppl.localize(ppl.resample(raw, feat_spec["sample_unit"]), "UTC", desc["timezone"])

    print("Features..")
    feat = ppl.cropTime(feat, feat_spec["start_time"], feat_spec["end_time"])
    feat = ppl.normaliseCandlesticks(ppl.toFeatureSet(feat, feat_spec["periods"]), allowZero=True)

    print("Labels..")
    lab = ppl.localize(ppl.resample(raw, label_spec["sample_unit"]), "UTC", desc["timezone"])
    lab = ppl.cropTime(lab, label_spec["start_time"], label_spec["end_time"])
    lab = ppl.encode(ppl.toFeatureSet(lab, label_spec["periods"]), label_spec["encoding"])

    # 23/01/18 - Index each row by the predicted time (the label index).
    combined = ppl.concat(feat, lab)
    combined.index = lab.index

    # 24/01/18 - One dataset per asset; interleaving assets is left to training time.
    # 13/07 - Sorted rather than shuffled, to avoid a) bias due to correlation
    # between markets and b) bias due to lookahead during training.
    # 26/09 - Train/validation/test splitting happens at training time.
    result = ppl.removeNaNs(combined).sort_index()

    return json.loads(result.to_json(orient='split', date_format="iso"))

Overwriting functions/marketdirection.py


In [2]:
def loadMarketData(market_list, sample_unit):
    """Load, crop and resample each market's raw data from its HDF5 stores.

    Parameters
    ----------
    market_list : list of dict
        Each entry has a ``name`` and a list of ``datasources``. Each
        datasource supplies:

        - ``source``: sub-directory and file stem under ``CONFIG["dataPath"]``
        - ``name``: key inside the HDF store
        - ``crop``: a dict with ``start`` / ``end``
        - ``sample_unit``: its own sample unit
    sample_unit
        Target sample unit (as accepted by ``ppl.resample``) that every
        datasource is resampled to before merging.

    Returns
    -------
    dict
        Market name -> DataFrame combined (via ``ppl.merge``) from all of
        that market's datasources. A market with no datasources gets no entry.

    Notes
    -----
    Reads the module-level ``CONFIG["dataPath"]``.
    """
    markets = dict()

    # TODO : Should this be behind an API? Pipeline shouldn't care about datasource, only market and maybe sample unit.
    for market in market_list:
        name = market["name"]

        for datasource in market["datasources"]:

            DS_path = CONFIG["dataPath"] + datasource["source"] + "/"
            hdfFile = DS_path + datasource["source"] + ".hdf"
            print(hdfFile)

            # Read the frame into memory inside a context manager so the store
            # is closed even if the key lookup raises (previously it leaked on
            # any error before the explicit close()).
            with pandas.HDFStore(hdfFile) as hdfStore:
                tsData = hdfStore[datasource["name"]]

            ## Crop selected data set to desired ranges
            tsData = ppl.cropDate(tsData, datasource["crop"]["start"], datasource["crop"]["end"])

            ## Resample to the source's own unit (introducing NaNs in all missing
            ## periods), then to the common target unit.
            tsData = ppl.resample(tsData, datasource["sample_unit"])
            tsData = ppl.resample(tsData, sample_unit)

            # 06/06/18
            # Remove NaNs and resample again, to remove partial NaN entries before merging
            tsData = ppl.removeNaNs(tsData)
            tsData = ppl.resample(tsData, sample_unit)

            if name not in markets:
                markets[name] = pandas.DataFrame()

            markets[name] = ppl.merge(tsData, markets[name])

    return markets

In [3]:
def execPipeline(data, ppl_desc):
    """Run the pipeline in-process on *data* and return the result as a DataFrame.

    *data* is serialised to the same ``split``-oriented JSON payload that
    ``execPipelineAPI`` sends through ``fun.call_function``. The local run
    therefore sees the same input shape as the API run.
    """
    payload = {
        "data": json.loads(data.to_json(orient='split', date_format="iso")),
        "dataset": ppl_desc,
    }
    result = executePipeline(payload)
    return pandas.read_json(json.dumps(result), orient='split')

In [4]:
def execPipelineAPI(data, ppl_desc):
    """Run the "marketdirection" pipeline through the module-level ``fun`` client.

    *data* is sent as a ``split``-oriented JSON object alongside the pipeline
    description. Returns whatever ``fun.call_function`` returns.
    """
    print("Executing Pipeline...")
    serialised = json.loads(data.to_json(orient='split', date_format="iso"))
    return fun.call_function("marketdirection", {"data": serialised, "dataset": ppl_desc})

In [1]:
# HARNESS ##

#!pip install --upgrade ../../quantutils
%run functions/marketdirection.py

import os
import json
import pandas
import numpy
    
import quantutils.dataset.pipeline as ppl
from quantutils.api.auth import CredentialsStore
from quantutils.api.bluemix import ObjectStore
from quantutils.api.marketinsights import MarketInsights
from quantutils.api.functions import Functions

CONFIG_FILE = "../../marketinsights-datasets/WallSt-FinalTradingHour.json"
dataset_desc_file = "../../marketinsights-datasets/dataset_desc.json"
markets_file = "../../marketinsights-datasets/markets.json"


def _read_json(path):
    """Parse and return the JSON document stored at *path*."""
    with open(path) as handle:
        return json.load(handle)


# CONFIG must be a global: loadMarketData reads CONFIG["dataPath"].
CONFIG = _read_json(CONFIG_FILE)
dataset_desc = _read_json(dataset_desc_file)
market_list = _read_json(markets_file)

ppl_desc = dataset_desc["pipeline"]["pipeline_desc"]

credStore = CredentialsStore()
mi = MarketInsights(credStore)
fun = Functions(credStore)  # used by execPipelineAPI

markets = loadMarketData(market_list, ppl_desc["features"]["sample_unit"])

# `market` and `data` are deliberately left bound after the loop; later cells use them.
for market, data in markets.items():
    csvData = ppl.localize(execPipelineAPI(data, ppl_desc), "UTC", ppl_desc["timezone"])
    # Visualise
    ppl.visualise(csvData, ppl_desc["features"]["periods"], 5)

    ## Save output
    # NOTE(review): saving is disabled. These calls reference OUT_path, config
    # and objStore, none of which are defined in the visible cells.
    #ppl.save_csv(csvData, "".join([OUT_path, config["name"],"_",market,".csv"]))
    #objStore.put_file("Experiment2", "".join([OUT_path, config["name"],"_",market,".csv"]), "".join([config["name"],"_",market,".csv"]))
    #mi.put_dataset(csvData, dataset_desc, market)
    



FileNotFoundError: [Errno 2] No such file or directory: '/home/cwilkin/.quantutils/MIOapi_cred.json'

In [None]:
from quantutils.api.marketinsights import Dataset

# `data` is the raw frame left over from the last iteration of the harness loop.
# Take 2016-07-06 .. 2016-07-15, remove NaNs, crop to 17:00-21:00, then
# convert with Dataset.csvtojson (remaining arguments: None, None, False).
sample = data["2016-07-06":"2016-07-15"]
sample = ppl.cropTime(ppl.removeNaNs(sample), "17:00", "21:00")
obj = Dataset.csvtojson(sample, None, None, False)

In [None]:
# Display the string form of `obj` (the result of Dataset.csvtojson in the previous cell).
str(obj)

In [6]:
# Presumably fetches the stored dataset for `market`, which is the last key
# iterated by the harness loop.
# NOTE(review): PIPELINE_ID is not defined in any visible cell; define it
# before running this cell.
mi.get_dataset(market, PIPELINE_ID)

Unnamed: 0_level_0,0,1,2,3,4,5,6,7,8,9
Date_Time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2013-01-02 15:00:00-05:00,0.390661,0.826167,0.199723,0.508553,0.503930,1.000000,0.000000,0.629219,0.040237,1
2013-01-03 15:00:00-05:00,0.766138,1.000000,0.585742,0.943056,0.943056,0.943925,0.000000,0.486633,0.085589,0
2013-01-04 15:00:00-05:00,0.096931,1.000000,0.000000,0.631664,0.632741,0.905223,0.340334,0.847604,0.034544,1
2013-01-07 15:00:00-05:00,0.078826,0.468008,0.000000,0.337731,0.337401,1.000000,0.253628,0.984499,0.056402,1
2013-01-08 15:00:00-05:00,0.000000,0.747823,0.000000,0.685475,0.684779,1.000000,0.344479,0.518635,0.053407,1
2013-01-09 15:00:00-05:00,0.703476,0.961984,0.383418,0.772629,0.773353,1.000000,0.000000,0.005069,0.051379,1
2013-01-10 15:00:00-05:00,0.174452,0.433290,0.000000,0.425806,0.425032,1.000000,0.084129,0.888516,0.072084,1
2013-01-11 15:00:00-05:00,0.693603,0.799904,0.000000,0.019721,0.020683,1.000000,0.019240,0.774892,0.038674,1
2013-01-14 15:00:00-05:00,0.254461,0.703783,0.000000,0.312991,0.313704,1.000000,0.151677,0.835475,0.052123,0
2013-01-15 15:00:00-05:00,0.444072,0.641996,0.000000,0.046551,0.047890,1.000000,0.047890,0.963831,0.055546,1
