In [1]:
# Using https://github.com/fastlmm/PySnpTools/commit/23236b80 (but any should do)
#    and
#       https://github.com/CarlKCarlK/sgkit-plink/commit/309684b9
#            python setup.py build_ext --inplace    

In [1]:
# util.mapreduce1: Run loops in parallel on multiple processes, threads, or clusters.
#       API Docs: https://fastlmm.github.io/PySnpTools/#module-pysnptools.util.mapreduce1

from pysnptools.util.mapreduce1 import map_reduce
from pysnptools.util.mapreduce1.runner import LocalMultiThread, Local
import time

def slow_square(x):
    time.sleep(1)
    return x*x

def square_sum(count,runner=None):
    ss= map_reduce(range(count),
               mapper=slow_square,
               reducer=sum,
               runner=runner)
    return ss

square_sum(3)

5

In [3]:
%%time

square_sum(10,runner=Local())

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 10 s


285

In [5]:
%%time

#Multithreading is faster

square_sum(10,runner=LocalMultiThread(10))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1 s


285

In [2]:
# Read from a Bed file in (perhaps parallel) batches and return the mean

import os
import numpy as np
from sgkit_plink._open_bed import open_bed

os.chdir(r'D:\OneDrive\programs\pstsgkit\tests')

def thread_read(filename, sid_end=None, runner=None, verbose=False):
    with open_bed(filename) as bed:
        bed.shape # causes the lazy metadata to read the number of lines in the two metadata files
        batch_size = 100
        def read_and_report(start):
            if verbose:
                print(start)
            val = bed.read(index=np.s_[:,start:start+batch_size],dtype='int8')
            return np.nanmean(val), val.shape

        sid_end = sid_end or bed.sid_count #!!!but what if sid_end is 0?
        report = map_reduce(range(0, sid_end,batch_size),
                   mapper=read_and_report,
                   runner=runner)
        return report
    

thread_read('datasets/all_chr.maf0.001.N300.bed')

Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
NumExpr defaulting to 8 threads.
Loading fam file datasets\all_chr.maf0.001.N300.fam
Loading bim file datasets\all_chr.maf0.001.N300.bim


[(0.13873333333333332, (300, 100)),
 (0.22336666666666666, (300, 100)),
 (0.1944, (300, 100)),
 (0.20983333333333334, (300, 100)),
 (0.21043333333333333, (300, 100)),
 (0.18643333333333334, (300, 100)),
 (0.19606666666666667, (300, 100)),
 (0.13453333333333334, (300, 100)),
 (0.19976666666666668, (300, 100)),
 (0.18323333333333333, (300, 100)),
 (0.4428888888888889, (300, 15))]

In [3]:
%%time

bigfile = r'M:\deldir\genbgen\2\merged_487400x220000.1.bed'
open_bed(bigfile).shape

Loading fam file M:\deldir\genbgen\2\merged_487400x220000.1.fam
Loading bim file M:\deldir\genbgen\2\merged_487400x220000.1.bim
Wall time: 3.21 s


(487400, 220000)

In [4]:
%%time

thread_read(bigfile,sid_end=1000,runner=Local(),verbose=True)

Loading fam file M:\deldir\genbgen\2\merged_487400x220000.1.fam
Loading bim file M:\deldir\genbgen\2\merged_487400x220000.1.bim
0
100
200
300
400
500
600
700
800
900
Wall time: 5.36 s


[(-29.79690145670907, (487400, 100)),
 (-24.055449343455066, (487400, 100)),
 (-34.39397287648748, (487400, 100)),
 (-29.801632334837915, (487400, 100)),
 (-32.10032076323348, (487400, 100)),
 (-37.84243836684448, (487400, 100)),
 (-19.455485945835044, (487400, 100)),
 (-22.905076590069758, (487400, 100)),
 (-24.05008110381617, (487400, 100)),
 (-22.907826713171932, (487400, 100))]

In [5]:
%%time
#The verbose messages arrive out of order, so we know it's multithreading
thread_read(bigfile,sid_end=1000,runner=LocalMultiThread(10),verbose=True)

Loading fam file M:\deldir\genbgen\2\merged_487400x220000.1.fam
Loading bim file M:\deldir\genbgen\2\merged_487400x220000.1.bim
0
100
200
300
400
500
600
700
800
900
Wall time: 4.96 s


[(-29.79690145670907, (487400, 100)),
 (-24.055449343455066, (487400, 100)),
 (-34.39397287648748, (487400, 100)),
 (-29.801632334837915, (487400, 100)),
 (-32.10032076323348, (487400, 100)),
 (-37.84243836684448, (487400, 100)),
 (-19.455485945835044, (487400, 100)),
 (-22.905076590069758, (487400, 100)),
 (-24.05008110381617, (487400, 100)),
 (-22.907826713171932, (487400, 100))]

In [6]:
%%time

#10K variants
thread_read(bigfile,sid_end=10*1000,runner=LocalMultiThread(10),verbose=True)

Loading fam file M:\deldir\genbgen\2\merged_487400x220000.1.fam
Loading bim file M:\deldir\genbgen\2\merged_487400x220000.1.bim
0
1000
2000
1003000

1100
21004000

5000
31006000
4100
200

7000
1200
4200220051006100
8000



7100
9000
32001300

3008100

520091006200
23004300


400
1400
8200

330072006300
24008300

5300

1500

34009200
8400
500

930044006400

2500

7300
16008500

2600
3500
8600
600
7400
9400
4500
1700
8700
27006500
9500
8800

700360054004600
7500


28009600

6600
8900
1800
80047007600


3700

9700550029004800



6700
1900
56007700900


4900
3800
6800
98003900

78005700
6900

9900
7900
5800
5900
Wall time: 21.2 s


[(-29.79690145670907, (487400, 100)),
 (-24.055449343455066, (487400, 100)),
 (-34.39397287648748, (487400, 100)),
 (-29.801632334837915, (487400, 100)),
 (-32.10032076323348, (487400, 100)),
 (-37.84243836684448, (487400, 100)),
 (-19.455485945835044, (487400, 100)),
 (-22.905076590069758, (487400, 100)),
 (-24.05008110381617, (487400, 100)),
 (-22.907826713171932, (487400, 100)),
 (-20.599556114074684, (487400, 100)),
 (-20.60552800574477, (487400, 100)),
 (-27.499685227739022, (487400, 100)),
 (-24.048973553549445, (487400, 100)),
 (-25.19879571194091, (487400, 100)),
 (-30.94615664751744, (487400, 100)),
 (-21.753124969224455, (487400, 100)),
 (-33.246497496922444, (487400, 100)),
 (-29.798468178087813, (487400, 100)),
 (-20.604071235125154, (487400, 100)),
 (-28.648547004513745, (487400, 100)),
 (-27.498296819860485, (487400, 100)),
 (-21.759837566680346, (487400, 100)),
 (-30.948608904390642, (487400, 100)),
 (-27.497819798933115, (487400, 100)),
 (-27.50512839556832, (487400, 10

In [10]:
#Single threaded: 1000 variants: 10 seconds
#Multi            1000 variants:  5 seconds
#Multi          10,000 variants: 23 seconds