Motivation for this post originates from my interest in working with [openSNP](https://opensnp.org/) data. openSNP is an open-source platform where users upload their genotype data from direct-to-consumer genetics companies like [23andMe](https://www.23andme.com/).  Raw result files are often hundreds of thousandas of records long, each record representing a location in the human genome.  There are over 4,000 raw result files in the openSNP data however, my laptop runs a VirtualBox with only 5GB RAM, which meant I needed to find some clever solutions beyond the standard [pandas]() library in order to work with this much data.  This post describes one of several applications that can benefit from out-of-core computing.

## openSNP data dump

In [None]:
# this will take some time to download, depending on your internet speed
! wget https://opensnp.org/data/zip/opensnp_datadump.current.zip

In [None]:
! unzip opensnp_datadump.current.zip
! du -h 

That's over 40 GB of genotype data stored as text files!  For the purposes of this demo, the analysis will be limited to only the **23&Me** files.

In [6]:
# Isolate the 23&me files
! mkdir /media/sf_ubuntuVbox/gt23
! mv *.23andme.txt* /media/sf_ubuntuVbox/gt23/

In [1]:
! find /media/sf_ubuntuVbox/opensnp_datadump.current/gt23/ -type f | wc -l

1915


Here's a look at an example **23&Me** result file, the header is commented out with `#` then there are 600,000+ rows of genotype data.

In [117]:
testfile = '/media/sf_ubuntuVbox/opensnp_datadump.current/gt23/user2001_file1185_yearofbirth_unknown_sex_unknown.23andme.txt'
! head -20 $testfile

# This data file generated by 23andMe at: Mon Apr 28 21:55:49 2014
#
# Below is a text version of your data.  Fields are TAB-separated
# Each line corresponds to a single SNP.  For each SNP, we provide its identifier 
# (an rsid or an internal id), its location on the reference human genome, and the 
# genotype call oriented with respect to the plus strand on the human reference sequence.
# We are using reference human assembly build 37 (also known as Annotation Release 104).
# Note that it is possible that data downloaded at different times may be different due to ongoing 
# improvements in our ability to call genotypes. More information about these changes can be found at:
# https://www.23andme.com/you/download/revisions/
# 
# More information on reference human assembly build 37 (aka Annotation Release 104):
# http://www.ncbi.nlm.nih.gov/mapview/map_search.cgi?taxid=9606
#
# rsid	chromosome	position	genotype
rs12564807	1	734462	AA
rs3131972	1	752721	G

In [5]:
! wc -l $testfile

602366 /media/sf_ubuntuVbox/opensnp_datadump.current/gt23/user2001_file1185_yearofbirth_unknown_sex_unknown.23andme.txt


Unfortunately, like most real-world data, the openSNP data dump was a little messy.  Some of the files were in binary format, had incorrect filename extensions, and malformed headers.  I removed files that were small or large, relative to the averages.

In [8]:
# Clean 23&me files -- may need some other processing
! find . -name "*.txt" -size -15M -delete
! find . -name "*.txt" -size +25M -delete

## openSNP test set

For the first exercise, let's use a manageable subset of the full openSNP data.

In [None]:
# select 10 for a test set
! mkdir gt23test
! cp gt23/user200* gt23test/

In [2]:
import glob
dir23gt = '/media/sf_ubuntuVbox/opensnp_datadump.current/gt23test/'
allFiles = glob.glob(dir23gt + "/*.txt")

In [3]:
# How big are the files in the gt23test directory?
! du -h $dir23gt

166M	/media/sf_ubuntuVbox/opensnp_datadump.current/gt23test/


## Pure pandas

First, start with a pure [pandas](http://pandas.pydata.org/) `read_csv` solution, something that should be familiar to Python data scientists.  Let's try to create a large `DataFrame` in memory.  `join` can accomplish this task, even though it's an expensive operation the test data is small enough that we can successfully execute it.

In [11]:
import re
import pandas as pd

In [9]:
%%time
gtdf = pd.DataFrame(index=['rsid']) #empty DataFrame

for file_ in allFiles:
    #extract the userid
    match = re.search('user(\d+)', file_)
    userid = match.group(1)
    
    df = pd.read_csv(file_, 
                     comment='#', 
                     sep='\t', 
                     header=None,
                     usecols=[0,3],
                     error_bad_lines=False,
                     names=['rsid', userid],
                     dtype={'rsid':object,
                            'genotype':object},
                     index_col=['rsid']
                    )
    gtdf = gtdf.join(df, how='outer')

CPU times: user 52.3 s, sys: 508 ms, total: 52.8 s
Wall time: 53.9 s


In [10]:
gtdf.head()

Unnamed: 0,2000,2001,2002,2003,2004,2005,2006,2008,2009,200
i1000001,A,A,,A,A,A,A,A,A,
i1000003,A,A,,A,A,A,A,A,A,
i1000007,C,C,,C,C,C,C,C,C,
i1000008,G,G,,G,G,G,G,G,G,
i1000009,G,G,G,G,G,G,G,G,G,G


On the test set of 10 files, the entire `DataFrame` fits into memory.
However, using this pure pandas method on the full set of 1,915 files would eventually 
crash the computer because it would run out of pyhsical memory.

## Dask -- parallel out-of-core DataFrame

Enter [dask](http://dask.pydata.org/en/latest/index.html), a parallel Python library that implements out-of-core DataFrames.  It's API is similar to pandas, with a few additional methods and arguments.
This code will read the same files in parallel and create a "lazy" `DataFrame` that isn't computed until explicity executed.

In [1]:
import dask.dataframe as dd

In [None]:
#for the blocksize parameter in the next cell
import psutil
blocksize = psutil.virtual_memory().total / psutil.cpu_count() / 10

In [13]:
%%time # <-- won't actually read the csv's yet...
ddf = dd.read_csv('/media/sf_ubuntuVbox/opensnp_datadump.current/gt23test/*.txt',
                  comment='#', 
                  sep='\t', 
                  header=None,
                  usecols=[0,3],
                  error_bad_lines=False,
                  names=['rsid', 'genotype'],
                  dtype={'rsid':object,
                         'genotype':object},
                  blocksize=blocksize)

CPU times: user 16 ms, sys: 8 ms, total: 24 ms
Wall time: 35.5 ms


In [14]:
# 1 partition per file
ddf

dd.DataFrame<from-de..., npartitions=10>

In [16]:
%%time
#  this operation is expensive
ddf = ddf.set_index('rsid')

CPU times: user 27.4 s, sys: 1.6 s, total: 29 s
Wall time: 34.2 s


The dask `compute()` method provides familiar results.  Since we didn't `join` the dask DataFrame, let's investigate one SNP.
Remember, in contrast to `gtdf`, `ddf` isn't in memory so it will take longer to get this result.

In [26]:
# dask
%%time
ddf.loc['rs1333525']['genotype'].value_counts().compute()

CPU times: user 18.3 s, sys: 1.32 s, total: 19.6 s
Wall time: 17.5 s


CC    8
CT    2
Name: genotype, dtype: int64

In [24]:
# pure pandas
%%time
gtdf.loc['rs1333525'].value_counts()

CPU times: user 372 ms, sys: 0 ns, total: 372 ms
Wall time: 383 ms


CC    8
CT    2
Name: rs1333525, dtype: int64

At first blush, the computation times are deceiving.  As expected, the dask method took longer because of the lazy computation, it still had to read all the files and then perform the operation.
If you consider the 34 seconds to set the index and the 17.5 seconds to compute the `value_counts()`, the parallel dask method was actually equally as fast as the 53.9 seconds 
that it took pure pandas to create the `gtdf` `DataFrame`.  The benefit is that the parallel dask method didn't store the `DataFrame` object in memory!

##  Increase query peformance with Parquet

It would be nice to speed up the dask queries so we can work with the `DataFrame` for downstream analysis.
The solution is to store the data on disk in an efficient format (binary).  A popular choice for this is traditionally [HDF5](https://support.hdfgroup.org/HDF5/doc/H5.intro.html), but I chose to use [parquet](https://parquet.apache.org/) becuase 
HDF5 can be tricky to work with.  Dask uses the [fastparquet](http://fastparquet.readthedocs.io/en/latest/index.html) implementation.

In [31]:
%%time
dd.to_parquet('/media/sf_ubuntuVbox/opensnp_datadump.current/gt23test_pq/', ddf)

CPU times: user 1min 7s, sys: 3.74 s, total: 1min 11s
Wall time: 1min 10s


Essentially, what we've done to this point is convert csv files to parquet files.  How much performance is really gained from this?
Revisiting the dask DataFrame from csv's, `ddf`, a comparison:

In [2]:
# using dask with fastparquet, nothing computed here...
pqdf = dd.read_parquet('/media/sf_ubuntuVbox/opensnp_datadump.current/gt23test_pq/', index='rsid')

In [35]:
%%time
pqdf.loc['rs1333525']['genotype'].value_counts().compute()

CPU times: user 3.94 s, sys: 108 ms, total: 4.05 s
Wall time: 4.37 s


CC    8
CT    2
Name: genotype, dtype: int64

In [36]:
%%time
ddf.loc['rs1333525']['genotype'].value_counts().compute()

CPU times: user 18.6 s, sys: 1.15 s, total: 19.8 s
Wall time: 18.3 s


CC    8
CT    2
Name: genotype, dtype: int64

Parquet offers a noticeable performance increase, even with only 10 files.  Scaling this up to 1,915 files, the csv version, `ddf`, took 5+ hours to execute `value_counts()` for one SNP.
`to_parquet` on the 1,915 file `DataFrame` took a couple hours.  Once you see the query performance improvements, it's well worth the up-front cost of converting from csv to parquet.

## Performance on full data set

In [46]:
# Used commands above to produce parquet files
! find /media/sf_ubuntuVbox/opensnp_datadump.current/gt23_pq/ -type f | wc -l

1898


In [9]:
allpqdf = dd.read_parquet('/media/sf_ubuntuVbox/opensnp_datadump.current/gt23_pq/', index='rsid')

In [5]:
%%time
allpqdf.loc['rs1333525']['genotype'].value_counts().compute()

CPU times: user 9.07 s, sys: 524 ms, total: 9.59 s
Wall time: 10.7 s


CC    1625
CT     277
TT      13
Name: genotype, dtype: int64

These results highlight the superior performance of parquet over csv.  Additionally, dask proved it's value as an easy-to-use tool when physical memory is a constraint.
By now you've probably noticed I wasn't able to assign the user identifier to each column.  `dd.read_csv()` assumes the column names in each file are identical.

## Efficient "big-data" downstream analysis

I wanted to show how easy dask and parquet make it to quickly compare variant frequencies in the opensnp data to [exAC](http://exac.broadinstitute.org/) data.
Here we'll define a function to extract variant frequencies from exAC, then compare to openSNP frequencies.

In [3]:
from urllib.parse import urlparse
from urllib.parse import urlencode
from urllib.request import urlopen
import json

In [4]:
import pandas as pd

In [10]:
def ExacFrequency(rsid):
    try:
        exac = {}

        request = urlopen('http://exac.hms.harvard.edu/rest/dbsnp/{}'.format(rsid))
        results = request.read().decode('utf-8')
        exac_variants = json.loads(results)['variants_in_region']

        for alt_json in exac_variants:
            exac_allele = alt_json['ref']+alt_json['alt']
            exac_freq = alt_json['allele_freq']
            exac[exac_allele] = exac_freq
    except:
        return {'n/a':0}
    return exac

In [11]:
brca2snps = ['rs766173', 'rs144848', 'rs11571746', 'rs11571747',
             'rs4987047', 'rs11571833', 'rs1801426']

In [12]:
df = pd.DataFrame(columns=['rsid', 'gt', 'source', 'varFreq'])

In [13]:
for snp in brca2snps:
    Osnp_alleles = allpqdf.loc[snp]['genotype'].value_counts().compute() #dask
    
    Osnp_alleles = Osnp_alleles.to_dict()
    opensnp_N = sum(Osnp_alleles.values())
    
    Osnp_freqs = {alt: AC/float(opensnp_N) for alt,AC in Osnp_alleles.items()}
    exac_freqs= ExacFrequency(snp)
    
    for gt,freq in exac_freqs.items():
        df = df.append({'rsid':snp, 'gt': gt, 'source':'exAC', 'varFreq':freq }, ignore_index=True )
    
    for gt,freq in Osnp_freqs.items():
        if gt in exac_freqs:
            gt = gt
        elif gt[::-1] in exac_freqs:
            gt = gt[::-1]
        else:
            continue
        df = df.append({'rsid':snp, 'gt': gt, 'source':'Osnp', 'varFreq':freq }, ignore_index=True )

[](/assets/images/snps.png)

In my next post, I'll focus on working with some of the phenotype groups identified in the opensnp data.  Thanks for reading, comments and code improvements welcome!