In [None]:
from abc import ABC, abstractmethod
import csv
from datetime import datetime
import glob
import os
import random
import shlex
import shutil
import xml.etree.ElementTree as ET

from absl import app
from absl import flags


In [None]:
class Job(ABC):
    """
    Abstract contract for a benchmarking job.

    Every concrete job has to provide the six operations below. They are
    listed in the order the benchmarking workflow uses them. The signatures
    here take no arguments besides ``self``; the concrete jobs in this file
    override several of them with additional parameters.
    """

    @abstractmethod
    def readBenchmarkingProfiles(self) -> list:
        """Return the benchmarking profiles the job should be run with."""

    @abstractmethod
    def identifySpecificInputFiles(self) -> None:
        """Locate the tool-specific input files."""

    @abstractmethod
    def createRepository(self) -> None:
        """Create the working directory for one job run."""

    @abstractmethod
    def copyInputFiles(self) -> None:
        """Copy input data into the working directory."""

    @abstractmethod
    def copySpecificInputFiles(self) -> None:
        """Copy the tool-specific input files into the working directory."""

    @abstractmethod
    def createExecutableBatchFile(self) -> str:
        """Produce the batch file used to execute the job."""


"""
Concrete Jobs provide various implementations of the Job interface.
"""


class AbstractJob(Job):
    """
    Shared base class for the concrete jobs.

    It implements the tool-independent steps (reading the profile CSV,
    creating the working directory, copying inputs) and supplies placeholder
    implementations for the tool-specific steps, which subclasses are
    expected to override.
    """

    def __init__(
        self,
        BenchmarkingCSVFile_path,
        InputFiles_path,
        storage_path,
        Number_of_jobs_repetition,
    ):
        """
        Store the job configuration.

        Args:
            BenchmarkingCSVFile_path: Path of the CSV file holding one
                benchmarking profile per row.
            InputFiles_path: Directory prefix of the input files. It is
                concatenated directly with glob patterns by subclasses, so it
                is expected to end with a path separator.
            storage_path: Directory under which job outputs are stored.
            Number_of_jobs_repetition: How many times each profile is run.
        """
        self.csvfile = BenchmarkingCSVFile_path
        self.path = InputFiles_path
        self.storage_path = storage_path
        self.repetitions = Number_of_jobs_repetition

        # Legacy attribute names, kept so existing code keeps working. They
        # are misleading: ``src`` is the storage path and ``dest`` is the
        # repetition count, not a copy source/destination.
        self.src = storage_path
        self.dest = Number_of_jobs_repetition

    def readBenchmarkingProfiles(self) -> list:
        """
        Read the benchmarking profiles from ``self.csvfile``.

        Returns:
            One dict per CSV data row, keyed by the header row.
        """
        # newline="" lets the csv module do its own line-ending handling, so
        # quoted fields that contain line breaks are read back unchanged.
        with open(self.csvfile, "r", newline="") as file:
            return [dict(row) for row in csv.DictReader(file)]

    def createRepository(self, running_job_path) -> None:
        """
        Create the working directory of one job run.

        Raises:
            FileExistsError: If ``running_job_path`` already exists. This is
                deliberate: two runs must never share a directory.
        """
        os.makedirs(running_job_path)

    def copyInputFiles(self, src, dest) -> None:
        """
        Copy ``src`` to ``dest``, whether it is a directory or a single file.

        A directory is copied recursively (merging into an existing
        destination); anything else falls back to a plain file copy.
        """
        try:
            shutil.copytree(src, dest, dirs_exist_ok=True)
        except NotADirectoryError:
            # ``src`` is a regular file rather than a directory.
            shutil.copy(src, dest)

    def copySpecificInputFiles(self) -> str:
        """Placeholder; concrete jobs override this with the real copy step."""
        return "{Result of the AbstractJob: copySpecificInputFiles}"

    def createExecutableBatchFile(self) -> str:
        """Placeholder; concrete jobs override this with the real batch step."""
        return "{Result of the AbstractJob: createExecutableBatchFile}"

    def identifySpecificInputFiles(self) -> str:
        """Placeholder; concrete jobs override this with the real lookup."""
        return "{Result of the AbstractJob: identifySpecificInputFiles}"


