Run: `seq 1 1 10 | xargs -i papermill /data/work/home/rmalanij/bdg-seqtender/performance/bdg_perf/pipeline-benchmark-manual.ipynb /data/work/home/rmalanij/bdg-seqtender/performance/bdg_perf/pipeline-benchmark-manual_run.ipynb -p executor_num {} -k seq-edu`

In [None]:
# Benchmark parameters. The run command at the top of the notebook overrides
# executor_num through papermill (-p executor_num).
executor_num = 8  # used as the N in the local[N] Spark master and recorded with each result
executor_mem  = 4  # gigabytes; used for spark.executor.memory and recorded with each result
min_partitions = 40  # divisor used to derive the input split size

In [None]:
import os

# Settings for the results database, taken from the environment (None when a
# variable is unset). They are referenced by the JDBC write inside time(),
# which is currently commented out.
bdg_perf_pass = os.getenv("BDG_PERF_PASS")
bdg_perf_db = os.getenv("BDG_PERF_DB")
bdg_perf_user = os.getenv("BDG_PERF_USER")
bdg_perf_table = "bdg_perf_tests"

# Spark installation and Python interpreter used by PySpark.
os.environ['SPARK_HOME'] = "/data/local/opt/spark-2.4.3-bin-hadoop2.7"
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'

# Options handed to spark-submit when PySpark starts the JVM. The adjacent
# literals are concatenated into one string; every piece except the last ends
# with the single space that separates the options.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    "--conf spark.hadoop.yarn.timeline-service.enabled=false "
    "--conf spark.driver.extraJavaOptions='-Dhdp.version=3.1.0.0-78' "
    "--conf spark.yarn.am.extraJavaOptions='-Dhdp.version=3.1.0.0-78' "
    "--conf spark.hadoop.hive.metastore.uris='thrift://cdh01.cl.ii.pw.edu.pl:9083' "
    "--conf spark.hadoop.hive.metastore.sasl.enabled=true "
    "--conf spark.hadoop.metastore.catalog.default=hive "
    "--conf spark.sql.catalogImplementation=hive "
    "--conf spark.hadoop.hive.execution.engine=mr "
    "--conf spark.shuffle.service.enabled=true "
    "--conf spark.dynamicAllocation.enabled=true "
    "--conf spark.dynamicAllocation.maxExecutors=5 "
    "--conf spark.sql.hive.metastore.jars='/usr/hdp/current/spark2-client/standalone-metastore/*' "
    "--conf spark.hadoop.net.topology.script.file.name='/etc/hadoop/conf/topology_script_spark.py' "
    "--conf spark.jars.repositories='http://zsibio.ii.pw.edu.pl/nexus/repository/maven-releases/,http://zsibio.ii.pw.edu.pl/nexus/repository/maven-snapshots/' "
    "--conf spark.jars='/data/local/opt/sequila/bdg-sequila_2.11-0.6.0-spark-2.4.3-SNAPSHOT-assembly.jar,/data/local/opt/seqtender/bdg-seqtender_2.11-0.2-SNAPSHOT-assembly.jar' "
    "pyspark-shell"
)

In [None]:
from pyspark.sql import SparkSession

app_name = 'SeqtenderPerfTest'

# 134217728 bytes is 128 MiB; dividing it by min_partitions gives the value
# used below as both the minimum and the maximum input split size.
split_size = round(134217728/min_partitions)

# Build (or reuse) the session. The master is local[executor_num], and
# dynamic allocation is switched off here even though PYSPARK_SUBMIT_ARGS
# enables it.
# NOTE(review): 'spark.sparkContext.defaultMinPartitions' does not look like a
# documented Spark property - confirm that it has any effect.
spark = (
    SparkSession.builder
    .master(f'local[{executor_num}]')
    .config('spark.driver.memory','4g')
    .config('spark.executor.memory', f'{executor_mem}g')
    .config('spark.dynamicAllocation.enabled', 'false')
    .config('spark.sparkContext.defaultMinPartitions',f'{min_partitions}')
    .config("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", f'{split_size}')
    .config("spark.hadoop.mapreduce.input.fileinputformat.split.minsize", f'{split_size}')
    .appName(f'bdgenomics-{app_name}')
    .getOrCreate()
)
# Disabled option, kept for reference:
#.config('spark.executor.instances',f'{executor_num}') \

In [None]:
# Alternative input/output locations, currently commented out.
#vcf_path= "/igap/all/split/HG001_GRCh38_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_PGandRTGphasetransfer.vcf.gz"
#anno_vcf_path = '/edugen/vcf/NA12878_anno.vcf'

# vcf_path is the input handed to pipe_variants() in the benchmark cell;
# anno_vcf_path is where save_variants() writes the annotated result.
vcf_path= "/data/work/home/rmalanij/HG001_GRCh38_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_PGandRTGphasetransfer.vcf.gz"
anno_vcf_path = '/data/work/home/rmalanij/NA12878_anno.vcf'


