In [103]:
import os
import subprocess
import pandas as pd
import shutil
from paramiko import SSHClient, AutoAddPolicy
import sys
sys.path.append("../utils/")
from filter_variants import FilterVariants


In [111]:
class BackendManager:
    """Drives the experiment pipeline on a remote host over SSH.

    Return-code convention used by the remote helpers: ``0`` means success,
    a positive value means the remote step failed, and ``-1`` means that
    ``connect()`` has not been called (or ``close()`` was called since).
    """

    def __init__(self, server, uname, app):
        """
        server: host name of the backend.
        uname:  user name used for the SSH/scp connections.
        app:    command prefix used to launch a runner script
                (it is prepended to the script name in ``run_experiments``).
        """
        self.server = server
        self.uname = uname
        self.app = app
        self.client = SSHClient()
        # NOTE(review): AutoAddPolicy accepts host keys of unknown servers
        # without verification; confirm this is acceptable for this backend.
        self.client.set_missing_host_key_policy(AutoAddPolicy())
        self.connected = False

    def connect(self):
        """Open the SSH session and mark the manager as connected."""
        self.client.connect(self.server, username=self.uname)
        self.connected = True

    def close(self):
        """Close the SSH session and mark the manager as disconnected."""
        self.client.close()
        self.connected = False

    def _exec(self, cmd):
        """Run ``cmd`` remotely and return ``(exit_status, stdout_lines)``."""
        _, stdout, _ = self.client.exec_command(cmd)
        # Drain stdout before asking for the exit status so that a command
        # producing a lot of output cannot stall while its buffer is full.
        lines = stdout.readlines()
        status = stdout.channel.recv_exit_status()
        return status, lines

    def generate_experiments(self, expression_dir, args, threads):
        """Run generate-variants-linnea.py inside ``expression_dir`` remotely.

        Returns 0 when the last output line contains "Generated Variants",
        1 otherwise (including when the command printed nothing), and -1
        when not connected.
        """
        if not self.connected:
            return -1

        cmd = "source ~/.analyzer; "
        cmd += "cd {}; ".format(expression_dir)
        cmd += "python generate-variants-linnea.py {} --threads={};".format(
            " ".join(str(a) for a in args), threads)

        print(cmd)

        _, ret = self._exec(cmd)
        print(ret)
        # An empty output means the script never reported success; treat it
        # as a failure instead of indexing into an empty list.
        if ret and "Generated Variants" in ret[-1]:
            return 0
        return 1

    def check_if_file_exists(self, file_path):
        """Return True/False for a remote regular file, or -1 when not connected.

        Note that -1 is truthy; compare the result with ``is True`` when the
        connection state is not known.
        """
        if not self.connected:
            return -1
        status, _ = self._exec("test -f {};".format(file_path))
        return status == 0

    def run_experiments(self, runner_file):
        """Launch ``runner_file`` through ``self.app`` from its own directory.

        Returns 0 when the launch command exits with status 0, 1 otherwise,
        and -1 when not connected.
        """
        if not self.connected:
            return -1

        args_dir, script = os.path.split(runner_file)

        cmd = "source ~/.analyzer; "
        cmd += "cd {}; ".format(args_dir)
        cmd += "{} {};".format(self.app, script)

        print(cmd)

        status, _ = self._exec(cmd)
        if status == 0:
            return 0
        print("Error: ", status)
        return 1

    def check_slrum_status(self, jobname):
        """Look for ``jobname`` in the user's squeue listing.

        Returns 2 when a queue line contains ``jobname`` as a whole field,
        0 when no line does, and -1 when not connected.
        """
        if not self.connected:
            return -1

        cmd = "squeue --format=\"%.18i %.9P %.30j %.8u %.8T %.10M %.9l %.6D %R\" --me"
        _, ret = self._exec(cmd)
        for j in ret:
            if jobname in j.split():
                print(j)
                return 2

        print(ret)
        return 0

    # Correctly spelled alias; the original name is kept for existing callers.
    check_slurm_status = check_slrum_status

    def copy_from_backend(self,backend_path, local_path):
        """Copy ``backend_path`` from the backend to ``local_path`` with scp.

        Returns 0 on success and 1 when scp fails or cannot be started.
        """
        remote = '{uname}@{server}:{backend_path}'.format(uname=self.uname,
                                                          server=self.server,
                                                          backend_path=backend_path)
        # Build argv directly instead of re-splitting a formatted string, so
        # each path stays a single argument.
        call = ['scp', remote, str(local_path)]
        print(' '.join(call))
        try:
            ret = subprocess.check_output(call)
            print(ret)
            return 0
        except Exception as e:
            # Best effort: report the failure and let the caller decide.
            print(e)
            return 1

    def generate_measurements_script(self,cmd_args, args_dir):
        """Run generate-measurements-script.py with ``cmd_args`` in ``args_dir``.

        ``cmd_args`` is the already formatted command-line string.
        Returns 0 on exit status 0, 1 otherwise, and -1 when not connected.
        """
        if not self.connected:
            return -1

        cmd = "source ~/.analyzer; "
        cmd += "cd {}; ".format(args_dir)
        cmd += "python generate-measurements-script.py {}".format(cmd_args)
        print(cmd)

        status, _ = self._exec(cmd)
        if status == 0:
            return 0
        print("Error: ", status)
        return 1

    def cancel_job(self,job_name):
        """Placeholder; cancelling a job is not implemented."""
        pass

    def debug_cmd(self, cmd):
        """Run ``cmd`` through the system ssh client and print its raw output."""
        call =  'ssh -l {} {}'.format(self.uname, self.server).split()
        ret = subprocess.check_output(call + [cmd,])
        print(ret)
            
         