class MaxQuantJob(AbstractJob):
    """
    Job that benchmarks MaxQuant through a generated Slurm batch script.

    NOTE(review): this class is still work in progress. It inherits the
    placeholder ``identifySpecificInputFiles`` / ``copySpecificInputFiles``
    from ``AbstractJob``, and ``createExecutableBatchFile`` takes three
    arguments while ``BenchmarkingToolCreator.runBenchmarking`` passes four,
    so the MaxQuant workflow cannot run end to end yet.
    """

    def __init__(
        self,
        BenchmarkingCSVFile_path,
        InputFiles_path,
        storage_path,
        Number_of_jobs_repetition,

    ):
        """Forward the shared job configuration to ``AbstractJob``."""
        super().__init__(
            BenchmarkingCSVFile_path,
            InputFiles_path,
            storage_path,
            Number_of_jobs_repetition,
        )
        # TODO: Variables needs to be initialize
        # self.sample_files = sample_files
        # self.xml_file_path = xml_file_path
        # self.numthreads = numthreads

    def createExecutableBatchFile(self, job_parameters, path, ExecutionID) -> None:
        """
        Write the Slurm batch script for one MaxQuant run and submit it.

        Args:
            job_parameters: One benchmarking profile. The keys read here are
                ``job-name``, ``partition``, ``timelimit``, ``cpus-per-task``,
                ``mem``, ``type``, ``NumFiles`` and ``threads``.
            path: Working directory of the run. It is concatenated directly
                with file names, so it must end with a path separator.
            ExecutionID: Identifier of the benchmarking execution; used in
                the name of the ``jobs_executed_<id>.txt`` log.
        """
        script_path = f"{path}{job_parameters['job-name']}_batch.sh"

        script_lines = [
            # Slurm header.
            "#!/bin/bash\n",
            f"#SBATCH -p {job_parameters['partition']}\n",
            "#SBATCH --qos=regular_partitiontimelimit\n",
            f"#SBATCH --job-name={job_parameters['job-name']}\n",
            "#SBATCH --ntasks=1\n",
            f"#SBATCH --time={job_parameters['timelimit']}\n",
            f"#SBATCH --cpus-per-task={job_parameters['cpus-per-task']}\n",
            f"#SBATCH --mem={job_parameters['mem']}G\n",
            f"#SBATCH --output={path}slurm-%j.out\n",
            "#SBATCH --mail-type=ALL,ARRAY_TASKS\n",
            "#SBATCH --mail-user=romano.h@wehi.edu.au\n",
            # Environment.
            "module load MaxQuant/2.0.2.0\n",
            "module load python/3.8.8\n",
            # Rewrite the folders inside mqpar.xml, then run the result.
            f"MaxQuant {path}mqpar.xml --changeFolder {path}mqpar.post.xml {path} {path}\n",
            f"MaxQuant {path}mqpar.post.xml\n",
            # Clean-up: delete every top-level file except the listed
            # extensions, and every top-level directory that is not a .d
            # folder. Backslashes are doubled so Python emits them literally
            # instead of treating them as (invalid) escape sequences.
            f"find {path} -maxdepth 1 -mindepth 1 -type f -not -regex '.*\\.\\(fasta\\|xml\\|out\\|raw\\|sh\\)' -delete\n",
            f"find {path} -maxdepth 1 -mindepth 1 -type d -not -regex '.*\\.\\(d\\)' -exec rm -rf '{{}}' \\;\n",
            # Append one record describing this run to the execution log.
            f'echo ""$SLURM_ARRAY_JOB_ID","$SLURM_ARRAY_TASK_ID"",{job_parameters["partition"]},{job_parameters["type"]},{job_parameters["job-name"]},{job_parameters["NumFiles"]},{job_parameters["cpus-per-task"]},{job_parameters["mem"]},{job_parameters["threads"]},{job_parameters["timelimit"]} >> jobs_executed_{ExecutionID}.txt\n',
        ]

        with open(script_path, "w") as fb:
            fb.writelines(script_lines)

        # Quote the path so directories or job names containing spaces or
        # shell metacharacters do not break (or alter) the submit command.
        os.system(f"sbatch {shlex.quote(script_path)}")

    def updateXmlFile(self, sample_files, xml_file_path, numthreads) -> None:
        """
        Rewrite a MaxQuant parameter file in place for one run.

        The ``<string>`` entries under ``filePaths`` are replaced by
        ``sample_files``, ``useDotNetCore`` is set to ``True`` and
        ``numThreads`` to ``numthreads``.

        Raises:
            IndexError: If ``filePaths``, ``useDotNetCore`` or ``numThreads``
                is missing directly below the root element.
        """
        tree = ET.parse(xml_file_path)
        root = tree.getroot()
        file_paths = root.findall("filePaths")[0]

        # Drop the previously configured input files ...
        for filepath_tag in root.findall("filePaths/string"):
            file_paths.remove(filepath_tag)

        # ... and list the files selected for this run instead.
        for sample_file in sample_files:
            new_path = ET.Element("string")
            new_path.text = sample_file
            file_paths.append(new_path)

        # <useDotNetCore>True</useDotNetCore>
        root.findall("useDotNetCore")[0].text = "True"
        # <numThreads>8</numThreads>
        root.findall("numThreads")[0].text = str(numthreads)

        tree.write(xml_file_path)


