In [None]:
import copy
import glob
import japanize_matplotlib
import math
import matplotlib as mpl
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import os
import pandas as pd
import random
import sys
from sklearn import linear_model
from sklearn.linear_model import HuberRegressor, LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
import sklearn.preprocessing as sp
from sklearn.metrics import r2_score
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error

In [None]:
def returnMapeScore(l1, l2):
    """Return the Mean Absolute Percent Error (MAPE) of two sequences, in percent.

    Args:
        l1: Measured (actual) values.
        l2: Predicted values; expected to have the same length as ``l1``.

    Returns:
        The MAPE expressed in percent. When the two lengths differ, a
        message is printed and ``-1`` is returned instead.
    """
    count = len(l1)
    if count != len(l2):
        print("引数のリストの長さが異なります", end=", ")
        return -1

    def _relativeError(actual, predicted):
        # Absolute error relative to the measured value.
        return abs((actual - predicted) / actual)

    errorSum = sum(_relativeError(l1[idx], l2[idx]) for idx in range(count))
    # Mean of the relative errors, scaled to percent.
    return errorSum / count * 100


# Usage example: returnMapeScore([1,2,3,4], [4,3,2,1])
type(returnMapeScore([1, 2, 3, 4], [4, 3, 2, 1]))


def test_returnMapeScore():
    """Check returnMapeScore against a hand-computed MAPE for a small example."""
    l1 = [1, 2, 3, 4]
    l2 = [4, 3, 2, 1]
    ansByFunc = returnMapeScore(l1, l2)
    ansByHand = (abs(1-4)/1 + abs(2-3)/2 + abs(3-2)/3 + abs(4-1)/4)/4 * 100

    # Allow a small error (two decimal places). An explicit tolerance is used
    # instead of truncating both values with int(x * 100) / 100, because
    # truncation reports a mismatch whenever two nearly equal floats fall on
    # opposite sides of a 0.01 boundary.
    assert math.isclose(ansByFunc, ansByHand, rel_tol=0.0, abs_tol=0.01)

In [None]:
def returnExistingFileNames(benchmarkNames=[], classes=[], processes=[], csvDirPath="./csv_files"):
    """Return the candidate profile CSV files that exist and are not empty.

    A candidate name ``pprof_{benchmarkName}{benchmarkClass}{process}.csv`` is
    generated for every combination of the given benchmark names, classes and
    process counts, and looked up inside ``csvDirPath``.

    Args:
        benchmarkNames: Benchmark names to combine.
        classes: Benchmark classes to combine.
        processes: Process counts to combine.
        csvDirPath: Directory that is searched for the candidate files.

    Returns:
        A dict mapping each existing, non-empty file name to a dict with the
        keys ``"benchmarkName"``, ``"benchmarkClass"`` and ``"process"`` that
        produced that name.
    """
    candidates = {
        f"pprof_{name}{cls}{proc}.csv": {"benchmarkName": name, "benchmarkClass": cls, "process": proc}
        for name in benchmarkNames
        for cls in classes
        for proc in processes
    }

    def _existsWithContent(fileName):
        # A file counts only if it is present and its size is non-zero.
        path = os.path.join(csvDirPath, fileName)
        return os.path.exists(path) and os.stat(path).st_size != 0

    return {fileName: meta for fileName, meta in candidates.items() if _existsWithContent(fileName)}


def test_returnExistingFileNames():
    """Check that the fixture files under ../csv_files/ are found with the right metadata."""
    found = returnExistingFileNames(
        benchmarkNames=["test"],
        classes=["A", "B", "C", "D"],
        processes=[1, 2, 4, 8, 16, 32, 64, 128, 256],
        csvDirPath="../csv_files/",
    )
    expected = {
        "pprof_testA128.csv": {"benchmarkName": "test", "benchmarkClass": "A", "process": 128},
        "pprof_testB256.csv": {"benchmarkName": "test", "benchmarkClass": "B", "process": 256},
    }
    for fileName, meta in expected.items():
        assert found[fileName] == meta