In [67]:
class Runner:
    """Generates and executes variant codes, locally or through a backend."""

    def __init__(self, name, expression_dir, args, threads=4, backend=None):
        """
        This class handles the code generation and execution of the variant codes.

        Requirements:

        It is assumed that ``expression_dir`` contains the script
        'generate-variants-linnea.py', which generates variant codes for the
        given operand sizes. The operand sizes are passed as command line args
            e.g. python generate-variants-linnea.py 10 10 10 10 12 --threads=4

        After running the script, an "argument folder" is expected inside the
        folder "experiments" next to the script. It contains the case table,
        the event meta table (i.e. the event table without actual run times)
        and a runner script, as in the example below:
            e.g. experiments/10_10_10_10_12/
                    case_table.csv
                    event_meta_table.csv
                    runner.jl

        'runner.jl' is the script that runs the experiments and writes the
        run-times log (read elsewhere in this notebook as 'run_times.csv')
        into the argument folder.

        INPUT:

        name: Experiment name
        expression_dir: Directory that contains the generator script
        args: operand sizes (arguments to the generator script); every entry
              is converted to a string
        threads: value passed to the generator script as --threads
        backend: optional object exposing generate_experiments(...) and
                 run_experiments(...); when omitted, everything runs locally
        """
        self.name = name
        self.expression_dir = expression_dir
        self.threads = threads

        self.script_path = os.path.join(self.expression_dir, "generate-variants-linnea.py")
        # Both the folder name and the subprocess argv need strings, so
        # normalise once here; this also accepts ints or a tuple.
        self.args = [str(a) for a in args]
        self.args_dir = os.path.join(self.expression_dir,
                                     "experiments",
                                     "_".join(self.args))

        self.backend = backend

    def generate_experiments(self):
        """
        generates experiments for the stored arguments
            e.g. for  python generate-variants-linnea.py 10 10 10 10 12
            ['10','10','10','10','12'] would be the argument list.

        Output: Return code == 0 implies successful completion
        """
        if self.backend:
            return self.backend.generate_experiments(self.expression_dir, self.args, self.threads)

        call = ["python", self.script_path] + self.args + ["--threads={}".format(self.threads)]
        completed_process = subprocess.run(call)
        return completed_process.returncode

    def run_experiments(self):
        """
        executes the runner file 'runner.jl' of the argument folder.

        Output: 0 when the local run (or the backend launch) succeeded,
        -1 otherwise.
        """
        runner_path = os.path.join(self.args_dir,"runner.jl")

        if self.backend:
            if self.backend.run_experiments(runner_path) == 0:
                print("Running experiments in the backend.")
                return 0
            return -1

        if os.path.exists(self.args_dir):
            print("Running Experiments locally")
            completed_process = subprocess.run(["julia", runner_path])
            if completed_process.returncode == 0:
                print("Experiments completed locally")
                return 0 # Ran experiment

        return -1

    def clean(self):
        """remove arguments folder; returns 0 when removed, -1 when it did not exist"""
        if not os.path.exists(self.args_dir):
            return -1
        shutil.rmtree(self.args_dir)
        return 0
            
            