class DiaNNJob(AbstractJob):
    """Job that benchmarks DiaNN through the site's ``createdianncmd.sh``."""

    # Site script that generates ``diann.slurm`` from DIANN_* variables.
    _CREATE_CMD_SCRIPT = (
        "/stornext/System/data/apps/rc-tools/rc-tools-1.0/bin/tools/DiaNN/createdianncmd.sh"
    )

    def __init__(
        self,
        BenchmarkingCSVFile_path,
        InputFiles_path,
        storage_path,
        Number_of_jobs_repetition,
    ):
        """Forward the shared job configuration to ``AbstractJob``."""
        super().__init__(
            BenchmarkingCSVFile_path,
            InputFiles_path,
            storage_path,
            Number_of_jobs_repetition,
        )

    def identifySpecificInputFiles(self) -> dict:
        """
        Locate the DiaNN inputs directly under ``self.path``.

        Returns:
            A dict with ``original_files`` (every ``*.d`` entry),
            ``fasta_file`` (the first ``*.fasta`` match) and ``tsv_file``
            (the first ``*.tsv`` match, or ``None`` when there is none).

        Raises:
            FileNotFoundError: If no ``*.fasta`` file is present.
        """
        original_files = glob.glob(self.path + "*.d", recursive=False)

        fasta_files = glob.glob(self.path + "*.fasta", recursive=False)
        if not fasta_files:
            raise FileNotFoundError(f"No .fasta file found in {self.path!r}")

        # The .tsv file is optional.
        tsv_files = glob.glob(self.path + "*.tsv", recursive=False)

        return {
            "original_files": original_files,
            "fasta_file": fasta_files[0],
            "tsv_file": tsv_files[0] if tsv_files else None,
        }

    def copySpecificInputFiles(self, specificInputFiles, running_job_path) -> None:
        """Copy the fasta file and, when present, the tsv file into the run directory."""
        self.copyInputFiles(specificInputFiles["fasta_file"], running_job_path)

        if specificInputFiles["tsv_file"]:
            self.copyInputFiles(specificInputFiles["tsv_file"], running_job_path)

    def createExecutableBatchFile(
        self, job_parameters, path, specificInputFiles, ExecutionID
    ) -> None:
        """
        Generate ``diann.slurm`` in ``path``, add the logging line, submit it.

        Args:
            job_parameters: One benchmarking profile. The keys read here are
                ``type``, ``timelimit``, ``cpus-per-task``, ``mem``,
                ``threads``, ``partition``, ``job-name`` and ``NumFiles``.
            path: Working directory of the run; must end with a path
                separator because file names are appended to it directly.
            specificInputFiles: Mapping returned by
                ``identifySpecificInputFiles``; only ``tsv_file`` is read.
            ExecutionID: Identifier of the benchmarking execution; used in
                the name of the ``jobs_executed_<id>.txt`` log.
        """
        environment = {
            "DIANN_RUN_TYPE": job_parameters["type"],
            # NOTE(review): without a .tsv the variable used to be set to the
            # literal text "None"; it is now empty. Confirm that
            # createdianncmd.sh treats an empty DIANN_LIB as "no library".
            "DIANN_LIB": specificInputFiles["tsv_file"] or "",
            "DIANN_TIME": job_parameters["timelimit"],
            "DIANN_CPUS": job_parameters["cpus-per-task"],
            "DIANN_MEM": f"{job_parameters['mem']}G",
            "DIANN_THREADS": job_parameters["threads"],
            "DIANN_OUTPUT_PATH": f"{path}/output",
        }
        # Quote every value so spaces or shell metacharacters in paths and
        # CSV fields cannot break or alter the command.
        assignments = " ".join(
            f"{name}={shlex.quote(str(value))}" for name, value in environment.items()
        )
        os.system(f"(cd {shlex.quote(path)} ; {assignments} {self._CREATE_CMD_SCRIPT})")

        slurm_path = f"{path}diann.slurm"

        # Append one record describing this run to the execution log.
        with open(slurm_path, "a") as fb:
            fb.write(f'echo ""$SLURM_JOB_ID"",{job_parameters["partition"]},{job_parameters["type"]},{job_parameters["job-name"]},{job_parameters["NumFiles"]},{job_parameters["cpus-per-task"]},{job_parameters["mem"]},{job_parameters["threads"]},{job_parameters["timelimit"]} >> jobs_executed_{ExecutionID}.txt\n')

        os.system(f"sbatch {shlex.quote(slurm_path)}")