In [None]:
def returnCollectedExistingData(benchmarkNames=[], classes=[], processes=[], csvDirPath="./csv_files"):
    """Load every existing profile CSV for the given combinations into one DataFrame.

    The file names are obtained from ``returnExistingFileNames``. Each file is
    read with ``pd.read_csv`` and tagged with the ``benchmarkName``,
    ``benchmarkClass`` and ``process`` it belongs to; the frames are then
    concatenated row-wise.

    Args:
        benchmarkNames: Benchmark names to combine.
        classes: Benchmark classes to combine.
        processes: Process counts to combine.
        csvDirPath: Directory that contains the CSV files.

    Returns:
        A DataFrame holding the rows of all files, with the columns ``Name``
        and ``#Call`` renamed to ``functionName`` and ``functionCallNum``.

    Raises:
        ValueError: If no existing, non-empty file matches the combinations.
    """
    fileNames = returnExistingFileNames(
        benchmarkNames=benchmarkNames, classes=classes, processes=processes, csvDirPath=csvDirPath)
    if not fileNames:
        # pd.concat() raises ValueError on an empty list; fail with the same
        # exception type but a message that names the actual cause.
        raise ValueError(f"no existing non-empty CSV file found in {csvDirPath!r}")

    csvDataList = []
    for fileName, meta in fileNames.items():
        # Build the path with os.path.join, exactly as the existence check
        # does, so that a directory given without a trailing separator
        # (including the default "./csv_files") resolves to the right file.
        rawDatum = pd.read_csv(os.path.join(csvDirPath, fileName))
        rawDatum["benchmarkName"] = meta["benchmarkName"]
        rawDatum["benchmarkClass"] = meta["benchmarkClass"]
        rawDatum["process"] = meta["process"]
        csvDataList.append(rawDatum)

    returnDF = pd.concat(csvDataList, axis=0)
    return returnDF.rename(columns={'Name': 'functionName', '#Call': 'functionCallNum'})
        
def test_returnCollectedExistingData():
    """Check the call counts collected from the fixture files under ../csv_files/."""
    collected = returnCollectedExistingData(
        benchmarkNames=["test"],
        classes=["A", "B", "C", "D"],
        processes=[1, 2, 4, 8, 16, 32, 64, 128, 256],
        csvDirPath="../csv_files/",
    )

    expectedCases = [
        {"benchmarkName": "test", "benchmarkClass": "A", "process": 128,
         "functionCalls": {"function00": 99, "function01": 77, "function02": 555}},
        {"benchmarkName": "test", "benchmarkClass": "B", "process": 256,
         "functionCalls": {"function00": 5, "function01": 70, "function02": 900}},
    ]

    for case in expectedCases:
        for functionName, expectedCalls in case["functionCalls"].items():
            # Select the rows of this benchmark/class/process/function.
            rowMask = (
                (collected['benchmarkName'] == case["benchmarkName"])
                & (collected['benchmarkClass'] == case["benchmarkClass"])
                & (collected["process"] == case["process"])
                & (collected["functionName"] == functionName)
            )
            matched = collected[rowMask]
            callColumn = matched.columns.tolist().index("functionCallNum")
            # Only the first matching row is compared.
            assert matched.iloc[0, callColumn] == expectedCalls

In [None]:
# Common base class shared by the models.
# The data arguments are expected to be plain lists; ``np.reshape()`` is
# applied to them when the instance is initialised.
class ModelBase:
    """Holds the benchmark/function names and the data reshaped into single columns."""

    def __init__(self, trainX, trainY, targetX=[], targetY=[], benchmarkName="benchmarkName", functionName="functionName"):
        """Store the names and reshape each data argument with shape ``(-1, 1)``.

        Args:
            trainX: Training inputs.
            trainY: Training outputs.
            targetX: Target inputs (defaults to an empty list).
            targetY: Target outputs (defaults to an empty list).
            benchmarkName: Name stored on ``self.benchmarkName``.
            functionName: Name stored on ``self.functionName``.
        """
        self.benchmarkName, self.functionName = benchmarkName, functionName

        columnShape = (-1, 1)
        dataByAttribute = (
            ("trainX", trainX),
            ("trainY", trainY),
            ("targetX", targetX),
            ("targetY", targetY),
        )
        for attributeName, values in dataByAttribute:
            setattr(self, attributeName, np.reshape(values, columnShape))

In [None]:
# Branch model

class ModelBranch(ModelBase):
    """Two linear regressions split at the training sample where trainY is largest."""

    def calcLr(self):
        """Fit ``self.lr1`` and ``self.lr2``.

        ``self.t`` is the index of the largest ``trainY`` and ``self.tNum`` the
        ``trainX`` entry at that index. When the peak is the first or the last
        sample, both regressions are fitted on all training data. Otherwise
        ``lr1`` is fitted on the samples before the peak and ``lr2`` on the
        peak and the samples after it.
        """
        peak = np.ndarray.argmax(self.trainY)
        self.t = peak
        self.tNum = self.trainX[peak]

        lastIndex = len(self.trainY) - 1
        if peak != 0 and peak != lastIndex:
            # Interior peak: split the training data at the peak index.
            self.trainX1 = self.trainX[:peak]
            self.trainX2 = self.trainX[peak:]
            self.trainY1 = self.trainY[:peak]
            self.trainY2 = self.trainY[peak:]
            firstSegment = (self.trainX1, self.trainY1)
            secondSegment = (self.trainX2, self.trainY2)
        else:
            # Peak at either end: both regressions see the full data set.
            firstSegment = secondSegment = (self.trainX, self.trainY)

        self.lr1 = LinearRegression()
        self.lr1.fit(*firstSegment)
        self.lr2 = LinearRegression()
        self.lr2.fit(*secondSegment)

    def predict(self, num):
        """Predict for ``num`` (reshaped into a single column) with lr1 and/or lr2.

        * A single input whose value is at or above ``self.tNum`` uses ``lr2``.
        * If the largest input is below the breakpoint, or the input nearest
          to the breakpoint is the first one, ``lr1`` is used for all inputs.
        * Otherwise the inputs before the nearest-to-breakpoint index use
          ``lr1``, the remaining ones use ``lr2``, and the two results are
          concatenated.
        """
        num = np.reshape(num, (-1, 1))
        largest = num[np.ndarray.argmax(num)]
        # Index of the input closest to the breakpoint value.
        nearest = np.abs(np.asarray(num) - self.tNum).argmin()

        if len(num) == 1 and largest >= self.tNum:
            return self.lr2.predict(num)

        # NOTE(review): when nearest == 0 every input goes through lr1, even
        # if the inputs lie at or beyond the breakpoint - confirm this is the
        # intended behaviour.
        if largest < self.trainX[self.t] or nearest == 0:
            return self.lr1.predict(num)

        beforeBreak = self.lr1.predict(num[:nearest])
        fromBreak = self.lr2.predict(num[nearest:])
        return np.concatenate([beforeBreak, fromBreak])

    def ModelName(self):
        """Return the label of this model."""
        return "ModelBranch"

