In [None]:
%%time

import pandas as pd
import numpy
import os
import tarfile
import dask
from dask.distributed import Client, progress
# Start a Dask distributed client requesting 6 workers with 2 threads each.
# NOTE(review): no scheduler address is passed, so this presumably spins up a
# local cluster on this machine — confirm against the dask.distributed docs.
client = Client(threads_per_worker=2, n_workers=6)

def genotype_compare(geno_a, geno_b):
    """Score how closely two "|"-separated genotype strings agree.

    Each genotype must split on "|" into exactly two alleles; anything else
    raises ``ValueError`` from the tuple unpacking. Alleles are compared
    position by position (first with first, second with second).

    Returns:
        1 when both positions match, .5 when exactly one matches, 0 otherwise.
    """
    first_a, second_a = geno_a.split("|")
    first_b, second_b = geno_b.split("|")

    # Number of allele positions that agree: 0, 1 or 2.
    matches = (first_a == first_b) + (second_a == second_b)

    # Index by match count so the int/float return types stay as before.
    return (0, .5, 1)[matches]

def make_matrix(row, samples):
    """Build the pairwise genotype-similarity matrix for one row.

    Args:
        row: Object indexable by sample name (``row[sample]``) that yields the
            genotype string for that sample.
        samples: Sequence of sample names; its order defines both axes.

    Returns:
        A ``len(samples)`` x ``len(samples)`` numpy array in which entry
        ``[j, k]`` is ``genotype_compare`` applied to the genotypes of
        ``samples[j]`` and ``samples[k]``.
    """
    # Look each genotype up once instead of once per matrix cell.
    genotypes = [row[name] for name in samples]
    size = len(samples)

    matrix = numpy.zeros(shape=[size, size])
    for j, left in enumerate(genotypes):
        for k, right in enumerate(genotypes):
            matrix[j, k] = genotype_compare(left, right)
    return matrix

def export_matrix(vcf_file, skip_rows):
    """Write one similarity matrix per row of *vcf_file* into a tar.gz archive.

    The file is read as tab-separated text with every column typed as string.
    Columns from index 9 onward are treated as sample columns
    (NOTE(review): this matches the usual VCF layout of nine fixed columns
    before the samples — confirm for these inputs).

    For each row a ``<base>.<ID>.matrix.tsv`` table is produced, where
    ``<base>`` is the input file name with its last two dot-separated
    suffixes removed and ``<ID>`` is the row's ``ID`` value. Each table is
    written to the current working directory, added to the archive and then
    deleted again.

    NOTE(review): rows that share the same ``ID`` value produce the same
    member name inside the archive — verify that IDs are unique.

    Args:
        vcf_file: Path of the input file.
        skip_rows: Passed to ``pandas.read_csv`` as ``skiprows``; the first
            line that is not skipped is used as the column header.

    Returns:
        The archive file name, ``<base>.row_matrices.tar.gz`` (relative to
        the current working directory).
    """
    # todo: handle skiprows?

    vcf_df = pd.read_csv(vcf_file, sep="\t", skiprows=skip_rows, dtype='string')
    samples = list(vcf_df.columns)[9:]

    vcf_file_base = '.'.join(os.path.basename(vcf_file).split('.')[:-2])
    tar_file_name = f"{vcf_file_base}.row_matrices.tar.gz"

    # The context manager closes the archive; no explicit close() is needed.
    with tarfile.open(tar_file_name, "w:gz") as tar:
        # Build each matrix on demand rather than storing all of them in the
        # DataFrame first, so only one matrix is held in memory at a time.
        for _, row in vcf_df.iterrows():
            row_df = pd.DataFrame(
                make_matrix(row, samples), columns=samples, index=samples
            )
            variant_id = row['ID']
            filename = f"{vcf_file_base}.{variant_id}.matrix.tsv"
            try:
                row_df.to_csv(filename, sep="\t")
                tar.add(filename)
            finally:
                # Remove the temporary TSV even when writing or archiving
                # fails, so aborted runs do not leave stray files behind.
                if os.path.exists(filename):
                    os.remove(filename)

    return tar_file_name

def join(foo):
    """Print *foo* to standard output.

    ``generate_matrices`` wraps this in ``dask.delayed`` as the final task
    that receives the collected export results.
    """
    print(foo)

def generate_matrices():
    """Schedule and run ``export_matrix`` for every configured input file.

    For each phenotype / p-value label combination two delayed tasks are
    created: one for the plain ``.vcf.gz`` file (``skip_rows=83``) and one
    for its ``_qtl`` counterpart (``skip_rows=84``). All tasks are passed to
    a delayed ``join`` call, which is then computed. Nothing is returned.
    """
    lazy_export = dask.delayed(export_matrix)

    tasks = []
    for phenotype in ("max_height_cm", "max_growth_cm_gdd"):
        for pvalue in ("p0001", "p0005", "p001"):
            # Plain file first, then the QTL variant, for each combination.
            tasks.append(
                lazy_export(
                    f"../vcf/sorghum.filtered.season4.season6.{phenotype}_{pvalue}.vcf.gz",
                    skip_rows=83,
                )
            )
            tasks.append(
                lazy_export(
                    f"../vcf/sorghum.filtered.season4.season6.{phenotype}_{pvalue}_qtl.vcf.gz",
                    skip_rows=84,
                )
            )

    # Computing the delayed join runs every export task it depends on.
    dask.delayed(join)(tasks).compute()

# Kick off the export for every configured VCF file when this cell runs.
generate_matrices()


+---------+-------------+-----------+---------+
| Package | This Worker | scheduler | workers |
+---------+-------------+-----------+---------+
| tornado | 6.1         | 6.2       | 6.1     |
+---------+-------------+-----------+---------+

+---------+-------------+-----------+---------+
| Package | This Worker | scheduler | workers |
+---------+-------------+-----------+---------+
| tornado | 6.1         | 6.2       | 6.1     |
+---------+-------------+-----------+---------+

+---------+-------------+-----------+---------+
| Package | This Worker | scheduler | workers |
+---------+-------------+-----------+---------+
| tornado | 6.1         | 6.2       | 6.1     |
+---------+-------------+-----------+---------+

+---------+-------------+-----------+---------+
| Package | This Worker | scheduler | workers |
+---------+-------------+-----------+---------+
| tornado | 6.1         | 6.2       | 6.1     |
+---------+-------------+-----------+---------+

+---------+-------------+----------