Skip to content

Commit

Permalink
Merge c6eead7 into 1ffd66a
Browse files Browse the repository at this point in the history
  • Loading branch information
squirrelo committed Aug 7, 2014
2 parents 1ffd66a + c6eead7 commit 44cf07d
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 29 deletions.
96 changes: 84 additions & 12 deletions qiita_db/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
from __future__ import division
from json import dumps, loads
from os.path import join
from os import remove
from shutil import rmtree
from functools import partial
from collections import defaultdict

from future.builtins import zip
from future.utils import viewitems, viewkeys

from qiita_core.exceptions import IncompetentQiitaDeveloperError
from .base import QiitaStatusObject
Expand Down Expand Up @@ -81,7 +84,7 @@ def get_commands():
return Command.create_list()

@classmethod
def exists(cls, datatype, command, options):
def exists(cls, datatype, command, options, analysis):
"""Checks if the given job already exists
Parameters
Expand All @@ -92,21 +95,90 @@ def exists(cls, datatype, command, options):
The name of the command run on the data
options : dict
Options for the command in the format {option: value}
analysis : Analysis object
Analysis the job will be added to if it doesn't exist
Returns
-------
bool
Whether the job exists or not
"""
conn_handler = SQLConnectionHandler()
# check passed arguments and grab analyses for matching jobs
datatype_id = convert_to_id(datatype, "data_type", conn_handler)
sql = "SELECT command_id FROM qiita.command WHERE name = %s"
command_id = conn_handler.execute_fetchone(sql, (command, ))[0]
opts_json = dumps(options, sort_keys=True, separators=(',', ':'))
sql = ("SELECT EXISTS(SELECT * FROM qiita.{0} WHERE data_type_id = %s"
" AND command_id = %s AND options = %s)".format(cls._table))
return conn_handler.execute_fetchone(
sql, (datatype_id, command_id, opts_json))[0]
sql = ("SELECT DISTINCT aj.analysis_id FROM qiita.analysis_job aj "
"JOIN qiita.{0} j ON aj.job_id = j.job_id WHERE j.data_type_id"
" = %s AND j.command_id = %s "
"AND j.options = %s".format(cls._table))
analyses = conn_handler.execute_fetchall(
sql, (datatype_id, command_id, opts_json))
if not analyses:
return False
analyses = [x[0] for x in analyses]
if analysis.id in analyses:
analyses.remove(analysis.id)

# check data used to create jobid
sql = ("SELECT processed_data_id, array_agg(sample_id) FROM "
"qiita.analysis_sample WHERE analysis_id = %s "
"GROUP BY processed_data_id")
samples = dict(conn_handler.execute_fetchall(sql, [analysis.id]))
# turn to sets for fast and easy comparisons later
for proc_data in viewkeys(samples):
samples[proc_data] = set(samples[proc_data])
for aid in analyses:
# grab the processed data and samples for the matching analysis
comp_samples = dict(conn_handler.execute_fetchall(sql, [aid]))
same = True
# check if same processed_data_ids
for key in viewkeys(samples):
if key not in comp_samples:
same = False
break
if not same:
continue
# check if same samples in both processed_data_ids
for proc_data, samps in viewitems(samples):
if samps.symmetric_difference(comp_samples[proc_data]):
same = False
break
if same:
return True
return False

@classmethod
def delete(cls, jobid):
    """Removes a job, its result filepaths and its result files

    Parameters
    ----------
    jobid : int
        ID of the job to remove

    Notes
    -----
    The database rows are removed first (results links, filepaths,
    analysis links and finally the job itself); the files and folders
    under the "job" directory of the DB files base dir are removed last.
    """
    conn_handler = SQLConnectionHandler()
    # store filepath info for later use; the rows are gone from the DB by
    # the time the files on disk are removed
    sql = ("SELECT f.filepath, f.filepath_id FROM qiita.filepath f JOIN "
           "qiita.job_results_filepath jf ON jf.filepath_id = "
           "f.filepath_id WHERE jf.job_id = %s")
    filepaths = conn_handler.execute_fetchall(sql, [jobid])

    # remove filepath links in DB
    conn_handler.execute("DELETE FROM qiita.job_results_filepath WHERE "
                         "job_id = %s", [jobid])
    # A job without results has no filepath rows to delete, and an empty
    # WHERE clause would be invalid SQL, so only run the statement when
    # there is something to remove. One placeholder per id keeps the
    # query parameterized.
    if filepaths:
        placeholders = ', '.join(['%s'] * len(filepaths))
        conn_handler.execute(
            "DELETE FROM qiita.filepath WHERE filepath_id IN "
            "({0})".format(placeholders), [fp[1] for fp in filepaths])

    # remove job
    conn_handler.execute("DELETE FROM qiita.analysis_job WHERE "
                         "job_id = %s", [jobid])
    conn_handler.execute("DELETE FROM qiita.job WHERE job_id = %s",
                         [jobid])

    # remove files/folders attached to job
    basedir = get_db_files_base_dir()
    for fp in filepaths:
        path = join(basedir, "job", fp[0])
        try:
            # results may be whole folders...
            rmtree(path)
        except OSError:
            # ...or single files, which rmtree refuses to handle
            remove(path)

