Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init code refactoring #742

Merged
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
326 changes: 196 additions & 130 deletions wqflask/wqflask/correlation/show_corr_results.py
Expand Up @@ -74,6 +74,193 @@ def set_template_vars(start_vars, correlation_data):
return correlation_data


def apply_filters(trait, target_trait, target_dataset, **filters):
    """Tell whether a correlation result must be left out of the results table.

    Args:
        trait: Mapping with the correlation values of one result; only
            ``corr_coefficient`` is read here (0.0 when absent).
        target_trait: Mapping describing the correlated trait. Depending on
            the dataset type, ``mean``, ``chr``/``mb`` or ``lrs_chr``/``lrs_mb``
            are read.
        target_dataset: Mapping with at least a ``type`` entry.
        **filters: Optional ``p_range_lower``, ``p_range_upper``, ``min_expr``,
            ``location_type``, ``location_chr``, ``min_location_mb`` and
            ``max_location_mb``. A bound that is missing or ``None`` is not
            applied.

    Returns:
        bool: ``True`` when at least one filter rejects the trait (drop it),
        ``False`` when every filter lets it through.
    """

    def __p_val_filter__(p_lower, p_upper):
        # Despite the name, the range is checked against the correlation
        # coefficient, not against the p-value.
        coefficient = float(trait.get('corr_coefficient', 0.0))
        if p_lower is not None and coefficient < p_lower:
            return True
        if p_upper is not None and coefficient > p_upper:
            return True
        return False

    def __min_filter__(min_expr):
        # The None guard has to come first and stay separate: comparing a
        # float with None raises TypeError, so this cannot be folded into a
        # chained comparison such as ``mean < min_expr is not None``.
        if min_expr is None:
            return False
        if (target_dataset['type'] in ("ProbeSet", "Publish")
                and target_trait['mean']):
            return float(target_trait['mean']) < min_expr
        return False

    def __location_filter__(location_type, location_chr,
                            min_location_mb, max_location_mb):
        dataset_type = target_dataset["type"]
        if dataset_type in ("ProbeSet", "Geno") and location_type == "gene":
            # Filter on the position of the trait itself.
            chr_key, mb_key = "chr", "mb"
        elif dataset_type in ("ProbeSet", "Publish"):
            # Filter on the position stored under the ``lrs_*`` keys.
            chr_key, mb_key = "lrs_chr", "lrs_mb"
        else:
            # Any other combination is always rejected.
            return True

        if location_chr is not None and target_trait[chr_key] != location_chr:
            return True
        if min_location_mb is None and max_location_mb is None:
            # No positional bound requested: do not touch the Mb value at all.
            return False

        position = float(target_trait[mb_key])
        below_min = (min_location_mb is not None
                     and position < float(min_location_mb))
        above_max = (max_location_mb is not None
                     and position > float(max_location_mb))
        return below_min or above_max

    # The trait is dropped as soon as one of the filters rejects it.
    return (
        __p_val_filter__(filters.get("p_range_lower"),
                         filters.get("p_range_upper"))
        or __min_filter__(filters.get("min_expr"))
        or __location_filter__(filters.get("location_type"),
                               filters.get("location_chr"),
                               filters.get("min_location_mb"),
                               filters.get("max_location_mb"))
    )


def get_user_filters(start_vars):
    """Collect the filter settings of a correlation request.

    Args:
        start_vars: Mapping of request variables. ``location_type`` is
            required; ``min_expr``, ``p_range_lower``, ``p_range_upper``,
            ``loc_chr``, ``min_loc_mb`` and ``max_loc_mb`` are optional.

    Returns:
        dict: Keyword arguments for ``apply_filters``. The three location
        values are ``None`` unless ``loc_chr``, ``min_loc_mb`` and
        ``max_loc_mb`` are all present in ``start_vars``.

    Raises:
        KeyError: If ``location_type`` is missing from ``start_vars``.
    """
    min_expr = get_float(start_vars, 'min_expr')
    p_min = get_float(start_vars, 'p_range_lower', -1.0)
    p_max = get_float(start_vars, 'p_range_upper', 1.0)

    # The keys tested here must be the same ones that are read below;
    # testing a differently named key would silently drop the location
    # filters.
    location_keys = ("loc_chr", "min_loc_mb", "max_loc_mb")
    if all(key in start_vars for key in location_keys):
        location_chr = get_string(start_vars, "loc_chr")
        min_location_mb = get_int(start_vars, "min_loc_mb")
        max_location_mb = get_int(start_vars, "max_loc_mb")
    else:
        location_chr = min_location_mb = max_location_mb = None

    return {
        "min_expr": min_expr,
        "p_range_lower": p_min,
        "p_range_upper": p_max,
        "location_chr": location_chr,
        "location_type": start_vars['location_type'],
        "min_location_mb": min_location_mb,
        "max_location_mb": max_location_mb,
    }


