In [50]:
import sys
import numpy as np
import pandas as pd
import mxnet as mx
from time import time
import csv
import os
import os.path
from datetime import datetime
from scipy.stats.stats import pearsonr

device = mx.gpu()



def GPUComputeKPIs_PCC(normalizedDataPath, outputPath, doVerify, verifySourcePath, VerifyPlaces):
    # This method creates a MXNet Graph of actions that action as a function
    # that will take in a table of X days of stock close prices for several stocks.
    # This graph function will go through a series of matrix multiplication batches.
    # For this to work the data must be pre-L2Normalized.
    # This process custom batches the data specifically to avoid computing the whole matrix since
    # the upper right and lower left corners of the matrix are duplicates.
    # This method is described here: 
    # https://scholarworks.wmich.edu/cgi/viewcontent.cgi?referer=&httpsredir=1&article=1008&context=pcds_reports

    
    t1=time()
    stocksCSV = pd.read_csv(normalizedDataPath, delimiter='\t', dtype=np.float64)
    # Load all stocks into gpu memory
    stocks = mx.nd.array(stocksCSV[:],mx.gpu(), dtype='float64')
    # Setup variables
    gpu_stocks = mx.sym.Variable('gpu_stocks', dtype='float64')
    currentStock = mx.sym.Variable('currentStock', dtype='float64')
    stocksRemain = mx.sym.Variable('stocksRemain', dtype='float64')
    corr_matrix_window = mx.sym.Variable('corr_matrix_window', dtype='float64')
    stock_count = mx.sym.Variable('stock_count')
    # Not going to use this third transformation matrix so just give it a generic name as a placeholder
    C = mx.sym.Variable('C', dtype='float64')
    stock_count = int(stocks.shape[1])
    C = mx.sym.zeros((0,stock_count), dtype='float64')
    sym_group = []
    for i in range(stock_count-1):
        # Get the current column containing a stock
        currentStock = gpu_stocks.slice_axis(axis=1,begin=i,end=i+1)
        
        # Get the remaining stocks
        stocksRemain = gpu_stocks.slice_axis(axis=1,begin=i+1,end=stock_count)
        C = mx.sym.zeros((1,stock_count-(i+1)), dtype='float64')
        
        # Do the matrix multiplication
        corr_matrix_window = mx.symbol.linalg_gemm(currentStock, stocksRemain, C, transpose_a=True)
        
        # Add these actions to the graph
        sym_group.append(corr_matrix_window)

    symResults = mx.symbol.Group(sym_group)    

    # Now the graph is fully defined and we feed in the stock data and instruct MXNet to use the GPU
    workflow = (symResults).bind(mx.gpu(), {'gpu_stocks':stocks})

    # Execute the graph workflow to get the final PCC score
    results = workflow.forward()

    t2=time()

    print("")
    print("MXNet PCC results:")
    print("Execute Time in Seconds:")
    print(t2-t1)
    valuesIndex = 0
    ppcValues = []
    
    # Get data into a format to output
    for i in range(len(stocksCSV.columns)-1):
            ppcValues.append(results[i][0].asnumpy().astype('float64'))
    
    print('Start Write: ' + str(datetime.now()))
    if os.path.exists(outputPath):
        os.remove(outputPath)
    with open(outputPath, 'w', newline='') as f:
        writer = csv.writer(f)    
        
        # Are we going to do another PCC the very slow way and add it on as a column so we can verify our values?
        # Write out a column header
        if doVerify:
            writer.writerow(['StockA','StockB','Nvidia_PCC_Method','Python_PCC_Method'])
        else:
            writer.writerow(['StockA','StockB','PCC'])
        
        #write out the data rows and optionally verify the values with Pandas corr
        for i in range(len(stocksCSV.columns)-1):
            symbol = stocksCSV.columns[i]
            row = ppcValues[i]
            #print(row)
            colIndex=0
            pcc=-2
            
            #### This +1 is to skip the "zero" value used to initialize the array in the GPU
            for k in range(i+1, len(stocksCSV.columns)):
                symbol2 = stocksCSV.columns[k]

                if doVerify == True:
                    pcc=VerifyPCC(verifySourcePath, symbol,symbol2)
                if doVerify == False or (doVerify == True and pcc != None):
                    gpuValue = row[colIndex]
                    colIndex += 1

                
                if pcc == None:
                    pcc = -2
                elif isinstance(pcc, int) == False:
                    pcc = pcc.item()
                
                if doVerify == True and truncate(pcc,VerifyPlaces) != truncate(gpuValue,VerifyPlaces):
                    print("No match: ")
                    print(str(truncate(pcc,VerifyPlaces)) + " != " + str(truncate(gpuValue,VerifyPlaces)))
                    print(symbol)
                    print(symbol2)
                    sys.exit(0)
                if doVerify == True:
                    writer.writerow([symbol,symbol2,str(gpuValue).strip(),pcc])
                    valuesIndex += 1
                elif doVerify == False:
                    writer.writerow([symbol,symbol2,gpuValue])
                    valuesIndex += 1
                    
              
    print('End Write: ' + str(datetime.now()))    