# cache_dir is the host directory mounted into the container at /opt/vep/.vep;
# vep_version selects the tag of the biodatageeks/bdg-vep image.
cache_dir = "/data/samples/vep_data/vep/95"
vep_version="95"
# Docker command passed to pipe_variants() for the annotation step.
# The trailing replace() removes every newline together with the three spaces
# that follow it, so the command ends up on one line; the remaining
# indentation of each line stays in place and separates the arguments.
annotate_cmd = f"""docker run --rm -i -v {cache_dir}:/opt/vep/.vep biodatageeks/bdg-vep:{vep_version}
        vep
        --dir /opt/vep/.vep
        --pick_allele
        --format vcf
        --no_stats
        --force_overwrite
        --everything
        -cache
        --vcf
        -offline
        -o stdout """.replace("\n   ", "") 

In [None]:
import timeit
import hashlib
import re
import datetime 


def time(command,
         tag: str = None,
         executor_name: str = None,
         tool_name: str = None,
         tool_version: str = None,
         docker_image: str = None,
         num = 1,
         executor_num = 1,
         executor_mem = 1,
         global_vars = None,
         docker_command: str = None,
         input_file: str = None):
    """Time a code snippet and return the measurements as a Spark DataFrame.

    The snippet is executed ``num`` times with ``timeit`` (one execution per
    measurement) and each run contributes one row to the result.

    Args:
        command: Source code to execute and time. It is stored in the
            ``command`` column, and an MD5 digest of it with all non-word
            characters removed is stored as ``test_id``.
        tag, executor_name, tool_name, tool_version, docker_image,
        docker_command, input_file: Descriptive metadata copied unchanged
            into the columns of the same name.
        num: Number of timed runs; must be at least 1.
        executor_num: Stored in the ``exec_total_cores`` column.
        executor_mem: Stored in the ``exec_mem`` column.
        global_vars: Namespace passed to ``timeit`` as ``globals``, i.e. the
            names the snippet can see while it runs.

    Returns:
        A Spark DataFrame with one row per run, holding the metadata, the
        time stamp taken right after the run and the measured wall time.

    Raises:
        ValueError: If ``num`` is smaller than 1, since there would be no
            rows to build the DataFrame from.
    """
    if num < 1:
        raise ValueError(f"num must be at least 1, got {num!r}")

    # The identifier depends only on the command text, so it is computed once
    # rather than on every iteration.
    command_hash = hashlib.md5(re.sub(r'\W', '', command).encode()).hexdigest()

    results = []
    for _ in range(num):
        wall_time = timeit.timeit(command, number=1, globals=global_vars)
        results.append([command_hash,
                        tag,
                        executor_name,
                        tool_name,
                        tool_version,
                        docker_image,
                        datetime.datetime.now(),  # taken after the run has finished
                        command,
                        docker_command,
                        input_file,
                        executor_num,
                        executor_mem,
                        wall_time])

    columns = ['test_id',
               'tag',
               'executor_name',
               'tool_name',
               'tool_version',
               'docker_image',
               'time_stamp',
               'command',
               'docker_command',
               'input_file',
               'exec_total_cores',
               'exec_mem',
               'wall_time']
    # NOTE(review): only column names are given, so Spark infers the types
    # from the rows; a metadata field left as None in every row may make that
    # inference fail - confirm against the PySpark version in use.
    dfw = spark.createDataFrame(results, columns)

    # Persisting the results to the database is currently disabled:
#     dfw.write \
#     .format("jdbc") \
#     .option("url", f"{bdg_perf_db}") \
#     .option("dbtable", f"{bdg_perf_table}") \
#     .option("user", f"{bdg_perf_user}") \
#     .option("password", f'{bdg_perf_pass}') \
#     .option("driver", "org.postgresql.Driver") \
#     .mode("append") \
#     .save()
    return dfw

In [None]:
#!hdfs dfs -rm -r -skipTrash /edugen/vcf/NA12878_anno.vcf*
!yes | rm -r {anno_vcf_path}

In [None]:
%%time
# Label stored with the measurement.
tag = 'vcf_annotation localhdd'

# Snippet executed and timed by time(): it pipes the input VCF through the
# annotation command and saves the annotated variants.
anno_code = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from seqtender import SeqTenderAnnotation
seq_anno = SeqTenderAnnotation(spark)
annotated = seq_anno.pipe_variants (vcf_path, annotate_cmd)
seq_anno.save_variants(anno_vcf_path, annotated)
"""

# Names the snippet needs; time() passes them to timeit as its globals.
global_vars = dict(
    vcf_path=vcf_path,
    annotate_cmd=annotate_cmd,
    anno_vcf_path=anno_vcf_path,
)

df = time(
    anno_code,
    # what was run
    tag=tag,
    executor_name='seqtender',
    tool_name='vep',
    tool_version=vep_version,
    docker_image=f'biodatageeks/bdg-vep:{vep_version}',
    docker_command=annotate_cmd,
    input_file=vcf_path,
    # how it was run
    num=1,
    executor_num=executor_num,
    executor_mem=executor_mem,
    global_vars=global_vars,
)
df.toPandas()

In [None]:
# Stop the Spark session created earlier in the notebook.
spark.stop()