Move Bam doctests to unittests
mvdbeek committed Dec 8, 2017
1 parent 3c9999d commit 369485a
Showing 2 changed files with 78 additions and 49 deletions.
56 changes: 7 additions & 49 deletions lib/galaxy/datatypes/binary.py
@@ -213,21 +213,13 @@ def merge(split_files, output_file):
         :param split_files: List of bam file paths to merge
         :param output_file: Write merged bam file to this location
 
-        >>> from galaxy.datatypes.sniff import get_test_fname
-        >>> bamfile = get_test_fname('1.bam')
-        >>> out_dir = tempfile.mkdtemp()
-        >>> outpath = os.path.join(out_dir, 'out.bam')
-        >>> Bam.merge([bamfile, bamfile], outpath)
-        >>> assert int(pysam.view('-c', outpath).strip()) == 2 * int(pysam.view('-c', bamfile).strip())
-        >>> shutil.rmtree(outpath, ignore_errors=True)
-
         """
         pysam.merge('-O', 'BAM', output_file, *split_files)
 
     @staticmethod
     def _is_coordinate_sorted(file_name):
         """
-        See if the input BAM file is sorted from the header information.
+        Check if the input BAM file is sorted from the header information.
 
         >>> from galaxy.datatypes.sniff import get_test_fname
         >>> bamfile = get_test_fname('1.bam')
@@ -247,20 +239,7 @@ def _is_coordinate_sorted(file_name):

     def dataset_content_needs_grooming(self, file_name):
         """
-        See if file_name is a coordinate-sorted BAM file
-
-        >>> from galaxy.datatypes.sniff import get_test_fname
-        >>> bamfile = get_test_fname('1.bam')
-        >>> b = Bam()
-        >>> b.dataset_content_needs_grooming(bamfile)
-        False
-        >>> out_dir = tempfile.mkdtemp()
-        >>> qname_sorted = os.path.join(out_dir, 'qname_sorted.bam')
-        >>> _ = pysam.sort('-n', bamfile, '-o', qname_sorted)
-        >>> assert b.dataset_content_needs_grooming(qname_sorted) == True
-        >>> shutil.rmtree(out_dir, ignore_errors=True)
-        >>> unsorted_bam = get_test_fname('1.unsorted.bam')
-        >>> assert b.dataset_content_needs_grooming(unsorted_bam) == True
+        Check if file_name is a coordinate-sorted BAM file
         """
         # We check if the input BAM file is coordinate-sorted from the header information.
         return not self._is_coordinate_sorted(file_name)
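
The body of _is_coordinate_sorted is collapsed out of this diff, but the doctest above and the header access used later in set_meta point at the same mechanism: samtools records the sort state in the SO field of the @HD header line. A minimal sketch of that check, reusing the dict-style header access pysam exposes elsewhere in this file; this is an illustrative reconstruction, not the code from the commit:

    import pysam

    def is_coordinate_sorted(file_name):
        # 'coordinate' in the @HD SO tag means reads are ordered by
        # reference position, so no grooming (re-sorting) is needed
        with pysam.AlignmentFile(file_name, 'rb') as bam:
            return bam.header.get('HD', {}).get('SO') == 'coordinate'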
@@ -305,25 +284,6 @@ def init_meta(self, dataset, copy_from=None):
         Binary.init_meta(self, dataset, copy_from=copy_from)
 
     def set_meta(self, dataset, overwrite=True, **kwd):
-        """
-        Creates the index for the BAM file.
-
-        >>> from galaxy.datatypes.sniff import get_test_fname
-        >>> from galaxy.util.bunch import Bunch
-        >>> dataset = Bunch()
-        >>> dataset.file_name = get_test_fname('1.bam')
-        >>> dataset.metadata = Bunch()
-        >>> dataset.metadata.bam_index = Bunch()
-        >>> _, dataset.metadata.bam_index.file_name = tempfile.mkstemp()
-        >>> b = Bam()
-        >>> b.set_meta(dataset=dataset)
-        >>> dataset.metadata.sort_order
-        'coordinate'
-        >>> bam_file = pysam.AlignmentFile(dataset.file_name, mode='rb', index_filename=dataset.metadata.bam_index.file_name)
-        >>> bam_file.has_index()
-        True
-        >>> os.remove(dataset.metadata.bam_index.file_name)
-        """
         # These metadata values are not accessible by users, always overwrite
         index_file = dataset.metadata.bam_index
         if not index_file:
@@ -333,13 +293,11 @@ def set_meta(self, dataset, overwrite=True, **kwd):
         # Now use pysam with BAI index to determine additional metadata
         try:
             bam_file = pysam.AlignmentFile(dataset.file_name, mode='rb', index_filename=index_file.file_name)
-            # Reference names, lengths, read_groups and headers can become very large,
-            # but even small files error out with
-            # OperationalError: (psycopg2.OperationalError) index row size 3616 exceeds maximum 2712 for index "ix_history_dataset_association_metadata"
-            # dataset.metadata.reference_names = list(bam_file.references)
-            # dataset.metadata.reference_lengths = list(bam_file.lengths)
-            # dataset.metadata.bam_header = bam_file.header
-            # dataset.metadata.read_groups = [read_group['ID'] for read_group in dataset.metadata.bam_header.get('RG', []) if 'ID' in read_group]
+            # TODO: Reference names, lengths, read_groups and headers can become very large, truncate when necessary
+            dataset.metadata.reference_names = list(bam_file.references)
+            dataset.metadata.reference_lengths = list(bam_file.lengths)
+            dataset.metadata.bam_header = bam_file.header
+            dataset.metadata.read_groups = [read_group['ID'] for read_group in dataset.metadata.bam_header.get('RG', []) if 'ID' in read_group]
             dataset.metadata.sort_order = bam_file.header.get('HD', {}).get('SO', None)
             dataset.metadata.bam_version = bam_file.header.get('HD', {}).get('VN', None)
         except Exception:
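
The TODO above leaves truncation unimplemented. A possible shape for that guard, as a sketch only (truncate_metadata_list and MAX_ITEMS are names invented here, not part of this commit): cap the list-valued metadata before assignment so a BAM with thousands of contigs cannot trip the "ix_history_dataset_association_metadata" index row limit quoted in the removed comment.

    # Hypothetical helper, not from the commit.
    MAX_ITEMS = 100  # illustrative cap

    def truncate_metadata_list(values, limit=MAX_ITEMS):
        # keep only the head of the list; enough for display without
        # persisting every contig name into the metadata index
        return list(values)[:limit]

    # inside set_meta this would become, e.g.:
    # dataset.metadata.reference_names = truncate_metadata_list(bam_file.references)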
71 changes: 71 additions & 0 deletions test/unit/datatypes/test_bam.py
@@ -0,0 +1,71 @@
import os
import shutil
import tempfile
from contextlib import contextmanager

import pysam

from galaxy.datatypes.binary import Bam
from galaxy.util.bunch import Bunch


def test_merge_bam():
    with get_input_files('1.bam', '1.bam') as input_files, get_tmp_path() as outpath:
        Bam.merge(input_files, outpath)
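        # pysam.view('-c', path) mirrors `samtools view -c` and returns the
        # alignment count as a string; merging a file with itself should
        # double that count.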
        alignment_count_output = int(pysam.view('-c', outpath).strip())
        alignment_count_input = int(pysam.view('-c', input_files[0]).strip()) * 2
        assert alignment_count_input == alignment_count_output


def test_dataset_content_needs_grooming():
    b = Bam()
    with get_input_files('1.bam') as input_files:
        assert b.dataset_content_needs_grooming(input_files[0]) is False
        with get_tmp_path() as qname_sorted:
            pysam.sort('-n', input_files[0], '-o', qname_sorted)
            assert b.dataset_content_needs_grooming(qname_sorted) is True
    with get_input_files('1.unsorted.bam') as input_files:
        assert b.dataset_content_needs_grooming(input_files[0]) is True


def test_groom_dataset_content():
    b = Bam()
    with get_input_files('1.unsorted.bam') as input_files:
        b.groom_dataset_content(input_files[0])
        assert b.dataset_content_needs_grooming(input_files[0]) is False


def test_set_meta():
    b = Bam()
    dataset = Bunch()
    with get_input_files('1.bam') as input_files, get_tmp_path() as index_path:
        dataset.file_name = input_files[0]
        dataset.metadata = Bunch()
        dataset.metadata.bam_index = Bunch()
        dataset.metadata.bam_index.file_name = index_path
        b.set_meta(dataset=dataset)
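        # set_meta should have written a BAI index to index_path and filled
        # in header-derived metadata such as the sort order.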
        assert dataset.metadata.sort_order == 'coordinate'
        bam_file = pysam.AlignmentFile(dataset.file_name, mode='rb',
                                       index_filename=dataset.metadata.bam_index.file_name)
        assert bam_file.has_index() is True


@contextmanager
def get_tmp_path():
    _, path = tempfile.mkstemp()
    os.remove(path)
    yield path
    os.remove(path)


@contextmanager
def get_input_files(*args):
    # need to import here, otherwise get_test_fname is treated as a test
    from galaxy.datatypes.sniff import get_test_fname
    temp_dir = tempfile.mkdtemp()
    test_files = []
    for file in args:
        shutil.copy(get_test_fname(file), temp_dir)
        test_files.append(os.path.join(temp_dir, file))
    yield test_files
    shutil.rmtree(temp_dir, ignore_errors=True)
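
One note on the fixtures: get_tmp_path hands each test a path that does not yet exist by deleting the file mkstemp created, then deletes the path again on the way out. That second os.remove assumes the test actually re-created the file; a test that fails before doing so would raise FileNotFoundError during teardown and mask the real error. A more defensive variant, sketched here as a suggestion rather than part of the commit:

    import os
    import tempfile
    from contextlib import contextmanager

    @contextmanager
    def get_tmp_path():
        _, path = tempfile.mkstemp()
        os.remove(path)  # hand the test a fresh, non-existent path
        try:
            yield path
        finally:
            # tolerate tests that never created the file
            if os.path.exists(path):
                os.remove(path)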