In [143]:
class DataCollector:
    """Loads the ';'-separated experiment tables as pandas dataframes.

    Tables are looked up in ``local_data_dir`` first. When a table is missing
    there and both ``backend_data_dir`` and ``backend`` are given, the file is
    copied from the backend into ``local_data_dir`` and then read.
    Every getter returns the dataframe, or -1 when the table is unavailable.
    """

    def __init__(self,local_data_dir, backend_data_dir=None, backend=None):
        self.local_data_dir = local_data_dir
        self.backend = backend
        self.backend_data_dir = backend_data_dir

    def read_log(self, log_path):
        """Read ``log_path`` (separator ';'); return -1 when it does not exist."""
        if not os.path.exists(log_path):
            return -1
        return pd.read_csv(log_path,sep=';')

    def get_table(self, table_name):
        """Return ``table_name`` as a dataframe, fetching it from the backend if needed."""
        table_path = os.path.join(self.local_data_dir,table_name)

        # Only try the backend when the table is missing locally and the
        # collector was given both a remote directory and a backend object;
        # a missing backend yields the -1 sentinel instead of an AttributeError.
        if not os.path.exists(table_path) and self.backend_data_dir and self.backend is not None:
            backend_path = os.path.join(self.backend_data_dir, table_name)
            self.backend.copy_from_backend(backend_path, self.local_data_dir)

        # read_log reports a file that is still missing as -1.
        return self.read_log(table_path)

    def get_case_table(self):
        """get the case table."""
        return self.get_table("case_table.csv")

    def get_event_meta_table(self):
        """get event table without actual execution times."""
        return self.get_table("event_meta_table.csv")

    def get_all_runtimes_table(self):
        """get event table with actual execution times."""
        return self.get_table("run_times.csv")

    def get_runtimes_competing_table(self, run_id):
        """get the run times table written for the competing run ``run_id``."""
        return self.get_table("run_times_competing_{}.csv".format(run_id))
      

In [167]:
class RunnerCompeting:
    """Generates and runs the measurement script for a set of competing variants."""

    def __init__(self,competing_variants, args_dir, threads=4, backend=None):
        """
        competing_variants: names of the variants to measure (passed to --algs)
        args_dir: argument folder containing generate-measurements-script.py
        threads: value passed as --threads
        backend: optional object exposing generate_measurements_script(...)
                 and run_experiments(...); when omitted, everything runs locally
        """
        self.competing_variants = competing_variants
        self.args_dir = args_dir
        self.threads = threads
        self.backend = backend

    def measure_competing_variants(self, run_id, reps):
        """Generate 'runner_competing_<run_id>.jl' and execute it.

        Output:
            0  success (backend: job launched; local: run completed)
            3  the backend failed to generate the measurement script
            otherwise the non-zero code of the step that failed
        """
        # Build the argument vector once; the backend gets it joined into a
        # single string, the local call uses it directly as argv.
        script_args = (["--algs"] + [str(v) for v in self.competing_variants]
                       + ["--rep", str(reps),
                          "--threads", str(self.threads),
                          "--id", str(run_id)])

        runner_file = os.path.join(self.args_dir, "runner_competing_{}.jl".format(run_id))
        generate_file = os.path.join(self.args_dir, "generate-measurements-script.py")

        if self.backend:
            ret = self.backend.generate_measurements_script(" ".join(script_args), self.args_dir)
            if ret != 0:
                return 3

            ret = self.backend.run_experiments(runner_file)
            if ret == 0:
                print("Running experiments in the backend.")
            return ret

        ret = subprocess.run(["python", generate_file] + script_args).returncode
        if ret != 0:
            return ret

        print("Running experiments locally.")
        ret = subprocess.run(["julia", runner_file]).returncode
        if ret == 0:
            print("Experiments completed locally")
        return ret
                 

In [168]:
# Operand sizes handed to the variant generator.
args = ["70","70","70","70","70"]

# --- local runner and collector -------------------------------------------
exp_dir_local = "../Matrix-Chain-4/variants-linnea/"
local_threads = 4
runner_local = Runner("MC", exp_dir_local, args, threads=local_threads)
dc_local = DataCollector(runner_local.args_dir)