In [None]:
class BenchmarkingToolCreator(ABC):
    """
    The BenchmarkingToolCreator class declares the factory method that is supposed to return an
    object of a Job class. The BenchmarkingToolCreator's subclasses usually provide the
    implementation of this method.
    """

    def __init__(
        self,
        BenchmarkingCSVFile_path,
        InputFiles_path,
        storage_path,
        Number_of_jobs_repetition,
    ):
        """
        Store the benchmarking configuration.

        Args:
            BenchmarkingCSVFile_path: CSV file with one profile per row.
            InputFiles_path: Directory prefix of the input files.
            storage_path: Directory prefix under which one ``repo-...``
                directory per job run is created. It is concatenated directly
                with the directory name, so it must end with a separator.
            Number_of_jobs_repetition: How many times each profile is run.
        """
        self.BenchmarkingCSVFile_path = BenchmarkingCSVFile_path
        self.InputFiles_path = InputFiles_path
        self.storage_path = storage_path
        self.Number_of_jobs_repetition = Number_of_jobs_repetition

    @abstractmethod
    def factory_method_create_job(self):
        """
        Note that the BenchmarkingToolCreator may also provide some default implementation of
        the factory method.
        """
        pass

    def _reserve_job_path(self, job_name, used_paths) -> str:
        """
        Return a not-yet-used working directory path for one job run.

        The path is ``<storage>repo-<job name>-<timestamp>/``. The timestamp
        only has one-second resolution, so when that path is already taken
        (in this execution or on disk) a ``-1``, ``-2``, ... suffix is added.
        """
        stamp = datetime.now().strftime("%Y%m%d%H%M%S")
        base = f"{self.storage_path}repo-{job_name}-{stamp}"

        candidate = f"{base}/"
        suffix = 1
        while candidate in used_paths or os.path.exists(candidate):
            candidate = f"{base}-{suffix}/"
            suffix += 1

        used_paths.add(candidate)
        return candidate

    def runBenchmarking(self) -> str:
        """
        Run every benchmarking profile the configured number of times.

        For each repetition a fresh working directory is created, a random
        sample of ``NumFiles`` input files is copied into it together with
        the tool-specific inputs, and the job's batch file is created and
        executed.

        Returns:
            A fixed completion message.

        Raises:
            ValueError: If a profile asks for a negative number of input
                files or for more files than are available.
        """
        # Call the factory method to create a Job object.
        job = self.factory_method_create_job()

        # ID shared by every job of this benchmarking execution.
        ExecutionID = datetime.now().strftime("%Y%m%d%H%M%S")

        # One dict of parameters per CSV row.
        job_parameters = job.readBenchmarkingProfiles()

        # Specific files identification (.tsv, .d, .xml, .fasta).
        specificInputFiles = job.identifySpecificInputFiles()
        available_files = specificInputFiles["original_files"]

        used_paths = set()

        for parameters in job_parameters:
            # Validate before touching the file system, so that a bad profile
            # does not leave an empty working directory behind.
            requested = int(parameters["NumFiles"])
            if not 0 <= requested <= len(available_files):
                raise ValueError(
                    f"Profile {parameters['job-name']!r} requests {requested} "
                    f"input files but {len(available_files)} are available"
                )

            for _ in range(self.Number_of_jobs_repetition):
                running_job_path = self._reserve_job_path(
                    parameters["job-name"], used_paths
                )
                job.createRepository(running_job_path)

                # Randomly pick the input files of this repetition.
                sample_files = random.sample(available_files, k=requested)
                specificInputFiles["sample_files"] = sample_files

                for sample_file_path in sample_files:
                    # Keep the original file/folder name inside the run directory.
                    name_of_folder = os.path.basename(sample_file_path.rstrip("/"))
                    job.copyInputFiles(sample_file_path, running_job_path + name_of_folder)

                # Copy ONLY specific input files such (.tsv, .xml, .fasta)
                job.copySpecificInputFiles(specificInputFiles, running_job_path)

                # Create and execute the SBatch file.
                job.createExecutableBatchFile(
                    parameters,
                    running_job_path,
                    specificInputFiles,
                    ExecutionID,
                )

        return "BenchMe has finished running"


