Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions scripts/builtin/apply_pipeline.dml
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

# This script will read the dirty and clean data, then it will apply the best pipeline on dirty data
# and then will classify both cleaned dataset and check if the cleaned dataset is performing same as original dataset
# in terms of classification accuracy

# INPUT PARAMETERS:
# ----------------------------------------------------------------------------------------------------------------------
# NAME TYPE DEFAULT MEANING
# ----------------------------------------------------------------------------------------------------------------------
# trainData Frame[Unknown] ---
# testData Frame[Unknown] ---
# metaData Frame[Unknown] as.frame("NULL")
# lp Frame[Unknown] ---
# pip Frame[Unknown] ---
# hp Frame[Unknown] ---
# evaluationFunc String ---
# evalFunHp Matrix[Double] ---
# isLastLabel Boolean TRUE
# correctTypos Boolean FALSE
#
# ----------------------------------------------------------------------------------------------------------------------
#
# OUTPUT:
# ----------------------------------------------------------------------------------------------------------------------
# NAME TYPE MEANING
# ----------------------------------------------------------------------------------------------------------------------
# scores Matrix[Double] ---
# ----------------------------------------------------------------------------------------------------------------------


source("scripts/builtin/topk_cleaning.dml") as topk;

s_apply_pipeline = function(Frame[Unknown] testData, Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] pip,
Frame[Unknown] applyFunc, Matrix[Double] hp, Boolean isLastLabel = TRUE,List[Unknown] exState, List[Unknown] iState, Boolean correctTypos=FALSE)
return (Matrix[Double] eXtest)
{
no_of_flag_vars = 5
[schema, mask, fdMask, maskY] = topk::prepareMeta(testData, metaData)
pip = removeEmpty(target=pip, margin="cols")
applyFunc = removeEmpty(target=applyFunc, margin="cols")
metaList = list(mask=mask, schema=schema, fd=fdMask, applyFunc=as.frame("NULL"))
ctx = list(prefix="----"); #TODO include seed
# separate the label
[Xtest, Ytest] = topk::getLabel(testData, isLastLabel)

# always recode the label
if(maskY == 1) {
M = as.frame(exState[1])
eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
}
else
{
eYtest = as.matrix(Ytest)
}
# # # when the evaluation function is called first we also compute and keep hyperparams of target application
ctx = list(prefix="apply Pipeline")

[Xtest, Xt] = topk::runStringPipeline(Xtest, Xtest, schema, mask, FALSE, correctTypos, ctx)

# # # if mask has 1s then there are categorical features
M = as.frame(exState[2])
index = vectorToCsv(mask)
jspecR = "{ids:true, recode:["+index+"]}"
eXtest = transformapply(target=Xtest, spec=jspecR, meta=M);
metaList["applyFunc"] = applyFunc

no_of_param = as.scalar(hp[1, 1]) + 1
hp_width= hp[1, 2:no_of_param]
hp_matrix = matrix(hp_width, rows=ncol(pip), cols=ncol(hp_width)/ncol(pip))
pipList = list(ph = pip, hp = hp_matrix, flags = no_of_flag_vars)
for(i in 1:length(iState)) {
op = as.scalar(pip[1,i])
XtestClone = eXtest
applyOp = toString(as.scalar(applyFunc[1,i]))
dataFlag = as.scalar(hp_matrix[i, ncol(hp_matrix)])
[iState, L] = remove(iState, 1)
[eXtest, executeFlag] = getDataFromFlag(eXtest, mask, dataFlag)
L2 = list(eXtest)
L = as.list(L)
for(k in 1:length(L)) {
L2 = append(L2, L[k])
}
if(executeFlag == 1 & applyOp != "NA") {
eXtest = eval(applyOp, L2);
eXtest = confirmDataFromMask (eXtest, XtestClone, mask, dataFlag)
eXtest = confirmMetaFromMask (eXtest, mask)
}
else {
print("not applying "+op+" executeFlag = 0")
}
}

}