# --- backend connection ----------------------------------------------------
app = "sbatch submit.sh"
bm = BackendManager("login18-1.hpc.itc.rwth-aachen.de", "as641651", app)
bm.connect()

# --- backend runner --------------------------------------------------------
exp_dir_backend = "~/PhD/performance-analyazer/Experiment2/Matrix-Chain-4/variants-linnea"
backend_threads = 8
runner_backend = Runner("MC", exp_dir_backend, args, threads=backend_threads, backend=bm)

# The last path component of the argument folder is reused for the job name
# and for the local folder that mirrors the backend results.
args_dir_name = runner_backend.args_dir.rsplit('/', 1)[-1]
job_name = "{}_T{}".format(args_dir_name, backend_threads)

local_data_dir = os.path.join(exp_dir_local, "cluster-experiments", args_dir_name)
if not os.path.exists(local_data_dir):
    os.makedirs(local_data_dir)

dc_backend = DataCollector(local_data_dir,
                           backend_data_dir=runner_backend.args_dir,
                           backend=bm)


In [151]:
# Generate the measurement script for two competing variants on the backend.
competing_algs = ['algorithm1', 'algorithm2']
threads = backend_threads
reps = 10
run_id = 0

# BackendManager.generate_measurements_script takes the formatted argument
# string and the argument folder, so build the string here.
cmd_args = "--algs {algs} --rep {rep} --threads {threads} --id {run_id}".format(
    algs=" ".join(competing_algs),
    rep=reps,
    threads=threads,
    run_id=run_id)
bm.generate_measurements_script(cmd_args, runner_backend.args_dir)

source ~/.analyzer; cd ~/PhD/performance-analyazer/Experiment2/Matrix-Chain-4/variants-linnea/experiments/70_70_70_70_70; python generate-measurements-script.py --algs algorithm1 algorithm2 --rep 10 --threads 8 --id 0


0

### Run locally

In [154]:
# Generate the variant codes on this machine; a return code of 0 means success.
ret = runner_local.generate_experiments()
ret

New solution:.............2.06e+06
No further generation steps possible.
----------------------------------
Number of nodes:                 8
Solution nodes:                  1
Data:                     2.45e+04
Best solution:            2.06e+06
Intensity:                      84
Number of algorithms:            6
Generated Variants.


0

In [155]:
# Load case_table.csv from the local argument folder (-1 if it is missing).
ct = dc_local.get_case_table()
ct

Unnamed: 0,case:concept:name,case:flops,case:num_kernels
0,algorithm1,2060000.0,3
1,algorithm5,2060000.0,3
2,algorithm4,2060000.0,3
3,algorithm0,2060000.0,3
4,algorithm3,2060000.0,3
5,algorithm2,2060000.0,3


In [156]:
# Execute runner.jl locally; 0 means the run completed, -1 that it did not.
ret = runner_local.run_experiments()
ret

Running Experiments locally
Experiments completed locally


0

In [157]:
# Load run_times.csv (event table with execution times) and preview it.
et = dc_local.get_all_runtimes_table()
et.head()

Unnamed: 0,case:concept:name,concept:name,concept:flops,concept:operation,concept:kernel,timestamp:start,timestamp:end
0,algorithm1,gemm_6.86e+05,686000.0,tmp1 = (A B),"gemm!('N', 'N', 1.0, ml0, ml1, 0.0, ml4)",1655917000.0,1655917000.0
1,algorithm1,gemm_6.86e+05,686000.0,tmp3 = (C D),"gemm!('N', 'N', 1.0, ml2, ml3, 0.0, ml5)",1655917000.0,1655917000.0
2,algorithm1,gemm_6.86e+05,686000.0,tmp6 = (tmp1 tmp3),"gemm!('N', 'N', 1.0, ml4, ml5, 0.0, ml6)",1655917000.0,1655917000.0
3,algorithm5,gemm_6.86e+05,686000.0,tmp1 = (A B),"gemm!('N', 'N', 1.0, ml0, ml1, 0.0, ml4)",1655917000.0,1655917000.0
4,algorithm5,gemm_6.86e+05,686000.0,tmp4 = (tmp1 C),"gemm!('N', 'N', 1.0, ml4, ml2, 0.0, ml5)",1655917000.0,1655917000.0


### Measure competing variants locally and collect their run times

In [169]:
# Variants that are measured against each other.
competing_algs = ['algorithm0', 'algorithm1']