@classmethod
def create(cls, datatype, command, analysis):
Expand All @@ -127,10 +199,10 @@ def create(cls, datatype, command, analysis):
The newly created job
"""
# EXISTS IGNORED FOR DEMO, ISSUE #83
# if cls.exists(datatype, command, options):
# if cls.exists(datatype, command, options, analysis):
# raise QiitaDBDuplicateError(
# "Job", "datatype: %s, command: %s, options: %s"
# % (datatype, command, options))
# "Job", "datatype: %s, command: %s, options: %s, analysis: %s"
# % (datatype, command, options, analysis.id))

# Get the datatype and command ids from the strings
conn_handler = SQLConnectionHandler()
Expand Down Expand Up @@ -425,7 +497,7 @@ def __init__(self, name, command, input_opts, required_opts,
"""
self.name = name
self.command = command
self.input_opts = dumps(input_opts)
self.required_opts = dumps(required_opts)
self.optional_opts = dumps(optional_opts)
self.output_opts = dumps(output_opts)
self.input_opts = loads(input_opts)
self.required_opts = loads(required_opts)
self.optional_opts = loads(optional_opts)
self.output_opts = loads(output_opts)
2 changes: 1 addition & 1 deletion qiita_db/support_files/populate_test_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ INSERT INTO qiita.processed_filepath (processed_data_id, filepath_id) VALUES (1,
INSERT INTO qiita.filepath (filepath, filepath_type_id, checksum, checksum_algorithm_id) VALUES ('1_job_result.txt', 8, '852952723', 1), ('2_test_folder', 7, '852952723', 1);

-- Insert jobs
INSERT INTO qiita.job (data_type_id, job_status_id, command_id, options) VALUES (1, 1, 1, '{"--otu_table_fp":1}'), (1, 3, 2, '{"--otu_table_fp":1,"--mapping_fp":1}'), (1, 1, 2, '{"--otu_table_fp":1,"--mapping_fp":1}');
INSERT INTO qiita.job (data_type_id, job_status_id, command_id, options) VALUES (1, 1, 1, '{"--otu_table_fp":1}'), (1, 3, 2, '{"--mapping_fp":1,"--otu_table_fp":1}'), (1, 1, 2, '{"--mapping_fp":1,"--otu_table_fp":1}');

-- Insert Analysis
INSERT INTO qiita.analysis (email, name, description, analysis_status_id, pmid) VALUES ('test@foo.bar', 'SomeAnalysis', 'A test analysis', 1, '121112'), ('test@foo.bar', 'SomeSecondAnalysis', 'Another test analysis', 1, '22221112');
Expand Down
100 changes: 84 additions & 16 deletions qiita_db/test/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
# -----------------------------------------------------------------------------

from unittest import TestCase, main
from os import remove
from os.path import join
from os import remove, mkdir
from os.path import join, exists
from shutil import rmtree
from datetime import datetime

from qiita_core.util import qiita_test_checker
from qiita_db.job import Job, Command
from qiita_db.util import get_db_files_base_dir
from qiita_db.analysis import Analysis
from qiita_db.exceptions import QiitaDBDuplicateError, QiitaDBStatusError
from qiita_db.exceptions import (QiitaDBDuplicateError, QiitaDBStatusError,
QiitaDBUnknownIDError)
from qiita_db.logger import LogEntry


Expand All @@ -38,19 +39,31 @@ def tearDown(self):
for item in self._delete_dir:
rmtree(item)

# EXISTS IGNORED FOR DEMO, ISSUE #83
# def test_exists(self):
# """tests that existing job returns true"""
# self.assertTrue(Job.exists("16S", "Summarize Taxa",
# {'option1': True, 'option2': 12,
# 'option3': 'FCM'}))

# def test_exists_not_there(self):
# """tests that non-existant job returns false"""
# self.assertFalse(Job.exists("Metabolomic",
# "Summarize Taxa",
# {'option1': "Nope", 'option2': 10,
# 'option3': 'FCM'}))
def test_exists(self):
    """Job.exists is True when another analysis has the same job/samples"""
    # analysis 2 must hold the same samples as analysis 1 for the match
    clear_sql = "DELETE FROM qiita.analysis_sample WHERE analysis_id = 2"
    insert_sql = (
        "INSERT INTO qiita.analysis_sample (analysis_id, "
        "processed_data_id, sample_id) VALUES (2,1,'SKB8.640193'), "
        "(2,1,'SKD8.640184'), (2,1,'SKB7.640196'), (2,1,'SKM9.640192'),"
        "(2,1,'SKM4.640180')")
    self.conn_handler.execute(clear_sql)
    self.conn_handler.execute(insert_sql)

    options = {"--otu_table_fp": 1, "--mapping_fp": 1}
    found = Job.exists("16S", "Beta Diversity", options, Analysis(1))
    self.assertTrue(found)

def test_exists_noexist_options(self):
    """Job.exists is False when no job has the given options"""
    options = {"--otu_table_fp": 1, "--mapping_fp": 27}
    found = Job.exists("16S", "Beta Diversity", options, Analysis(1))
    self.assertFalse(found)

def test_exists_noexist_samples(self):
    """Job.exists is False when the matching job used other samples"""
    options = {"--otu_table_fp": 1, "--mapping_fp": 1}
    found = Job.exists("16S", "Beta Diversity", options, Analysis(1))
    self.assertFalse(found)

def test_get_commands(self):
exp = [
Expand All @@ -71,6 +84,61 @@ def test_get_commands(self):
]
self.assertEqual(Job.get_commands(), exp)

def test_delete_files(self):
    """Job.delete removes the DB rows and the result file of job 1"""
    result_fp = join(get_db_files_base_dir(), "job/1_job_result.txt")
    # every one of these queries must come back empty after the delete
    emptied_queries = (
        "SELECT * FROM qiita.filepath WHERE filepath_id = 8 OR "
        "filepath_id = 10",
        "SELECT * FROM qiita.job_results_filepath WHERE job_id = 1",
        "SELECT * FROM qiita.analysis_job WHERE job_id = 1",
    )
    try:
        Job.delete(1)
        with self.assertRaises(QiitaDBUnknownIDError):
            Job(1)

        for query in emptied_queries:
            obs = self.conn_handler.execute_fetchall(query)
            self.assertEqual(obs, [])

        self.assertFalse(exists(result_fp))
    finally:
        # put the fixture file back so other tests still find it
        if not exists(result_fp):
            with open(result_fp, 'w') as f:
                f.write("job1result.txt")

def test_delete_folders(self):
    """Job.delete removes the DB rows and the result folder of job 2"""
    base_dir = get_db_files_base_dir()
    folder_fp = join(base_dir, "job/2_test_folder")
    # every one of these queries must come back empty after the delete
    emptied_queries = (
        "SELECT * FROM qiita.filepath WHERE filepath_id = 9",
        "SELECT * FROM qiita.job_results_filepath WHERE job_id = 2",
        "SELECT * FROM qiita.analysis_job WHERE job_id = 2",
    )
    try:
        Job.delete(2)
        with self.assertRaises(QiitaDBUnknownIDError):
            Job(2)

        for query in emptied_queries:
            obs = self.conn_handler.execute_fetchall(query)
            self.assertEqual(obs, [])

        self.assertFalse(exists(folder_fp))
    finally:
        # put the fixture folder and its file back for other tests
        if not exists(folder_fp):
            mkdir(folder_fp)
            with open(join(base_dir, "job/2_test_folder/testfile.txt"),
                      'w') as f:
                f.write("DATA")

def test_create(self):
"""Makes sure creation works as expected"""
# make first job
Expand Down

0 comments on commit 44cf07d

Please sign in to comment.