getDataFromFlag = function(Matrix[Double] X, Matrix[Double] mask, Integer dataFlag)
return(Matrix[Double] X,Integer executeFlag)
{
executeFlag = 1
if(dataFlag == 0)
{
if(sum(mask) == ncol(mask))
executeFlag = 0
else {
# take numerics out and remove categorical
X = removeEmpty(target=X, margin = "cols", select = (mask == 0))
}
}
else if(dataFlag == 1)
{
if(sum(mask) == 0)
executeFlag = 0
else {
# take categorical out and remove numerics
X = removeEmpty(target=X, margin = "cols", select = mask)
}
}
else X = X
}

confirmMetaFromMask = function(Matrix[Double] X, Matrix[Double] mask)
return (Matrix[Double] X)
{
if((sum(mask) > 0) & (ncol(X) == ncol(mask)))
{
# get the max + 1 for nan replacement
nanMask = is.na(X)
# replace nan
X = replace(target = X, pattern = NaN, replacement = 9999)
# take categorical out
cat = removeEmpty(target=X, margin="cols", select = mask)
# round categorical (if there is any floating point)
cat = round(cat)
less_than_1_mask = cat < 1
less_than_1 = less_than_1_mask * 9999
cat = (cat * (less_than_1_mask == 0)) + less_than_1
# reconstruct original X
X = X * (mask == 0)
q = table(seq(1, ncol(cat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask)), ncol(cat), ncol(X))
X = (cat %*% q) + X

# put nan back
nanMask = replace(target = nanMask, pattern = 1, replacement = NaN)
X = X + nanMask
}
}