# No backend is passed, so the measurement script is generated and run locally.
runner_competing_local = RunnerCompeting(competing_algs, runner_local.args_dir,
                                          threads=local_threads)

# run_id selects the runner_competing_<run_id>.jl file; 0 means success.
ret = runner_competing_local.measure_competing_variants(run_id=7, reps=3)
ret

Running experiments locally.
Experiments completed locally


0

In [172]:
# Load run_times_competing_7.csv, i.e. the table of the run with run_id=7.
rt = dc_local.get_runtimes_competing_table(7) 
rt.head()

Unnamed: 0,case:concept:name,concept:name,concept:flops,concept:operation,concept:kernel,timestamp:start,timestamp:end
0,algorithm1_1,gemm_6.86e+05,686000.0,tmp1 = (A B),"gemm!('N', 'N', 1.0, ml0, ml1, 0.0, ml4)",1655917000.0,1655917000.0
1,algorithm1_1,gemm_6.86e+05,686000.0,tmp3 = (C D),"gemm!('N', 'N', 1.0, ml2, ml3, 0.0, ml5)",1655917000.0,1655917000.0
2,algorithm1_1,gemm_6.86e+05,686000.0,tmp6 = (tmp1 tmp3),"gemm!('N', 'N', 1.0, ml4, ml5, 0.0, ml6)",1655917000.0,1655917000.0
3,algorithm0_0,gemm_6.86e+05,686000.0,tmp3 = (C D),"gemm!('N', 'N', 1.0, ml2, ml3, 0.0, ml4)",1655917000.0,1655917000.0
4,algorithm0_0,gemm_6.86e+05,686000.0,tmp5 = (B tmp3),"gemm!('N', 'N', 1.0, ml1, ml4, 0.0, ml5)",1655917000.0,1655917000.0


### Run on the cluster

In [133]:
# Generate the variant codes on the backend; 0 means the generator reported
# "Generated Variants", -1 that the backend manager is not connected.
ret = runner_backend.generate_experiments()
ret

source ~/.analyzer; cd ~/PhD/performance-analyazer/Experiment2/Matrix-Chain-4/variants-linnea; python generate-variants-linnea.py 70 70 70 70 70 --threads=8;
['New solution:.............2.06e+06\n', 'No further generation steps possible.\n', '----------------------------------\n', 'Number of nodes:                 8\n', 'Solution nodes:                  1\n', 'Data:                     2.45e+04\n', 'Best solution:            2.06e+06\n', 'Intensity:                      84\n', 'Number of algorithms:            6\n', 'Generated Variants.\n']


0

In [134]:
# Load case_table.csv, copying it from the backend if it is not yet available locally.
ct_b = dc_backend.get_case_table()

In [135]:
# Display the case table fetched from the backend.
ct_b

Unnamed: 0,case:concept:name,case:flops,case:num_kernels
0,algorithm0,2060000.0,3
1,algorithm2,2060000.0,3
2,algorithm4,2060000.0,3
3,algorithm1,2060000.0,3
4,algorithm5,2060000.0,3
5,algorithm3,2060000.0,3


In [97]:
# Launch runner.jl on the backend through the configured app command;
# 0 only means the launch command succeeded.
ret = runner_backend.run_experiments()
ret

source ~/.analyzer; cd ~/PhD/performance-analyazer/Experiment2/Matrix-Chain-4/variants-linnea/experiments/70_70_70_70_70; sbatch submit.sh runner.jl;
Running experiments in the backend.


0

In [100]:
# Fetch the run times only once the job is no longer listed by squeue
# (status 0). Bind et_b on every path so a queued job cannot leave the name
# undefined or pointing at a stale table from an earlier execution.
if bm.check_slrum_status(job_name) == 0:
    et_b = dc_backend.get_all_runtimes_table()
else:
    et_b = None
    print("Job {} is still queued or the backend is not connected.".format(job_name))

# The collector returns -1 when the table is unavailable; only preview frames.
et_b.head() if isinstance(et_b, pd.DataFrame) else et_b

['             JOBID PARTITION                           NAME     USER    STATE       TIME TIME_LIMI  NODES NODELIST(REASON)\n']