def generate_table_metadata(all_traits, dataset_metadata, dataset_obj):
    """Complete the trait metadata with entries for traits that lack one.

    Args:
        all_traits: Iterable of trait names that appear in the correlation
            results.
        dataset_metadata: Mapping of trait name to its already known
            metadata dict. It is not modified.
        dataset_obj: Dataset object handed to ``create_trait`` and
            ``jsonable`` for every trait that has to be fetched.

    Returns:
        dict: A new mapping holding everything from ``dataset_metadata`` plus
        one freshly fetched entry, keyed by the fetched trait's ``name``,
        for each name in ``all_traits`` that had no metadata yet.
    """

    def __fetch_trait_data__(trait, dataset_obj):
        target_trait_ob = create_trait(dataset=dataset_obj,
                                       name=trait,
                                       get_qtl_info=True)
        return jsonable(target_trait_ob, dataset_obj)

    # Only names without metadata need fetching. A symmetric difference
    # would also re-fetch traits that have metadata but no result.
    missing = [name for name in all_traits if name not in dataset_metadata]
    fetched = [__fetch_trait_data__(name, dataset_obj) for name in missing]
    return dataset_metadata | {trait["name"]: trait for trait in fetched}


def populate_table(dataset_metadata, target_dataset, this_dataset, corr_results, filters):
    """Turn correlation results into rows for the results table.

    Args:
        dataset_metadata: Mapping of trait name to that trait's metadata
            dict.
        target_dataset: Dict describing the dataset that was correlated
            against; ``name`` and ``type`` are read.
        this_dataset: Dict describing the primary trait's dataset; ``type``
            is read.
        corr_results: Iterable of dicts whose first key is a trait name and
            whose value holds that trait's correlation values.
        filters: Keyword filters forwarded to ``apply_filters``.

    Returns:
        list: One entry per item of ``corr_results``, in order: a row dict
        for a result that passes the filters, ``None`` for one that does not.
    """

    def __lod_score__(target_trait):
        # A missing or non-numeric LRS score is displayed as "N/A".
        try:
            return f"{float(target_trait['lrs_score']) / 4.61:.1f}"
        except (KeyError, TypeError, ValueError):
            return "N/A"

    def __populate_trait__(idx, trait):
        trait_name = next(iter(trait))
        target_trait = dataset_metadata.get(trait_name)
        trait = trait[trait_name]
        if apply_filters(trait, target_trait, target_dataset, **filters):
            return None

        results_dict = {
            'index': idx + 1,
            'trait_id': target_trait['name'],
            'dataset': target_dataset['name'],
            'hmac': hmac.data_hmac(
                '{}:{}'.format(target_trait['name'], target_dataset['name'])),
            'sample_r': f"{float(trait.get('corr_coefficient', 0.0)):.3f}",
            'num_overlap': trait.get('num_overlap', 0),
            'sample_p': f"{float(trait.get('p_value', 0)):.3e}",
        }

        if target_dataset['type'] == "ProbeSet":
            results_dict['symbol'] = target_trait['symbol']
            results_dict['description'] = "N/A"
            results_dict['location'] = target_trait['location']
            results_dict['mean'] = "N/A"
            results_dict['additive'] = "N/A"
            if target_trait['description']:
                results_dict['description'] = target_trait['description']
            if target_trait['mean']:
                results_dict['mean'] = f"{float(target_trait['mean']):.3f}"
            results_dict['lod_score'] = __lod_score__(target_trait)
            results_dict['lrs_location'] = target_trait['lrs_location']
            if target_trait['additive']:
                results_dict['additive'] = f"{float(target_trait['additive']):.3f}"
            results_dict['lit_corr'] = "--"
            results_dict['tissue_corr'] = "--"
            results_dict['tissue_pvalue'] = "--"
            if this_dataset['type'] == "ProbeSet":
                if 'lit_corr' in trait:
                    results_dict['lit_corr'] = f"{float(trait['lit_corr']):.3f}"
                if 'tissue_corr' in trait:
                    results_dict['tissue_corr'] = f"{float(trait['tissue_corr']):.3f}"
                    results_dict['tissue_pvalue'] = f"{float(trait['tissue_p_val']):.3e}"
        elif target_dataset['type'] == "Publish":
            results_dict['abbreviation_display'] = "N/A"
            results_dict['description'] = "N/A"
            results_dict['mean'] = "N/A"
            results_dict['authors_display'] = "N/A"
            results_dict['additive'] = "N/A"
            results_dict['pubmed_link'] = "N/A"
            results_dict['pubmed_text'] = target_trait["pubmed_text"]

            # Each "N/A" placeholder is replaced only by a non-empty value.
            if target_trait["abbreviation"]:
                results_dict['abbreviation_display'] = target_trait['abbreviation']
            if target_trait["description"]:
                results_dict['description'] = target_trait['description']
            if target_trait["mean"]:
                results_dict['mean'] = f"{float(target_trait['mean']):.3f}"
            if target_trait["authors"]:
                authors_list = target_trait['authors'].split(',')
                if len(authors_list) > 6:
                    # Long author lists are cut after the sixth name.
                    results_dict['authors_display'] = ", ".join(
                        authors_list[:6]) + ", et al."
                else:
                    results_dict['authors_display'] = target_trait['authors']
            if "pubmed_id" in target_trait:
                results_dict['pubmed_link'] = target_trait['pubmed_link']
                results_dict['pubmed_text'] = target_trait['pubmed_text']
            results_dict['lod_score'] = __lod_score__(target_trait)
            results_dict['lrs_location'] = target_trait['lrs_location']
            if target_trait['additive']:
                results_dict['additive'] = f"{float(target_trait['additive']):.3f}"
        else:
            results_dict['location'] = target_trait['location']

        return results_dict

    return [__populate_trait__(idx, trait)
            for (idx, trait) in enumerate(corr_results)]