confirmDataFromMask = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag)
return (Matrix[Double] X)
{

if(dataFlag == 0 & (sum(mask) > 0) & (sum(mask) != ncol(originalX)))
{
maxDummy = max(replace(target=nX, pattern=NaN, replacement=0)) + 1
nX = replace(target = nX, pattern = NaN, replacement = maxDummy)
# X without numerics
Xcat = removeEmpty(target=originalX, margin="cols", select=mask)
nanMask = is.na(Xcat)
Xcat = replace(target = Xcat, pattern = NaN, replacement = -1111)

# reconstruct the original matrix
p = table(seq(1, ncol(nX)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask==0)), ncol(nX), ncol(originalX))
q = table(seq(1, ncol(Xcat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask)), ncol(Xcat), ncol(originalX))
X = (nX %*% p) + (Xcat %*% q)

X = replace(target = X, pattern = maxDummy, replacement = NaN)
X = replace(target = X, pattern = -1111, replacement = NaN)
}
else if(dataFlag == 1 & (sum(mask) > 0) & (sum(mask) != ncol(originalX)))
{
maxDummy = max(replace(target=nX, pattern=NaN, replacement=0)) + 1
nX = replace(target = nX, pattern = NaN, replacement = maxDummy)
# X without categorical
Xnum = removeEmpty(target=originalX, margin="cols", select=(mask==0))
nanMask = is.na(Xnum)
Xnum = replace(target = Xnum, pattern = NaN, replacement = -1111)
# reconstruct the original matrix
p = table(seq(1, ncol(Xnum)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask==0)), ncol(Xnum), ncol(originalX))
q = table(seq(1, ncol(nX)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask)), ncol(nX), ncol(originalX))
X = (nX %*% q) + (Xnum %*% p)
X = replace(target = X, pattern = maxDummy, replacement = NaN)
X = replace(target = X, pattern = -1111, replacement = NaN)

}
else X = nX

}
5 changes: 3 additions & 2 deletions scripts/builtin/bandit.dml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_test, Matrix[Double] Y_test, List[Unknown] metaList,
String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] lp, Matrix[Double] lpHp, Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3,
Integer R=50, Double baseLineScore, Boolean cv, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean verbose = TRUE, String output="")
Integer R=50, Double baseLineScore, Boolean cv, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean verbose = TRUE)
# return(Boolean perf)
return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy, Frame[String] applyFunc)
{
Expand Down Expand Up @@ -290,7 +290,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i = 1, Matrix[Do
hp = hp[, 2:totalVals]
applyFunctions = allApplyFunctions[i]
no_of_res = nrow(hp)
# print("PIPELINE EXECUTION START ... "+toString(op))
print("PIPELINE EXECUTION START ... "+toString(op))
hpForPruning = matrix(0, rows=1, cols=ncol(op))
changesByOp = matrix(0, rows=1, cols=ncol(op))
metaList2 = metaList; #ensure metaList is no result var
Expand Down Expand Up @@ -564,6 +564,7 @@ return (Double accuracy, Matrix[Double] evalFunHp, Matrix[Double] hpForPruning,
allChanges = min(allChanges)
changesByOp = colMaxs(cvChanges)
accuracy = mean(accuracyMatrix)
print("mean: \n"+toString(accuracyMatrix))
print("cv accuracy: "+toString(accuracy))
}

Expand Down
16 changes: 10 additions & 6 deletions scripts/builtin/executePipeline.dml
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ s_executePipeline = function(Frame[String] pipeline, Matrix[Double] Xtrain, Mat
Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, Matrix[Double] hyperParameters, Matrix[Double] hpForPruning = as.matrix(0),
Matrix[Double] changesByOp = as.matrix(0), Integer flagsCount, Boolean test = FALSE, Boolean verbose)
return (Matrix[Double] Xtrain, Matrix[Double] Ytrain, Matrix[Double] Xtest, Matrix[Double] Ytest,
Double t2, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Double changesAll)
Double t2, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Double changesAll, List[Unknown] internalStates)
{

internalStates = list()
mask=as.matrix(metaList['mask'])
FD = as.matrix(metaList['fd'])
applyFunc = as.frame(metaList['applyFunc'])
Expand All @@ -76,7 +76,7 @@ s_executePipeline = function(Frame[String] pipeline, Matrix[Double] Xtrain, Mat
for(i in 1:ncol(pipeline)) {
op = as.scalar(pipeline[1,i])
applyOp = toString(as.scalar(applyFunc[1,i]))

# print("op: "+op)
Xclone = Xtrain
XtestClone = Xtest
[hp, dataFlag, yFlag, executeFlag] = matrixToList(Xtrain, Ytrain, mask, FD, hyperParameters[i], flagsCount, op)
Expand All @@ -86,18 +86,22 @@ s_executePipeline = function(Frame[String] pipeline, Matrix[Double] Xtrain, Mat
Xtrain = as.matrix(O)
if(applyOp != "NA") {
[Xtest, executeFlag] = applyDataFlag(Xtest, mask, dataFlag)
internalStates = append(internalStates, L)
L = append(L, list(X=Xtest));
Xtest = eval(applyOp, L);
Xtest = confirmData(Xtest, XtestClone, mask, dataFlag, yFlag)
# print("L \n"+toString(L, rows=3))
Xtest = confirmData(Xtest, XtestClone, mask, dataFlag)
}
Xtrain = confirmData(Xtrain, Xclone, mask, dataFlag, yFlag)
else internalStates = append(internalStates, as.frame("NA"))
Xtrain = confirmData(Xtrain, Xclone, mask, dataFlag)

# dataFlag 0 = only on numeric, 1 = on whole data
if(yFlag) {
[L, Y] = remove(L, 1);
Ytrain = as.matrix(Y)
}
Xtrain = confirmMeta(Xtrain, mask)
Xtest = confirmMeta(Xtest, mask)
}
else {
print("not applying "+op+" executeFlag = 0")
Expand Down Expand Up @@ -225,7 +229,7 @@ return (Matrix[Double] X)
}


confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag, Integer yFlag)
confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag)
return (Matrix[Double] X)
{

Expand Down
26 changes: 13 additions & 13 deletions scripts/builtin/fit_pipeline.dml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
# ----------------------------------------------------------------------------------------------------------------------
# NAME TYPE MEANING
# ----------------------------------------------------------------------------------------------------------------------
# result Matrix[Double] ---
# scores Matrix[Double] ---
# ----------------------------------------------------------------------------------------------------------------------

source("scripts/pipelines/scripts/utils.dml") as utils;
Expand All @@ -54,10 +54,12 @@ source("scripts/builtin/bandit.dml") as bandit;
s_fit_pipeline = function(Frame[Unknown] trainData, Frame[Unknown] testData, Frame[Unknown] metaData = as.frame("NULL"),
Frame[Unknown] pip, Frame[Unknown] applyFunc, Matrix[Double] hp, String evaluationFunc, Matrix[Double] evalFunHp,
Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE)
return (Matrix[Double] result, Matrix[Double] cleanTrain, Matrix[Double] cleanTest)
return (Matrix[Double] scores, Matrix[Double] cleanTrain, Matrix[Double] cleanTest, List[Unknown] externalState, List[Unknown] iState)
{
externalState = list()
no_of_flag_vars = 5
[schema, mask, fdMask, maskY] = topk::prepareMeta(trainData, metaData)

pip = removeEmpty(target=pip, margin="cols")
applyFunc = removeEmpty(target=applyFunc, margin="cols")
metaList = list(mask=mask, schema=schema, fd=fdMask, applyFunc=as.frame("NULL"))
Expand All @@ -70,6 +72,7 @@ return (Matrix[Double] result, Matrix[Double] cleanTrain, Matrix[Double] cleanTe
if(maskY == 1) {
[eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}");
eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
externalState = append(externalState, M)
}
else
{
Expand All @@ -83,22 +86,19 @@ return (Matrix[Double] result, Matrix[Double] cleanTrain, Matrix[Double] cleanTe
[Xtrain, Xtest] = topk::runStringPipeline(Xtrain, Xtest, schema, mask, FALSE, correctTypos, ctx)

# # # if mask has 1s then there are categorical features
[eXtrain, eXtest] = topk::recodeData(Xtrain, Xtest, mask, FALSE, "recode")
[eXtrain, eXtest, M1] = topk::recodeData(Xtrain, Xtest, mask, FALSE, "recode")
externalState = append(externalState, M1)
# # # do the early dropping
[eXtrain, eXtest, metaList] = topk::featureDrop(eXtrain, eXtest, metaList, FALSE)
# [eXtrain, eXtest, metaList] = topk::featureDrop(eXtrain, eXtest, metaList, FALSE)
metaList["applyFunc"] = applyFunc
# construct the parameter list for best hyper-parameters if the oversampling technique is part of
# pipeline then take it out because oversampling is not applied on test dataset
# this condition is unnecessary here in this case because the input dataset is balanced and
# instead of diving the dataset into train/test I am doing cross validations

no_of_param = as.scalar(hp[1, 1]) + 1
hp_width= hp[1, 2:no_of_param]
hp_matrix = matrix(hp_width, rows=ncol(pip), cols=ncol(hp_width)/ncol(pip))
pipList = list(ph = pip, hp = hp_matrix, flags = no_of_flag_vars)

# # # now test accuracy
[eXtrain, eYtrain, eXtest, eYtest, a, b,Tr] = executePipeline(pipeline=pip, Xtrain=eXtrain, Ytrain=eYtrain,
[eXtrain, eYtrain, eXtest, eYtest, a, b, c, d, iState] = executePipeline(pipeline=pip, Xtrain=eXtrain, Ytrain=eYtrain,
Xtest=eXtest, Ytest=eYtest, metaList=metaList, hyperParameters=hp_matrix, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE)

if(max(eYtrain) == min(eYtrain))
Expand All @@ -110,10 +110,10 @@ return (Matrix[Double] result, Matrix[Double] cleanTrain, Matrix[Double] cleanTe
score = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, Xorig=as.matrix(0), evalFunHp=evalFunHp))
testAccuracy = as.scalar(score[1, 1])

result = matrix(0, rows=1, cols=3)
result[1, 1] = dirtyScore
result[1, 2] = trainAccuracy
result[1, 3] = testAccuracy
scores = matrix(0, rows=1, cols=3)
scores[1, 1] = dirtyScore
scores[1, 2] = trainAccuracy
scores[1, 3] = testAccuracy
cleanTrain = cbind(eXtrain, eYtrain)
cleanTest = cbind(eXtest, eYtest)
}
3 changes: 1 addition & 2 deletions scripts/builtin/frameSort.dml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@
# f_odered Frame[String] sorted dataset by column 1 in decreasing order
# ----------------------------------------------------------------------------------------------------------------------

s_frameSort = function(Frame[String] F, Matrix[Double] mask, Boolean orderDesc = TRUE )
s_frameSort = function(Frame[String] F, Matrix[Double] mask, Boolean orderDesc = TRUE)
return (Frame[String] f_odered)
{
# idx[1,1] = 0 # to save accuracy column from encoding
index = vectorToCsv(mask)
# recode logical pipelines for easy handling
jspecR = "{ids:true, recode:["+index+"]}";
Expand Down
Loading