In [None]:
"""
Concrete Creators override the factory method in order to change the resulting
product's type.
"""

class MQBenchmarkingTool(BenchmarkingToolCreator):
    """
    Concrete creator whose factory method produces MaxQuant jobs.

    The factory method is annotated with the abstract ``Job`` type even
    though a ``MaxQuantJob`` is what it returns; that keeps the creator base
    class independent of the concrete job classes.
    """

    def __init__(
        self,
        BenchmarkingCSVFile_path,
        InputFiles_path,
        storage_path,
        Number_of_jobs_repetition,
    ):
        """Pass the benchmarking configuration on to the creator base class."""
        super().__init__(
            BenchmarkingCSVFile_path=BenchmarkingCSVFile_path,
            InputFiles_path=InputFiles_path,
            storage_path=storage_path,
            Number_of_jobs_repetition=Number_of_jobs_repetition,
        )

        # TODO: MaxQuant-specific state still needs to be initialised here.
        # The sketched plan is to glob InputFiles_path for the "*.d" sample
        # folders and to take the first "*.fasta" and "*.xml" matches as
        # fasta_file / xml_file, then to keep sample_files, xml_file_path and
        # numthreads on the instance.

    def factory_method_create_job(self) -> Job:
        """Build the MaxQuant job for this creator's configuration."""
        return MaxQuantJob(
            BenchmarkingCSVFile_path=self.BenchmarkingCSVFile_path,
            InputFiles_path=self.InputFiles_path,
            storage_path=self.storage_path,
            Number_of_jobs_repetition=self.Number_of_jobs_repetition,
        )


class DiaNNBenchmarkingTool(BenchmarkingToolCreator):
    """Concrete creator whose factory method produces DiaNN jobs."""

    def __init__(
        self,
        BenchmarkingCSVFile_path,
        InputFiles_path,
        storage_path,
        Number_of_jobs_repetition,
    ):
        """Pass the benchmarking configuration on to the creator base class."""
        super().__init__(
            BenchmarkingCSVFile_path=BenchmarkingCSVFile_path,
            InputFiles_path=InputFiles_path,
            storage_path=storage_path,
            Number_of_jobs_repetition=Number_of_jobs_repetition,
        )

    def factory_method_create_job(self) -> Job:
        """Build the DiaNN job for this creator's configuration."""
        return DiaNNJob(
            BenchmarkingCSVFile_path=self.BenchmarkingCSVFile_path,
            InputFiles_path=self.InputFiles_path,
            storage_path=self.storage_path,
            Number_of_jobs_repetition=self.Number_of_jobs_repetition,
        )


In [None]:
# TODO: Add this function to your diagram
def client_code(creator: BenchmarkingToolCreator) -> None:
    """
    Run the benchmarking of ``creator`` and print its result.

    Only the base-class interface (``runBenchmarking``) is used, so any
    creator subclass can be passed in.
    """
    # Run first, report afterwards: the banner and the result are emitted by
    # a single print call without a trailing newline.
    outcome = creator.runBenchmarking()
    banner = "Client: I'm not aware of the creator's class, but it still works.\n"

    print(f"{banner}{outcome}", end="")