def VerifyPCC(verifySourcePath, StockA,StockB):
    # Do a Pandas correlation on the close values between two stock files
    valueColumnName="close"
    stock_A = pd.read_csv(verifySourcePath+StockA+".csv", delimiter=",", quotechar='"',usecols=[valueColumnName], dtype={valueColumnName:np.float64})
    stock_B = pd.read_csv(verifySourcePath+StockB+".csv", delimiter=",", quotechar='"',usecols=[valueColumnName], dtype={valueColumnName:np.float64})
    RowCount = len(stock_A.index)

  
    # Keep first RowCount rows after Start
    stock_A = stock_A[0:RowCount]
   
    stock_B = stock_B[0:RowCount]

    # Now both arrays contain a cleaned list of 100 items starting on the same date (startingKey)
    df = pd.DataFrame({'A': stock_A[valueColumnName].astype('float64'), 'B': stock_B[valueColumnName].astype('float64')})
    return df['A'].corr(df['B'])
  
    
def truncate(f, n):
    # Truncates/pads a float f to n decimal places without rounding
    s = '{}'.format(f)
    if 'e' in s or 'E' in s:
        return '{0:.{1}f}'.format(f, n)
    i, p, d = s.partition('.')
    return '.'.join([i, (d+'0'*n)[:n]])

    
