
Commit

Merge 9b34c97 into ccd9266
josenavas committed Jun 20, 2014
2 parents ccd9266 + 9b34c97 commit d05f4de
Showing 11 changed files with 76 additions and 115 deletions.
60 changes: 10 additions & 50 deletions qiita_db/job.py
@@ -16,17 +16,12 @@
# -----------------------------------------------------------------------------
from __future__ import division
from json import dumps, loads
from os import remove
from os.path import basename, join, commonprefix
from shutil import copy
from os.path import join
from time import strftime
from datetime import date
from tarfile import open as taropen

from .base import QiitaStatusObject
from .util import (insert_filepaths, get_db_files_base_dir, get_work_base_dir,
convert_to_id)
from .exceptions import QiitaDBDuplicateError
from .util import insert_filepaths, convert_to_id
from .sql_connection import SQLConnectionHandler


@@ -177,35 +172,16 @@ def results(self):
-------
list
Filepaths to the result files
Notes
-----
All files are automatically copied into the working directory and
untar-ed if necessary. The filepaths point to these files/folders in
the working directory.
"""
# Copy files to working dir, untar if necessary, then return filepaths
sql = ("SELECT filepath, filepath_type_id FROM qiita.filepath WHERE "
"filepath_id IN (SELECT filepath_id FROM "
"qiita.job_results_filepath WHERE job_id = %s)")
conn_handler = SQLConnectionHandler()
results = conn_handler.execute_fetchall(sql, (self._id, ))
# create new list, untaring as necessary
results_untar = []
outpath = get_work_base_dir()
for fp, fp_type in results:
if fp_type == 7:
# untar to work directory
with taropen(join(get_db_files_base_dir(),
self._table, fp)) as tar:
base = commonprefix(tar.getnames())
tar.extractall(path=outpath)
else:
# copy to work directory
copy(join(get_db_files_base_dir(), self._table, fp), outpath)
base = fp
results_untar.append(join(outpath, base))
return results_untar
results = conn_handler.execute_fetchall(
"SELECT filepath FROM qiita.filepath WHERE filepath_id IN "
"(SELECT filepath_id FROM qiita.job_results_filepath "
"WHERE job_id = %s)",
(self._id, ))
# create new list, with relative paths from db base
return [join("job", fp[0]) for fp in results]

@property
def error_msg(self):
@@ -270,28 +246,12 @@ def add_results(self, results):
[1] http://stackoverflow.com/questions/2032403/
how-to-create-full-compressed-tar-file-using-python
"""
# go though the list and tar any folders if necessary
cleanup = []
addpaths = []
for fp, fp_type in results:
if fp_type == 7:
outpath = join("/tmp", ''.join((basename(fp), ".tar")))
with taropen(outpath, "w") as tar:
tar.add(fp)
addpaths.append((outpath, 7))
cleanup.append(outpath)
else:
addpaths.append((fp, fp_type))

# add filepaths to the job
conn_handler = SQLConnectionHandler()
file_ids = insert_filepaths(addpaths, self._id, self._table,
file_ids = insert_filepaths(results, self._id, self._table,
"filepath", conn_handler)

# associate filepaths with job
sql = ("INSERT INTO qiita.{0}_results_filepath (job_id, filepath_id) "
"VALUES (%s, %s)".format(self._table))
conn_handler.executemany(sql, [(self._id, fid) for fid in file_ids])

# clean up the created tars from the temp directory
map(remove, cleanup)
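Not part of the commit: a hedged usage sketch of the simplified add_results contract. Directory results are now passed straight through with filepath type 7 (renamed 'directory' in initialize.sql below) instead of being tarred first; the paths are placeholders.

# Illustrative call only; the result paths are hypothetical.
from qiita_db.job import Job

job = Job(1)
# a plain-text result (filepath type 8) and a results directory (type 7);
# insert_filepaths copies the directory tree into the DB directory as-is
job.add_results([("/path/to/alpha_diversity.txt", 8),
                 ("/path/to/beta_diversity_plots", 7)])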
2 changes: 1 addition & 1 deletion qiita_db/support_files/initialize.sql
@@ -32,7 +32,7 @@ INSERT INTO qiita.portal_type (portal, description) VALUES ('QIIME', 'QIIME port
INSERT INTO qiita.required_sample_info_status (status) VALUES ('received'), ('in_preparation'), ('running'), ('completed');

-- Populate filepath_type table
INSERT INTO qiita.filepath_type (filepath_type) VALUES ('raw_sequences'), ('raw_barcodes'), ('raw_spectra'), ('preprocessed_sequences'), ('preprocessed_sequences_qual'), ('biom'), ('tar'), ('plain_text');
INSERT INTO qiita.filepath_type (filepath_type) VALUES ('raw_sequences'), ('raw_barcodes'), ('raw_spectra'), ('preprocessed_sequences'), ('preprocessed_sequences_qual'), ('biom'), ('directory'), ('plain_text');

-- Populate checksum_algorithm table
INSERT INTO qiita.checksum_algorithm (name) VALUES ('crc32');
2 changes: 1 addition & 1 deletion qiita_db/support_files/populate_test_db.sql
@@ -324,7 +324,7 @@ INSERT INTO qiita.filepath (filepath, filepath_type_id, checksum, checksum_algor
INSERT INTO qiita.processed_filepath (processed_data_id, filepath_id) VALUES (1, 7);

-- Insert filepath for job results files
INSERT INTO qiita.filepath (filepath, filepath_type_id, checksum, checksum_algorithm_id) VALUES ('job1result.txt', 8, '852952723', 1), ('job2tar.tar', 7, '852952723', 1);
INSERT INTO qiita.filepath (filepath, filepath_type_id, checksum, checksum_algorithm_id) VALUES ('1_job_result.txt', 8, '852952723', 1), ('2_test_folder', 7, '852952723', 1);

-- Insert jobs
INSERT INTO qiita.job (data_type_id, job_status_id, command_id, options) VALUES (1, 1, 1, '{"option1":true,"option2":12,"option3":"FCM"}'), (1, 3, 2, 'options2'), (1, 1, 2, '{"option1":true,"option2":12,"option3":"FCM"}');
@@ -0,0 +1 @@
DATA
Binary file removed qiita_db/support_files/test_data/job/job2tar.tar
3 changes: 2 additions & 1 deletion qiita_db/test/test_commands.py
@@ -82,7 +82,8 @@ def test_import_preprocessed_data(self):
initial_ppd_count = get_count('qiita.preprocessed_data')
initial_fp_count = get_count('qiita.filepath')
ppd = load_preprocessed_data_from_cmd(
1, self.tmpdir, 'tar', 'preprocessed_sequence_illumina_params',
1, self.tmpdir, 'preprocessed_sequences',
'preprocessed_sequence_illumina_params',
1, False)
self.files_to_remove.append(
join(self.db_test_ppd_dir,
74 changes: 29 additions & 45 deletions qiita_db/test/test_job.py
@@ -7,16 +7,16 @@
# -----------------------------------------------------------------------------

from unittest import TestCase, main
from os import remove, makedirs
from os.path import exists, join
from os import remove, close
from os.path import exists, join, basename
from shutil import rmtree
from datetime import datetime
from tempfile import mkdtemp, mkstemp

from qiita_core.util import qiita_test_checker
from qiita_db.job import Job
from qiita_db.util import get_db_files_base_dir, get_work_base_dir
from qiita_db.analysis import Analysis
from qiita_db.exceptions import QiitaDBDuplicateError


@qiita_test_checker()
@@ -107,29 +107,14 @@ def test_retrieve_options(self):
'option3': 'FCM'})

def test_retrieve_results(self):
obs = self.job.results
self._delete_path = obs
self.assertEqual(self.job.results, [join("job", "1_job_result.txt")])

self.assertEqual(self.job.results, [join(get_work_base_dir(),
"job1result.txt")])
# make sure files copied correctly
self.assertTrue(exists(join(get_work_base_dir(), "job1result.txt")))
def test_retrieve_results_empty(self):
new = Job.create("18S", "Beta Diversity", self.options, Analysis(1))
self.assertEqual(new.results, [])

def test_retrieve_results_blank(self):
new = Job.create("18S", "Beta Diversity",
self.options, Analysis(1))
obs = new.results
self._delete_path = obs
self.assertEqual(obs, [])

def test_retrieve_results_tar(self):
obs = Job(2).results
self._delete_dir = obs
self.assertEqual(obs, [join(get_work_base_dir(), "test_folder")])
# make sure files copied correctly
self.assertTrue(exists(join(get_work_base_dir(), "test_folder")))
self.assertTrue(exists(join(get_work_base_dir(),
"test_folder/testfile.txt")))
def test_retrieve_results_dir(self):
self.assertEqual(Job(2).results, [join("job", "2_test_folder")])

def test_set_error(self):
timestamp = datetime(2014, 6, 13, 14, 19, 25)
@@ -159,34 +144,33 @@ def test_add_results(self):
"1_placeholder.txt")))

# make sure files attached to job properly
obs = self.conn_handler.execute_fetchall("SELECT * FROM "
"qiita.job_results_filepath "
"WHERE job_id = 1")
obs = self.conn_handler.execute_fetchall(
"SELECT * FROM qiita.job_results_filepath WHERE job_id = 1")

self.assertEqual(obs, [[1, 8], [1, 10]])

def test_add_results_tar(self):
# make test directory to tar, including internal file
basedir = join(get_work_base_dir(), "tar_folder")
self._delete_dir = [basedir]
self._delete_path = [join(get_db_files_base_dir(), "job",
"1_tar_folder.tar")]
makedirs(basedir)
with open(join(basedir, "tar_data.txt"), 'w'):
pass
def test_add_results_dir(self):
# Create a test directory
test_dir = mkdtemp(dir=get_work_base_dir())
self._delete_dir.append(test_dir)
fd, test_file = mkstemp(dir=test_dir, suffix='.txt')
close(fd)
with open(test_file, "w") as f:
f.write('\n')
self._delete_path.append(test_file)

# add folder to job
self.job.add_results([(basedir, 7)])
# make sure tar file copied correctly
self.assertTrue(exists(join(get_db_files_base_dir(), "job",
"1_tar_folder.tar")))
self.job.add_results([(test_dir, 7)])

# make sure temp tar files cleaned up properly
self.assertFalse(exists("/tmp/1_tar_folder.tar"))
# check that the directory was copied correctly
db_path = join(get_db_files_base_dir(), "job",
"1_%s" % basename(test_dir))
self._delete_dir.append(db_path)
self.assertTrue(exists(db_path))

# make sure files attached to job properly
obs = self.conn_handler.execute_fetchall("SELECT * FROM "
"qiita.job_results_filepath "
"WHERE job_id = 1")
obs = self.conn_handler.execute_fetchall(
"SELECT * FROM qiita.job_results_filepath WHERE job_id = 1")
self.assertEqual(obs, [[1, 8], [1, 10]])


4 changes: 2 additions & 2 deletions qiita_db/test/test_util.py
@@ -101,7 +101,7 @@ def test_exists_dynamic_table(self):

def test_convert_to_id(self):
"""Tests that ids are returned correctly"""
self.assertEqual(convert_to_id("tar", "filepath_type"), 7)
self.assertEqual(convert_to_id("directory", "filepath_type"), 7)

def test_convert_to_id_bad_value(self):
"""Tests that ids are returned correctly"""
@@ -129,7 +129,7 @@ def test_get_filepath_types(self):
obs = get_filepath_types()
exp = {'raw_sequences': 1, 'raw_barcodes': 2, 'raw_spectra': 3,
'preprocessed_sequences': 4, 'preprocessed_sequences_qual': 5,
'biom': 6, 'tar': 7, 'plain_text': 8}
'biom': 6, 'directory': 7, 'plain_text': 8}
self.assertEqual(obs, exp)

obs = get_filepath_types(key='filepath_type_id')
42 changes: 28 additions & 14 deletions qiita_db/util.py
@@ -38,8 +38,9 @@
from binascii import crc32
from bcrypt import hashpw, gensalt
from functools import partial
from os.path import join, basename
from shutil import copy
from os.path import join, basename, isdir
from os import walk
from shutil import copy, copytree

from qiita_core.exceptions import IncompetentQiitaDeveloperError
from .exceptions import QiitaDBColumnError
@@ -321,27 +322,36 @@ def get_work_base_dir(conn_handler=None):
"SELECT base_work_dir FROM settings")[0]


def compute_checksum(filepath):
r"""Returns the checksum of the file pointed by filepath
def compute_checksum(path):
r"""Returns the checksum of the file pointed by path
Parameters
----------
filepath : str
The path to the file
path : str
The path to compute the checksum
Returns
-------
int
The file checksum
"""
crc = None
with open(filepath, "Ub") as f:
# Go line by line so we don't need to load the entire file in memory
for line in f:
if crc is None:
crc = crc32(line)
else:
crc = crc32(line, crc)
filepaths = []
if isdir(path):
for name, dirs, files in walk(path):
join_f = partial(join, name)
filepaths.extend(list(map(join_f, files)))
else:
filepaths.append(path)

for fp in filepaths:
with open(fp, "Ub") as f:
# Go line by line so we don't need to load the entire file
for line in f:
if crc is None:
crc = crc32(line)
else:
crc = crc32(line, crc)
# We need the & 0xffffffff in order to get the same numeric value across
# all python versions and platforms
return crc & 0xffffffff
Expand Down Expand Up @@ -382,7 +392,11 @@ def insert_filepaths(filepaths, obj_id, table, filepath_table, conn_handler):
for path, id in filepaths]
# Copy the original files to the controlled DB directory
for old_fp, new_fp in zip(filepaths, new_filepaths):
copy(old_fp[0], new_fp[0])
# 7 means a directory, so we need to actually copy the dir
if old_fp[1] == 7:
copytree(old_fp[0], new_fp[0])
else:
copy(old_fp[0], new_fp[0])

paths_w_checksum = [(path, id, compute_checksum(path))
for path, id in new_filepaths]
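Not part of the commit: a small sketch exercising the directory support added to compute_checksum, which now walks a directory and accumulates a single crc32 over every file it finds; the temporary layout below is invented for illustration.

# Illustration only: checksum a directory and then a single file.
from os import mkdir
from os.path import join
from tempfile import mkdtemp

from qiita_db.util import compute_checksum

d = mkdtemp()
mkdir(join(d, "sub"))
for name in ("a.txt", join("sub", "b.txt")):
    with open(join(d, name), "w") as f:
        f.write("DATA\n")

print(compute_checksum(d))                 # one crc32 over both files
print(compute_checksum(join(d, "a.txt")))  # single files behave as before

The copytree branch in insert_filepaths mirrors this: a type-7 ('directory') entry is copied into the DB directory as a whole tree instead of being tarred first.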
3 changes: 2 additions & 1 deletion setup.py
@@ -54,7 +54,8 @@
'support_files/test_data/processed_data/*',
'support_files/test_data/raw_data/*',
'support_files/test_data/reference/*',
'support_files/test_data/job/*',
'support_files/test_data/job/*.txt',
'support_files/test_data/job/2_test_folder/*',
'support_files/work_data/*']},
scripts=glob('scripts/*'),
extras_require={'test': ["nose >= 0.10.1", "pep8"],