In [None]:
# Inverse-proportion model

def ipFunc(x):
    """Return the reciprocal ``1/x`` of ``x``."""
    reciprocal = 1 / x
    return reciprocal

class ModelIp(ModelBase):
    """Linear regression fitted on the reciprocal of the input (via ``ipFunc``)."""

    def calcLr(self):
        """Create the reciprocal transformer and fit ``self.lr`` on the transformed trainX."""
        self.transformerIp = sp.FunctionTransformer(func=ipFunc, inverse_func=ipFunc)
        reciprocalTrainX = self.transformerIp.transform(self.trainX)
        regressor = LinearRegression()
        self.lr = regressor
        regressor.fit(reciprocalTrainX, self.trainY)

    def predict(self, num):
        """Reshape ``num`` into a single column, transform it and return the prediction."""
        column = np.reshape(num, (-1, 1))
        return self.lr.predict(self.transformerIp.transform(column))

    def return_coef_(self):
        """Return ``coef_`` of the fitted regression."""
        return self.lr.coef_

    def return_intercept_(self):
        """Return ``intercept_`` of the fitted regression."""
        return self.lr.intercept_

    def ModelName(self):
        """Return the label of this model."""
        return "ModelIp"

# # 使用例
# modelIp = ModelIp(trainX=trainX, trainY=trainY, targetX=targetX, targetY=targetY)
# modelIp.calcLr()
# plotY = modelIp.predict(plotX)

In [None]:
# Linear model

class ModelLin(ModelBase):
    """Linear regression of trainY on the untransformed trainX."""

    def calcLr(self):
        """Fit ``self.lr`` on the training data."""
        regressor = LinearRegression()
        self.lr = regressor
        regressor.fit(self.trainX, self.trainY)

    def predict(self, num):
        """Reshape ``num`` into a single column and return the prediction."""
        column = np.reshape(num, (-1, 1))
        return self.lr.predict(column)

    def return_coef_(self):
        """Return ``coef_`` of the fitted regression."""
        return self.lr.coef_

    def return_intercept_(self):
        """Return ``intercept_`` of the fitted regression."""
        return self.lr.intercept_

    def ModelName(self):
        """Return the label of this model."""
        return "ModelLin"

In [None]:
# Logarithmic model


def inverterLog10Func(x):
    """Return ten raised to the power ``x`` (the inverse of a base-10 logarithm)."""
    return pow(10, x)

class ModelLog10(ModelBase):
    """Linear regression fitted on the base-10 logarithm of the input."""

    def calcLr(self):
        """Create the log10 transformer and fit ``self.lr`` on the transformed trainX."""
        self.transformerLog10 = sp.FunctionTransformer(func=np.log10, inverse_func=inverterLog10Func)
        logTrainX = self.transformerLog10.transform(self.trainX)
        regressor = LinearRegression()
        self.lr = regressor
        regressor.fit(logTrainX, self.trainY)

    def predict(self, num):
        """Reshape ``num`` into a single column, transform it and return the prediction."""
        column = np.reshape(num, (-1, 1))
        return self.lr.predict(self.transformerLog10.transform(column))

    def return_coef_(self):
        """Return ``coef_`` of the fitted regression."""
        return self.lr.coef_

    def return_intercept_(self):
        """Return ``intercept_`` of the fitted regression."""
        return self.lr.intercept_

    def ModelName(self):
        """Return the label of this model."""
        # NOTE(review): the label is "ModelLog", not the class name
        # "ModelLog10" - confirm that callers expect this spelling.
        return "ModelLog"

# # 使用例
# modelLog10 = ModelLog10(trainX=trainX, trainY=trainY, targetX=targetX, targetY=targetY)
# modelLog10.calcLr()
# plotY = modelLog10.predict(plotX)