Merge pull request #90 from johnlees/sketchlib140
Update to pp-sketchlib v1.4.0
johnlees committed Jul 3, 2020
2 parents e9c8c26 + f867da4 commit 58bef9e
Showing 17 changed files with 196 additions and 348 deletions.
2 changes: 1 addition & 1 deletion PopPUNK/__init__.py
@@ -3,4 +3,4 @@

'''PopPUNK (POPulation Partitioning Using Nucleotide Kmers)'''

-__version__ = '2.1.0'
+__version__ = '2.1.1'
109 changes: 63 additions & 46 deletions PopPUNK/__main__.py

Large diffs are not rendered by default.

61 changes: 33 additions & 28 deletions PopPUNK/lineage_clustering.py
@@ -23,6 +23,8 @@
sys.exit(0)
from functools import partial

+import pp_sketchlib

# import poppunk package
from .plot import writeClusterCsv

@@ -32,13 +34,13 @@
def get_chunk_ranges(N, nb):
""" Calculates boundaries for dividing distances array
into chunks for parallelisation.
Args:
N (int)
Number of rows in array
nb (int)
Number of blocks into which to divide array.
Returns:
range_sizes (list of tuples)
Limits of blocks for dividing array.
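The docstring above states the contract for `get_chunk_ranges`, but the function body is collapsed in this view. A minimal sketch of chunk boundaries with those semantics, assuming near-equal contiguous blocks (the name `chunk_ranges_sketch` and the arithmetic are illustrative, not the PopPUNK implementation):

```python
import numpy as np

def chunk_ranges_sketch(N, nb):
    """Hypothetical helper: split N rows into nb near-equal contiguous blocks."""
    sizes = np.full(nb, N // nb, dtype=int)
    sizes[:N % nb] += 1                      # spread any remainder over the first blocks
    ends = np.cumsum(sizes)
    starts = ends - sizes
    return [(int(s), int(e)) for s, e in zip(starts, ends)]

print(chunk_ranges_sketch(10, 3))            # [(0, 4), (4, 7), (7, 10)]
```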
@@ -53,13 +55,13 @@ def get_chunk_ranges(N, nb):
def rank_distance_matrix(bounds, distances = None):
""" Ranks distances between isolates for each index (row)
isolate.
Args:
bounds (2-tuple)
Range of rows to process in this thread.
distances (ndarray in shared memory)
Shared memory object storing pairwise distances.
Returns:
ranks (numpy ndarray)
Ranks of distances for each row.
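`rank_distance_matrix` is likewise collapsed here; the sketch below only illustrates the per-row ranking its docstring describes (the function name, the use of `scipy.stats.rankdata`, and its default tie handling are assumptions, not the committed code):

```python
import numpy as np
from scipy.stats import rankdata

def rank_rows_sketch(bounds, distances):
    """Hypothetical worker: rank the distances in each row of one block."""
    start, end = bounds
    return np.apply_along_axis(rankdata, 1, distances[start:end, :])

dists = np.array([[np.inf, 0.2, 0.1],
                  [0.2, np.inf, 0.3]])
print(rank_rows_sketch((0, 2), dists))       # Inf self-self distances rank last in each row
```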
@@ -73,7 +75,7 @@ def rank_distance_matrix(bounds, distances = None):

def get_nearest_neighbours(rank, isolates = None, ranks = None):
""" Identifies sets of nearest neighbours for each isolate.
Args:
rank (int)
Rank used in analysis.
Expand All @@ -82,7 +84,7 @@ def get_nearest_neighbours(rank, isolates = None, ranks = None):
ranks (ndarray in shared memory)
Shared memory object pointing to ndarray of
ranked pairwise distances.
Returns:
nn (default dict of frozensets)
Dict indexed by isolates, values are a
@@ -102,18 +104,18 @@ def get_nearest_neighbours(rank, isolates = None, ranks = None):
nn[i] = neighbours
# return dict
return nn


def pick_seed_isolate(G, distances = None):
""" Identifies seed isolate from the closest pair of
unclustered isolates.
Args:
G (network)
Network with one node per isolate.
distances (ndarray in shared memory)
Pairwise distances between isolates.
Returns:
seed_isolate (int)
Index of isolate selected as seed.
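The seed choice this docstring describes could look roughly like the following, assuming "unclustered" means a node whose `lineage` attribute is still 0 and that self-self distances have already been set to Inf (as `cluster_into_lineages` does further down in this diff); `pick_seed_sketch` and its tie-breaking are illustrative only:

```python
import numpy as np
import networkx as nx

def pick_seed_sketch(G, distances):
    """Hypothetical version of the seed selection described above."""
    unclustered = [n for n, lineage in G.nodes(data='lineage', default=0) if lineage == 0]
    sub = distances[np.ix_(unclustered, unclustered)]
    i, j = np.unravel_index(np.argmin(sub), sub.shape)   # diagonal is Inf, so i != j
    return unclustered[min(i, j)]

G = nx.Graph()
G.add_nodes_from(range(3))
d = np.array([[np.inf, 0.3, 0.1],
              [0.3, np.inf, 0.2],
              [0.1, 0.2, np.inf]])
print(pick_seed_sketch(G, d))                            # 0 (closest pair is 0 and 2)
```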
@@ -148,7 +150,7 @@ def get_lineage(G, neighbours, seed_isolate, lineage_index):
Index of isolate selected as seed.
lineage_index (int)
Label of current lineage.
Returns:
G (network)
Network modified with new edges.
@@ -177,10 +179,12 @@ def get_lineage(G, neighbours, seed_isolate, lineage_index):
# return final clustering
return G

-def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list = None, qlist = None, existing_scheme = None, use_accessory = False, num_processes = 1):
+def cluster_into_lineages(distMat, rank_list = None, output = None,
+isolate_list = None, qlist = None, existing_scheme = None,
+use_accessory = False, num_processes = 1):
""" Clusters isolates into lineages based on their
relative distances.
Args:
distMat (np.array)
n x 2 array of core and accessory distances for n samples.
@@ -202,12 +206,12 @@ def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list
Option to use accessory distances rather than core distances.
num_processes (int)
Number of CPUs to use for calculations.
Returns:
overall_lineages (nested dict)
Dict for each rank listing the lineage of each isolate.
"""

# data structures
lineage_clustering = defaultdict(dict)
overall_lineage_seeds = defaultdict(dict)
@@ -217,7 +221,8 @@ def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list
lineage_clustering, overall_lineage_seeds, rank_list = pickle.load(pickle_file)

# generate square distance matrix
-seqLabels, coreMat, accMat = update_distance_matrices(isolate_list, distMat)
+seqLabels, coreMat, accMat = \
+update_distance_matrices(isolate_list, distMat, threads = num_processes)
if use_accessory:
distances = accMat
else:
@@ -227,19 +232,19 @@ def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list
except:
sys.stderr.write('Isolates in wrong order?')
exit(1)

# list indices and set self-self to Inf
isolate_indices = [n for n,i in enumerate(isolate_list)]
for i in isolate_indices:
distances[i,i] = np.Inf

# get ranks of distances per row
chunk_boundaries = get_chunk_ranges(distances.shape[0], num_processes)
with SharedMemoryManager() as smm:

# share isolate list
isolate_list_shared = smm.ShareableList(isolate_indices)

# create shared memory object for distances
distances_raw = smm.SharedMemory(size = distances.nbytes)
distances_shared_array = np.ndarray(distances.shape, dtype = distances.dtype, buffer = distances_raw.buf)
@@ -251,14 +256,14 @@ def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list
ranked_array = pool.map(partial(rank_distance_matrix,
distances = distances_shared_array),
chunk_boundaries)

# concatenate ranks into shared memory
distance_ranks = np.concatenate(ranked_array)
distance_ranks_raw = smm.SharedMemory(size = distance_ranks.nbytes)
distance_ranks_shared_array = np.ndarray(distance_ranks.shape, dtype = distance_ranks.dtype, buffer = distance_ranks_raw.buf)
distance_ranks_shared_array[:] = distance_ranks[:]
distance_ranks_shared_array = NumpyShared(name = distance_ranks_raw.name, shape = distance_ranks.shape, dtype = distance_ranks.dtype)

# parallelise neighbour identification for each rank
with Pool(processes = num_processes) as pool:
results = pool.map(partial(run_clustering_for_rank,
Expand All @@ -267,7 +272,7 @@ def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list
isolates = isolate_list_shared,
previous_seeds = overall_lineage_seeds),
rank_list)

# extract results from multiprocessing pool
for n,result in enumerate(results):
rank = rank_list[n]
@@ -276,7 +281,7 @@ def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list
# store output
with open(output + "/" + output + '_lineages.pkl', 'wb') as pickle_file:
pickle.dump([lineage_clustering, overall_lineage_seeds, rank_list], pickle_file)

# process multirank lineages
overall_lineages = {}
overall_lineages = {'Rank_' + str(rank):{} for rank in rank_list}
Expand All @@ -290,7 +295,7 @@ def cluster_into_lineages(distMat, rank_list = None, output = None, isolate_list
else:
overall_lineage = overall_lineage + '-' + str(lineage_clustering[rank][index])
overall_lineages['overall'][isolate] = overall_lineage

# print output as CSV
writeClusterCsv(output + "/" + output + '_lineages.csv',
isolate_list,
@@ -319,7 +324,7 @@ def run_clustering_for_rank(rank, distances_input = None, distance_ranks_input =
Should be included within rlist.
use_existing (bool)
Whether to extend a previously generated analysis or not.
Returns:
lineage_clustering (dict)
Assignment of each isolate to a cluster.
@@ -328,7 +333,7 @@ def run_clustering_for_rank(rank, distances_input = None, distance_ranks_input =
neighbours (nested dict)
Neighbour relationships between isolates for R.
"""

# load shared memory objects
distances_shm = shared_memory.SharedMemory(name = distances_input.name)
distances = np.ndarray(distances_input.shape, dtype = distances_input.dtype, buffer = distances_shm.buf)
@@ -346,12 +351,12 @@ def run_clustering_for_rank(rank, distances_input = None, distance_ranks_input =
G = nx.Graph()
G.add_nodes_from(isolate_indices)
G.nodes.data('lineage', default = 0)

# identify nearest neighbours
nn = get_nearest_neighbours(rank,
ranks = distance_ranks_input,
isolates = isolate_list)

# iteratively identify lineages
lineage_index = 1
while nx.number_of_isolates(G) > 0:
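A recurring pattern in the hunks above: each large array is copied once into a `SharedMemoryManager` block, only a small `NumpyShared(name, shape, dtype)` handle is passed to the worker pool, and each worker re-attaches via `shared_memory.SharedMemory`. A self-contained sketch of that pattern with a toy worker (the handle layout matches the diff; the standalone namedtuple definition and the worker itself are illustrative, not PopPUNK code):

```python
import numpy as np
from collections import namedtuple
from functools import partial
from multiprocessing import Pool, shared_memory
from multiprocessing.managers import SharedMemoryManager

# same three fields as the NumpyShared handle used in the diff
NumpyShared = namedtuple('NumpyShared', ['name', 'shape', 'dtype'])

def row_sums(bounds, distances_input=None):
    """Toy worker: re-attach to the shared array and reduce one block of rows."""
    shm = shared_memory.SharedMemory(name=distances_input.name)
    distances = np.ndarray(distances_input.shape, dtype=distances_input.dtype,
                           buffer=shm.buf)
    start, end = bounds
    return distances[start:end, :].sum(axis=1)

if __name__ == '__main__':
    distances = np.random.rand(100, 100)
    with SharedMemoryManager() as smm:
        raw = smm.SharedMemory(size=distances.nbytes)
        shared = np.ndarray(distances.shape, dtype=distances.dtype, buffer=raw.buf)
        shared[:] = distances[:]                     # one copy into shared memory
        handle = NumpyShared(name=raw.name, shape=distances.shape,
                             dtype=distances.dtype)
        with Pool(processes=2) as pool:
            results = pool.map(partial(row_sums, distances_input=handle),
                               [(0, 50), (50, 100)])
    print(np.concatenate(results).shape)             # (100,)
```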
15 changes: 9 additions & 6 deletions PopPUNK/models.py
@@ -17,6 +17,8 @@
from scipy.spatial.distance import euclidean
from scipy import stats

+import pp_sketchlib

from .plot import plot_scatter

# BGMM
@@ -36,7 +38,6 @@
# refine
from .refine import refineFit
from .refine import likelihoodBoundary
-from .refine import withinBoundary
from .refine import readManualStart
from .plot import plot_refined_results

@@ -717,8 +718,8 @@ def plot(self, X, y=None):
self.outPrefix + "/" + os.path.basename(self.outPrefix) + "_refined_fit")


-def assign(self, X, slope=None):
-'''Assign the clustering of new samples using :func:`~PopPUNK.refine.withinBoundary`
+def assign(self, X, slope=None, cpus=1):
+'''Assign the clustering of new samples
Args:
X (numpy.array)
@@ -728,6 +729,8 @@ def assign(self, X, slope=None):
Set to 0 for a vertical line, 1 for a horizontal line, or
2 to use a slope
+cpus (int)
+Number of threads to use
Returns:
y (numpy.array)
Cluster assignments by samples
@@ -736,11 +739,11 @@ def assign(self, X, slope=None):
raise RuntimeError("Trying to assign using an unfitted model")
else:
if slope == 2 or (slope == None and self.slope == 2):
-y = withinBoundary(X/self.scale, self.optimal_x, self.optimal_y)
+y = pp_sketchlib.assignThreshold(X/self.scale, 2, self.optimal_x, self.optimal_y, cpus)
elif slope == 0 or (slope == None and self.slope == 0):
-y = withinBoundary(X/self.scale, self.core_boundary, 0, slope=0)
+y = pp_sketchlib.assignThreshold(X/self.scale, 0, self.core_boundary, 0, cpus)
elif slope == 1 or (slope == None and self.slope == 1):
-y = withinBoundary(X/self.scale, 0, self.accessory_boundary, slope=1)
+y = pp_sketchlib.assignThreshold(X/self.scale, 1, 0, self.accessory_boundary, cpus)

return y

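The reworked `assign` above delegates boundary assignment to pp-sketchlib instead of the removed `withinBoundary` helper. A hedged usage sketch that reuses only the call shape visible in the diff, `assignThreshold(distMat, slope, x_max, y_max, num_threads)`; the numbers are invented and the exact encoding of the return values is not shown in this commit:

```python
import numpy as np
import pp_sketchlib

# two samples' (core, accessory) distances, already divided by the model scale
X = np.array([[0.01, 0.10],
              [0.20, 0.45]])

# slope=2 uses the sloped boundary (cf. optimal_x / optimal_y in the diff);
# 0 and 1 would use a vertical or horizontal cut, per the docstring above
y = pp_sketchlib.assignThreshold(X, 2, 0.15, 0.30, 1)
print(y)   # one assignment per row, as consumed by the model's assign()
```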
5 changes: 2 additions & 3 deletions PopPUNK/network.py
@@ -390,15 +390,15 @@ def addQueryToNetwork(dbFuncs, rlist, qfile, G, kmers, estimated_length,

constructDatabase(tmpFile, kmers, sketchSize, tmpDirName, estimated_length, True, threads, False)
qlist1, qlist2, distMat = queryDatabase(rNames = list(unassigned),
-qNames = list(unassigned),
+qNames = list(unassigned),
dbPrefix = tmpDirName,
queryPrefix = tmpDirName,
klist = kmers,
self = True,
number_plot_fits = 0,
threads = threads)
queryAssignation = model.assign(distMat)

# identify any links between queries and store in the same links dict
# links dict now contains lists of links both to original database and new queries
for assignment, (query1, query2) in zip(queryAssignation, iterDistRows(qlist1, qlist2, self=True)):
@@ -459,7 +459,6 @@ def printClusters(G, outPrefix = "_clusters.csv", oldClusterFile = None,
if oldClusterFile != None:
oldAllClusters = readIsolateTypeFromCsv(oldClusterFile, mode = 'external', return_dict = False)
oldClusters = oldAllClusters[list(oldAllClusters.keys())[0]]
-print('oldCluster is ' + str(oldClusters))
new_id = len(oldClusters.keys()) + 1 # 1-indexed
while new_id in oldClusters:
new_id += 1 # in case clusters have been merged
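For the `addQueryToNetwork` hunk above, each row of `distMat` corresponds to one query pair, so zipping the `model.assign` output with `iterDistRows(qlist1, qlist2, self=True)` recovers which pair each assignment refers to. A toy illustration of that pairing with a stand-in pair generator (the real `iterDistRows` lives in PopPUNK and its ordering may differ; treating assignment value 0 as a within-cluster link is an assumption here):

```python
from itertools import combinations

def iter_pairs_sketch(names):
    """Stand-in for iterDistRows(..., self=True): every unordered pair, no self-self."""
    yield from combinations(names, 2)

queries = ['q1', 'q2', 'q3']
assignments = [0, 1, 0]                # pretend output of model.assign(distMat), one per row
for assigned, (query1, query2) in zip(assignments, iter_pairs_sketch(queries)):
    if assigned == 0:                  # assumption: 0 marks a within-cluster pair
        print(query1, query2, 'linked')
```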