def KPIsToNormalizedMatrix(input_path, output_path_csv):
    # This creates a matrix from a folder of files containing stock data (one file per stock) where the close prices
    # are L2Normalized to prepare the data for the matrix multiplications on the GPU later
    
    KeyColumn = "timestamp"
    ValueColumn = "close"
    Start = 0
    RowCount = 100
    OutputTable = None
    workflows = []
    symbols = []

    print('KPIsToNormalizedMatrix Start: ' + str(datetime.now()))

    directory = os.fsencode(input_path)
    
    for file in os.listdir(directory):

        Start = 0

        filename = os.path.join(input_path, file.decode("utf-8"))
        symbol = file.decode("utf-8").replace(".csv","")
        symbols.append(symbol)

        # Read in the key and value fields for the KPI
        dataFrame = pd.read_csv(filename, header=0, usecols=[KeyColumn, ValueColumn], dtype={ValueColumn:np.float64})

        # Keep first RowCount rows after Start
        dataFrame = dataFrame[Start:Start+RowCount]



        # Copy the data to the gpu and calculate the normalized variances for each value
        stock = mx.nd.array([dataFrame[ValueColumn].tolist()], device, dtype='float64')

        gpu_stock = mx.sym.Variable('gpu_stock', dtype='float64')
        mean = mx.sym.Variable('mean', dtype='float64')
        mean_array = mx.sym.Variable('mean_array', dtype='float64')
        variance = mx.sym.Variable('variance', dtype='float64')
        L2Norm = mx.sym.Variable('L2Norm', dtype='float64')
        normalizedStock = mx.sym.Variable('normalizedStock', dtype='float64')


        # Calc the mean across all the stock closing prices for the 100 days
        mean = gpu_stock.mean()
        # Make an array (i.e. vector or 1D matrix) that is the same shape as the stock array 
        # and that contains only duplicates of the mean value in each slot 
        # (each slot of this array will be subtracted from an array slot in the stock array)
        mean_array = mx.sym.reshape(mx.sym.repeat(mean, 100), shape=(1,100))

        # This looks like simple subtraction, however it is actually an array minus an array
        # This results in another array where each slot contains the result of same slots in the
        # other two arrays (gpu_stock - mean_array) being subtracted

        # By subtracting the mean what is left is just the amount the closing price varies from day to day
        variance = gpu_stock - mean_array

        # See notes and reference on what the L2 Normalization equation is about
        normalizedStock = mx.symbol.L2Normalization(variance)

        # Now the graph is fully defined and we feed in the stock data and instruct MXNet to use the gpu
        workflow = (normalizedStock).bind(device, {'gpu_stock':stock})

        # Execute the graph workflow to get the final normalized results
        results = workflow.forward()
        workflows.append(results)

        # Loop through all data and kick off the workflows before trying to retrieve any results.
        # This way parallel execution can go on as long as possible before blocking to get a result
        # In fact we might be lucky enough to have the results already waiting on us by time 
        # we access them (by looping over the workflows) so that there will be no blocking

        continue

    # Lets go get our results from the workflows which have been executing in the background
    for i in range(len(workflows)):
        symbol = symbols[i]
        results = workflows[i][0]
        # We will append each KPI array as a column to a 100 row matrix (each row is a timeseries snapshot)
        if type(OutputTable) == type(None) or OutputTable.empty:
            # Create new DataFrame with the first column
            OutputTable = pd.DataFrame(results[0].asnumpy().astype('float64').tolist(), columns=[symbol], dtype=np.float64)
        else:
            # Append the new column 
            a = results[0].asnumpy().astype('float64').tolist()
            OutputTable[symbol] = a
            
    OutputTable.dropna()
    
    OutputTable.to_csv(output_path_csv, sep='\t', encoding='utf-8', index=False)
    print('KPIsToNormalizedMatrix End: ' + str(datetime.now()))
    
            
        
rootPath = "E:\\Job7\\"
sourceDataPath = "E:\\Job7\\DataSource\\"
cleanedDataPath = "E:\\Job7\\CleanedData\\"
normalizedFileName = "NormalizedKPIValues.csv"
resultsFileName = "Results.csv"
DoVerify = True
# 100 days of data for each stock
RowCount = 100
VerifyPlaces = 5


print('Pipeline Start: ' + str(datetime.now()))

# Convert folder of files into a normalized table of KPIs
KPIsToNormalizedMatrix(cleanedDataPath, rootPath + normalizedFileName)

# GPU compute table of normalized KPIs to PCC values
GPUComputeKPIs_PCC(rootPath + normalizedFileName, rootPath + resultsFileName, DoVerify, cleanedDataPath, VerifyPlaces, RowCount)

print('Pipeline End: ' + str(datetime.now()))

Pipeline Start: 2019-08-09 19:01:26.390327
KPIsToNormalizedMatrix Start: 2019-08-09 19:01:26.390327
KPIsToNormalizedMatrix End: 2019-08-09 19:01:31.236303

MXNet PCC results:
Execute Time in Seconds:
0.28389835357666016
Start Write: 2019-08-09 19:01:31.817892
End Write: 2019-08-09 20:26:32.637958
Pipeline End: 2019-08-09 20:26:32.862363