In [None]:
# Interactive entry point: ask which tool to benchmark and where its inputs
# and outputs live, then launch the matching creator.

# Welcoming message. Surrounding whitespace is ignored so that an answer such
# as "1 " still selects the tool.
BenchTool = input("Thanks for using BenchMe! Please select '1' if you want benchmark MQ or '2' for DiaNN").strip()

InputFiles_path = input("Absolute path of Input DiaNN input files (.d, .fasta, .tsv). I.E: '/stornext/HPCScratch/home/romano.h/Software-Projects/Local-Repositories/DiaNN/DiaNNFiles-Dataset/'")
BenchmarkingCSVFile_path = input("Name of CSV file with DiaNN Profiles for benchmarking. I.E: 'benchmarking-profiles.csv'")
storage_path = input("Absolute path of storage directory to save outputs. I.E: '/vast/scratch/users/romano.h/DiaNNBenchmarking/'")
# The prompt advertises a default of 5, so an empty answer falls back to it
# instead of failing inside int().
Number_of_jobs_repetition = int(input("Number of times to run each benchmarking profile job. Default: 5 times").strip() or 5)

if BenchTool == '1':
    print("App: Launched with the MQBenchmarkingTool.")
    client_code(MQBenchmarkingTool(BenchmarkingCSVFile_path, InputFiles_path, storage_path, Number_of_jobs_repetition))
elif BenchTool == '2':
    print("App: Launched with the DiaNNBenchmarkingTool.")
    client_code(DiaNNBenchmarkingTool(BenchmarkingCSVFile_path, InputFiles_path, storage_path, Number_of_jobs_repetition))
else:
    print("I am sorry, the selected option is invalid.")


## This will need to be added/modified when migrating out of Notebooks

In [None]:
# TODO: Add this function to your diagram
def main(argv=None) -> None:
    """
    Command-line entry point: benchmark MaxQuant, then DiaNN.

    Both creators are configured from the command-line flags (``profiles``,
    ``files``, ``storage`` and ``repeat``) and run through the module-level
    ``client_code`` helper.

    Args:
        argv: Positional command-line arguments handed over by
            ``app.run``. They are not used; every input comes from FLAGS.
    """
    del argv  # Unused.

    # Same order as the creator constructors expect:
    # CSV file, input directory, storage directory, repetitions.
    creator_args = (FLAGS.profiles, FLAGS.files, FLAGS.storage, FLAGS.repeat)

    # NOTE(review): MaxQuantJob still inherits the placeholder
    # identifySpecificInputFiles / copySpecificInputFiles of AbstractJob, so
    # this first run cannot complete until those are implemented.
    print("App: Launched with the MQBenchmarkingTool.")
    client_code(MQBenchmarkingTool(*creator_args))
    print("\n")

    print("App: Launched with the DiaNNBenchmarkingTool.")
    client_code(DiaNNBenchmarkingTool(*creator_args))

In [None]:
# Command-line interface: three mandatory string flags plus an optional
# repetition count.
FLAGS = flags.FLAGS

flags.DEFINE_string(
    name="files",
    default=None,
    help="Absolute path of Input DiaNN input files (.d, .fasta, .tsv). I.E: '/stornext/HPCScratch/home/romano.h/Software-Projects/Local-Repositories/DiaNN/DiaNNFiles-Dataset/'",
)
flags.DEFINE_string(
    name="profiles",
    default=None,
    help="Name of CSV file with DiaNN Profiles for benchmarking. I.E: 'benchmarking-profiles.csv'",
)
flags.DEFINE_string(
    name="storage",
    default=None,
    help="Absolute path of storage directory to save outputs. I.E: '/vast/scratch/users/romano.h/DiaNNBenchmarking/'",
)
flags.DEFINE_integer(
    name="repeat",
    default=5,
    help="Number of times to run each benchmarking profile job. Default: 5 times",
)

# Every flag except "repeat" has to be supplied by the caller.
flags.mark_flags_as_required(["files", "profiles", "storage"])

In [None]:
# Only when executed as a script (not when imported): hand main to absl's
# app.run.
if __name__ == "__main__":
    app.run(main)