Unnamed: 0,case:concept:name,concept:name,concept:flops,concept:operation,concept:kernel,timestamp:start,timestamp:end
0,algorithm0,gemm_6.86e+05,686000.0,tmp3 = (C D),"gemm!('N', 'N', 1.0, ml2, ml3, 0.0, ml4)",1655907000.0,1655907000.0
1,algorithm0,gemm_6.86e+05,686000.0,tmp5 = (B tmp3),"gemm!('N', 'N', 1.0, ml1, ml4, 0.0, ml5)",1655907000.0,1655907000.0
2,algorithm0,gemm_6.86e+05,686000.0,tmp6 = (A tmp5),"gemm!('N', 'N', 1.0, ml0, ml5, 0.0, ml6)",1655907000.0,1655907000.0
3,algorithm2,gemm_6.86e+05,686000.0,tmp2 = (B C),"gemm!('N', 'N', 1.0, ml1, ml2, 0.0, ml4)",1655907000.0,1655907000.0
4,algorithm2,gemm_6.86e+05,686000.0,tmp4 = (A tmp2),"gemm!('N', 'N', 1.0, ml0, ml4, 0.0, ml5)",1655907000.0,1655907000.0


### Measure competing variants on the cluster and collect their run times

In [148]:
# Variants that are measured against each other on the backend.
competing_algs = ['algorithm0', 'algorithm1']

# Passing the backend manager makes generation and execution run remotely.
runner_competing_backend = RunnerCompeting(competing_algs, runner_backend.args_dir,
                                          threads=backend_threads, backend=bm)

In [138]:
# Generate runner_competing_0.jl on the backend and launch it;
# 3 means the generation step failed, 0 that the launch succeeded.
ret = runner_competing_backend.measure_competing_variants(run_id=0, reps=3)
ret

source ~/.analyzer; cd ~/PhD/performance-analyazer/Experiment2/Matrix-Chain-4/variants-linnea/experiments/70_70_70_70_70; python generate-measurements-script.py --algs algorithm0 algorithm1 --rep 3 --threads 8 --id 0
source ~/.analyzer; cd ~/PhD/performance-analyazer/Experiment2/Matrix-Chain-4/variants-linnea/experiments/70_70_70_70_70; sbatch submit.sh runner_competing_0.jl;
Running experiments in the backend.


0

In [142]:
# 2 means a squeue line contains job_name; 0 means no line does.
bm.check_slrum_status(job_name)

['             JOBID PARTITION                           NAME     USER    STATE       TIME TIME_LIMI  NODES NODELIST(REASON)\n']


0

In [147]:
# Fetch the competing run times only once the job is no longer listed by
# squeue (status 0). Bind rt_b on every path so a queued job cannot leave the
# name undefined or pointing at a stale table from an earlier execution.
if bm.check_slrum_status(job_name) == 0:
    rt_b = dc_backend.get_runtimes_competing_table(0)
else:
    rt_b = None
    print("Job {} is still queued or the backend is not connected.".format(job_name))

# The collector returns -1 when the table is unavailable; only preview frames.
rt_b.head() if isinstance(rt_b, pd.DataFrame) else rt_b

['             JOBID PARTITION                           NAME     USER    STATE       TIME TIME_LIMI  NODES NODELIST(REASON)\n']


Unnamed: 0,case:concept:name,concept:name,concept:flops,concept:operation,concept:kernel,timestamp:start,timestamp:end
0,algorithm1_2,gemm_6.86e+05,686000.0,tmp1 = (A B),"gemm!('N', 'N', 1.0, ml0, ml1, 0.0, ml4)",1655916000.0,1655916000.0
1,algorithm1_2,gemm_6.86e+05,686000.0,tmp3 = (C D),"gemm!('N', 'N', 1.0, ml2, ml3, 0.0, ml5)",1655916000.0,1655916000.0
2,algorithm1_2,gemm_6.86e+05,686000.0,tmp6 = (tmp1 tmp3),"gemm!('N', 'N', 1.0, ml4, ml5, 0.0, ml6)",1655916000.0,1655916000.0
3,algorithm0_2,gemm_6.86e+05,686000.0,tmp3 = (C D),"gemm!('N', 'N', 1.0, ml2, ml3, 0.0, ml4)",1655916000.0,1655916000.0
4,algorithm0_2,gemm_6.86e+05,686000.0,tmp5 = (B tmp3),"gemm!('N', 'N', 1.0, ml1, ml4, 0.0, ml5)",1655916000.0,1655916000.0