def correlation_json_for_table(start_vars, correlation_data, this_trait, this_dataset, target_dataset_ob):
"""Return JSON data for use with the DataTable in the correlation result page

Expand All @@ -86,139 +273,18 @@ def correlation_json_for_table(start_vars, correlation_data, this_trait, this_da
this_trait = correlation_data['this_trait']
this_dataset = correlation_data['this_dataset']
target_dataset = target_dataset_ob.as_dict()

corr_results = correlation_data['correlation_results']
results_list = []

new_traits_metadata = {}

dataset_metadata = correlation_data["traits_metadata"]

min_expr = get_float(start_vars, 'min_expr')
p_range_lower = get_float(start_vars, 'p_range_lower', -1.0)
p_range_upper = get_float(start_vars, 'p_range_upper', 1.0)

if ('loc_chr' in start_vars and
'min_loc_mb' in start_vars and
'max_loc_mb' in start_vars):

location_chr = get_string(start_vars, 'loc_chr')
min_location_mb = get_int(start_vars, 'min_loc_mb')
max_location_mb = get_int(start_vars, 'max_loc_mb')
else:
location_chr = min_location_mb = max_location_mb = None

for i, trait_dict in enumerate(corr_results):
trait_name = list(trait_dict.keys())[0]
trait = trait_dict[trait_name]

target_trait = dataset_metadata.get(trait_name)
if target_trait is None:
target_trait_ob = create_trait(dataset=target_dataset_ob,
name=trait_name,
get_qtl_info=True)
target_trait = jsonable(target_trait_ob, target_dataset_ob)
new_traits_metadata[trait_name] = target_trait

if (float(trait.get('corr_coefficient',0.0)) >= p_range_lower and
float(trait.get('corr_coefficient',0.0)) <= p_range_upper):

if (target_dataset['type'] == "ProbeSet" or target_dataset['type'] == "Publish") and bool(target_trait['mean']):
if (min_expr != None) and (float(target_trait['mean']) < min_expr):
continue

if start_vars['location_type'] == "gene" and (target_dataset['type'] == "ProbeSet" or target_dataset['type'] == "Geno"):
if location_chr != None and (target_trait['chr'] != location_chr):
continue
if (min_location_mb != None) and (float(target_trait['mb']) < float(min_location_mb)):
continue
if (max_location_mb != None) and (float(target_trait['mb']) > float(max_location_mb)):
continue
elif target_dataset['type'] == "ProbeSet" or target_dataset['type'] == "Publish":
if location_chr != None and (target_trait['lrs_chr'] != location_chr):
continue
if (min_location_mb != None) and (float(target_trait['lrs_mb']) < float(min_location_mb)):
continue
if (max_location_mb != None) and (float(target_trait['lrs_mb']) > float(max_location_mb)):
continue
else:
continue
else:
continue

results_dict = {}
results_dict['index'] = i + 1
results_dict['trait_id'] = target_trait['name']
results_dict['dataset'] = target_dataset['name']
results_dict['hmac'] = hmac.data_hmac(
'{}:{}'.format(target_trait['name'], target_dataset['name']))
results_dict['sample_r'] = f"{float(trait.get('corr_coefficient',0.0)):.3f}"
results_dict['num_overlap'] = trait.get('num_overlap',0)
results_dict['sample_p'] = f"{float(trait.get('p_value',0)):.3e}"
if target_dataset['type'] == "ProbeSet":
results_dict['symbol'] = target_trait['symbol']
results_dict['description'] = "N/A"
results_dict['location'] = target_trait['location']
results_dict['mean'] = "N/A"
results_dict['additive'] = "N/A"
if bool(target_trait['description']):
results_dict['description'] = target_trait['description']
if bool(target_trait['mean']):
results_dict['mean'] = f"{float(target_trait['mean']):.3f}"
try:
results_dict['lod_score'] = f"{float(target_trait['lrs_score']) / 4.61:.1f}"
except:
results_dict['lod_score'] = "N/A"
results_dict['lrs_location'] = target_trait['lrs_location']
if bool(target_trait['additive']):
results_dict['additive'] = f"{float(target_trait['additive']):.3f}"
results_dict['lit_corr'] = "--"
results_dict['tissue_corr'] = "--"
results_dict['tissue_pvalue'] = "--"
if this_dataset['type'] == "ProbeSet":
if 'lit_corr' in trait:
results_dict['lit_corr'] = f"{float(trait['lit_corr']):.3f}"
if 'tissue_corr' in trait:
results_dict['tissue_corr'] = f"{float(trait['tissue_corr']):.3f}"
results_dict['tissue_pvalue'] = f"{float(trait['tissue_p_val']):.3e}"
elif target_dataset['type'] == "Publish":
results_dict['abbreviation_display'] = "N/A"
results_dict['description'] = "N/A"
results_dict['mean'] = "N/A"
results_dict['authors_display'] = "N/A"
results_dict['additive'] = "N/A"
results_dict['pubmed_link'] = "N/A"
results_dict['pubmed_text'] = target_trait["pubmed_text"]

if bool(target_trait['abbreviation']):
results_dict['abbreviation_display'] = target_trait['abbreviation']
if bool(target_trait['description']):
results_dict['description'] = target_trait['description']
if bool(target_trait['mean']):
results_dict['mean'] = f"{float(target_trait['mean']):.3f}"
if bool(target_trait['authors']):
authors_list = target_trait['authors'].split(',')
if len(authors_list) > 6:
results_dict['authors_display'] = ", ".join(
authors_list[:6]) + ", et al."
else:
results_dict['authors_display'] = target_trait['authors']
if 'pubmed_id' in target_trait:
results_dict['pubmed_link'] = target_trait['pubmed_link']
results_dict['pubmed_text'] = target_trait['pubmed_text']
try:
results_dict['lod_score'] = f"{float(target_trait['lrs_score']) / 4.61:.1f}"
except:
results_dict['lod_score'] = "N/A"
results_dict['lrs_location'] = target_trait['lrs_location']
if bool(target_trait['additive']):
results_dict['additive'] = f"{float(target_trait['additive']):.3f}"
else:
results_dict['location'] = target_trait['location']

results_list.append(results_dict)
dataset_metadata = generate_table_metadata({name for trait in corr_results
for (name, _val) in trait.items()},
correlation_data["traits_metadata"],
target_dataset_ob)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This nested "for" loop would be difficult to maintain. I'd advise using a simpler, albeit more verbose, loop instead.


return json.dumps(results_list)
results = populate_table(dataset_metadata,
target_dataset,
this_dataset, corr_results,
get_user_filters(start_vars))
return json.dumps([result for result in results if result])


def get_formatted_corr_type(corr_type, corr_method):
Expand Down