In [None]:
11111111111111111111111111111111111111111111111111111111111111111111111111111111 # 80 characters

In [3]:
%%file multiple_cluster_engine.py
import ipyparallel as ipp
from collections import defaultdict
import os
import psutil
import time
from datetime import datetime
from tqdm import tqdm
import copy
import itertools

import logging # logging can create duplicate entries if you don't reload logging
try:
    from importlib import reload # Python 3
except ImportError: # Python 2: importlib has no reload there, the builtin reload is used instead
    pass

class MultipleClusterEngine(object):
    """Distribute a list of input files over several local ipyparallel clusters.

    One ``ipcluster`` is started per entry of ``n_cpus_list``.  Each input file
    is handed to the next cluster that is idle; the file is mapped once per
    engine of that cluster.  While files are being dispatched the resident
    memory of every engine process is profiled, and when the total exceeds
    ``ram_limit_in_GB`` the cluster using the most memory is shut down.

    ``function_to_process`` is called on the engines with keyword arguments
    only: a deep copy of ``function_kwargs_dict`` plus ``input_file_name``,
    ``cluster_output_dir``, ``cluster_id``, ``n_cpus`` and ``cpu_id``.

    Typical use is ``MultipleClusterEngine(...).main()``.
    """

    def __init__(self,
                 cluster_job_name,
                 n_cpus_list,
                 ram_limit_in_GB,
                 input_file_names,
                 output_parent_dir,
                 function_to_process,
                 function_kwargs_dict): # always put function args in a dictionary
        """Store the job configuration and validate it.

        Args:
            cluster_job_name: non-empty prefix used for the ipcluster profile
                names (``<name><cluster_id>``) and for the output directory.
            n_cpus_list: number of engines per cluster; one cluster per entry.
                ``run_clusters`` treats cluster 0 as the "large" cluster.
            ram_limit_in_GB: total RAM budget for all engines, in GB (1e9 bytes).
            input_file_names: non-empty sequence of files to process.
            output_parent_dir: existing directory in which the per-run output
                directory is created.
            function_to_process: callable invoked as ``f(**kwargs)`` on engines.
            function_kwargs_dict: base keyword arguments for that callable.

        Raises:
            AssertionError: if any of the checks below fails.
        """
        # Start from a fresh logging module so handlers do not pile up (and
        # duplicate log lines) when several engines are created in one session.
        reload(logging)
        self.cluster_job_name = cluster_job_name
        self.n_cpus_list = n_cpus_list
        self.ram_limit_in_GB = ram_limit_in_GB
        self.output_parent_dir = output_parent_dir
        self.input_file_names = input_file_names
        self.function_to_process = lambda kwargs: function_to_process(**kwargs)
        self.function_kwargs_dict = function_kwargs_dict

        assert cluster_job_name, "Needs cluster name"
        assert len(n_cpus_list) > 0, "Needs the number of CPUs per cluster"
        assert os.path.isdir(self.output_parent_dir), "Output directory doesn't exist"
        assert len(self.input_file_names) > 0, "Need input files"

        # used by engine
        self.client_dict = {}  # cluster_id -> ipyparallel Client
        self.load_balanced_view_dict = {}  # cluster_id -> load-balanced view
        self.async_results_dict = defaultdict(list)  # cluster_id -> pending async results
        self.file_to_cluster_order_dict = defaultdict(list)  # cluster_id -> files sent, in order
        self.cluster_indexes = None  # itertools.cycle over the live cluster ids
        self.logger_status = None
        self.logger_failure = None
        self.logger_ram_usage = None
        self.start_time = None
        self.end_time = None
        self.cluster_output_dir = None
        self.cluster_RAM_use_dict = {}  # cluster_id -> RAM in GB from the last profile
        self.cluster_pid_dict = {}  # cluster_id -> list of engine pids

    def create_cluster_output_dir(self):
        """Create ``<output_parent_dir>/<cluster_job_name><N>`` for this run.

        ``N`` is one more than the largest index already used by a sibling
        directory named exactly ``<cluster_job_name><digits>``, or 0 if there
        is no such directory.  The path is stored in ``cluster_output_dir``.
        """
        prefix = self.cluster_job_name
        used_indexes = []
        for name in os.listdir(self.output_parent_dir):
            if not os.path.isdir(os.path.join(self.output_parent_dir, name)):
                continue
            # Match the job name as a real prefix.  str.strip(prefix) would
            # treat it as a *set of characters* and strip both ends, which
            # miscounts names such as "job11" for the job "job1" and picks up
            # unrelated all-digit directories.
            if not name.startswith(prefix):
                continue
            suffix = name[len(prefix):]
            if not suffix.isdigit():
                continue
            try:
                used_indexes.append(int(suffix))
            except ValueError:  # digit-like characters that int() rejects
                pass
        dir_index = max(used_indexes) + 1 if used_indexes else 0
        self.cluster_output_dir = os.path.join(self.output_parent_dir,
                                               prefix + str(dir_index))
        os.makedirs(self.cluster_output_dir)

    def create_logger(self, logger_name, log_file):
        """Attach a file handler for ``log_file`` to the named logger at INFO level.

        Returns:
            The configured ``logging.Logger``.
        """
        logger = logging.getLogger(logger_name)
        logger.addHandler(logging.FileHandler(log_file))
        logger.setLevel(logging.INFO)
        return logger

    def activate_logger(self):
        """Create the status, failure and RAM-usage loggers in the output directory."""
        self.logger_status = self.create_logger(
            'status', os.path.join(self.cluster_output_dir, "status.log"))
        self.logger_failure = self.create_logger(
            'failure', os.path.join(self.cluster_output_dir, "failure.log"))
        self.logger_ram_usage = self.create_logger(
            'ram_usage', os.path.join(self.cluster_output_dir, "ram_usage.log"))
        # Keep these records out of the root logger's handlers.
        for logger in (self.logger_status, self.logger_failure, self.logger_ram_usage):
            logger.propagate = False

    def profile_memory_for_cluster(self, cluster_id): # RAM in GB
        """Return the summed resident set size of one cluster's engines, in GB (1e9 bytes).

        Engine processes that no longer exist are skipped instead of aborting
        the whole profiling pass.
        """
        total_bytes = 0
        for pid in self.cluster_pid_dict[cluster_id]:
            try:
                total_bytes += psutil.Process(pid).memory_info().rss
            except psutil.NoSuchProcess:
                continue
        return total_bytes / 1e9

    def profile_memory_for_all_clusters(self):
        """Refresh ``cluster_RAM_use_dict`` for every live cluster and log the figures."""
        self.cluster_RAM_use_dict.clear()
        for jth_cluster in sorted(self.load_balanced_view_dict):
            ram_in_GB = self.profile_memory_for_cluster(jth_cluster)
            self.cluster_RAM_use_dict[jth_cluster] = ram_in_GB
            self.logger_ram_usage.info('{}: {}th cluster uses {} GB of RAM'.format(
                datetime.now(), jth_cluster, ram_in_GB))
        self.logger_ram_usage.info('{}: All clusters uses {} GB of RAM'.format(
                datetime.now(), sum(self.cluster_RAM_use_dict.values())))

    def clear_memory_on_cluster(self, cluster_id):
        """Ask every engine of the cluster to run ``gc.collect`` (not waited for)."""
        import gc
        self.client_dict[cluster_id][:].apply_async(gc.collect)

    def _stop_ipcluster(self, cluster_id):
        """Run ``ipcluster stop`` for the profile belonging to ``cluster_id``."""
        os.system('ipcluster stop --profile={}{}'.format(self.cluster_job_name, cluster_id))

    def start_cluster(self, n_cpus, cluster_id, n_attempts=3, wait_seconds=10):
        """Start one ipcluster and return a connected client.

        Args:
            n_cpus: number of engines to start.
            cluster_id: index of the cluster; appended to the job name to form
                the ipcluster profile name.
            n_attempts: how many times to try connecting a client.
            wait_seconds: delay before each connection attempt.

        Returns:
            The connected ``ipyparallel.Client``.  The engines' pids are stored
            in ``cluster_pid_dict[cluster_id]``.

        Raises:
            ipyparallel.error.TimeoutError: if every connection attempt timed
                out.  Any other error from the client is raised immediately.
        """
        profile = '{}{}'.format(self.cluster_job_name, cluster_id)
        self.logger_status.info("\tAttempting to start cluster job "
                "{}'s {}th cluster with {} CPUs".format(self.cluster_job_name, cluster_id, n_cpus))
        os.system("ipcluster start --n={} --profile={} --daemonize".format(
            n_cpus, profile)) # should deprecate to use a safer OS call

        for _ in range(n_attempts):
            time.sleep(wait_seconds)  # give the controller and engines time to come up
            try:
                client = ipp.Client(profile=profile)
            except ipp.error.TimeoutError:
                continue
            self.cluster_pid_dict[cluster_id] = client[:].apply_async(os.getpid).get()
            self.logger_status.info('\t\tCPU processes ready for action: {}'.format(
                self.cluster_pid_dict[cluster_id]))
            return client
        # Fail loudly here rather than returning None and crashing later with
        # an unrelated AttributeError in start_all_clusters.
        raise ipp.error.TimeoutError(
            'Could not connect to cluster {} after {} attempts'.format(profile, n_attempts))

    def start_all_clusters(self):
        """Set up logging, start every cluster and build the round-robin cluster cycle."""
        self.activate_logger()
        self.logger_status.info('Starting Multiple Cluster Engine')
        for cluster_id, n_cpus in enumerate(self.n_cpus_list):
            client = self.start_cluster(n_cpus, cluster_id)
            self.client_dict[cluster_id] = client
            self.load_balanced_view_dict[cluster_id] = client.load_balanced_view()
        self.start_time = datetime.now()
        self.logger_status.info('All clusters started at {}'.format(self.start_time))
        self.cluster_indexes = itertools.cycle(sorted(self.load_balanced_view_dict))

    def kill_cluster(self, cluster_id): # use better arguments
        """Shut down one cluster at the end of a run and drop it from the bookkeeping."""
        self.logger_status.info('\tAttempting to kill {}{} with CPU processes: {}'.format(
            self.cluster_job_name, cluster_id, self.cluster_pid_dict[cluster_id]))
        self.load_balanced_view_dict.pop(cluster_id)
        self.client_dict[cluster_id].close()
        self.client_dict.pop(cluster_id)
        self._stop_ipcluster(cluster_id)
        self.logger_status.info('\t\tCluster successfully killed')
        self.cluster_indexes = itertools.cycle(sorted(self.load_balanced_view_dict))
        time.sleep(5) # hard-coded

    def kill_all_clusters(self):
        """Shut down every remaining cluster, log a run summary and close logging."""
        self.end_time = datetime.now()
        n_surviving_clusters = len(self.client_dict)
        self.logger_status.info('Killing all clusters')
        # sorted() yields a list copy, so kill_cluster may pop from client_dict.
        for cluster_id in sorted(self.client_dict):
            self.kill_cluster(cluster_id)
        self.logger_status.info('All clusters have been killed')
        self.logger_status.info('Multiple Cluster Engine shut down at {}'.format(self.end_time))
        # total_seconds() rather than .seconds: the latter drops whole days.
        elapsed_minutes = (self.end_time - self.start_time).total_seconds() / 60.0
        self.logger_status.info('Processed {} files using {} surviving clusters in {} minutes'.format(
            len(self.input_file_names), n_surviving_clusters, elapsed_minutes))
        logging.shutdown()

    def early_kill_cluster(self, cluster_id):
        """Shut down a cluster mid-run because the RAM limit was exceeded.

        The file the cluster was working on (if any) is recorded in the failure
        log; it is not re-queued.  The cluster is removed from every
        bookkeeping dictionary used for dispatching.
        """
        sent_files = self.file_to_cluster_order_dict[cluster_id]
        current_file = sent_files[-1] if sent_files else None  # nothing sent yet
        self.logger_failure.info(('Killing {}{} which was processing file {} '
            'due to exceeding RAM limit').format(self.cluster_job_name,
             cluster_id, current_file))
        self.client_dict[cluster_id].close()
        self._stop_ipcluster(cluster_id)
        self.load_balanced_view_dict.pop(cluster_id)
        self.client_dict.pop(cluster_id)
        self.async_results_dict.pop(cluster_id, None)
        self.cluster_RAM_use_dict.pop(cluster_id, None)

    def kill_cluster_if_ram_limit_exceeded(self): # only kills at max 1 cluster per method call
        """Kill the most memory-hungry cluster if the last profile exceeds the RAM limit.

        Uses the figures currently in ``cluster_RAM_use_dict``; call
        ``profile_memory_for_all_clusters`` first.

        Raises:
            AssertionError: if no cluster is left afterwards.
        """
        if sum(self.cluster_RAM_use_dict.values()) > self.ram_limit_in_GB:
            cluster_id = max(self.cluster_RAM_use_dict, key=self.cluster_RAM_use_dict.get)
            self.early_kill_cluster(cluster_id)
            self.cluster_indexes = itertools.cycle(sorted(self.load_balanced_view_dict))
        assert len(self.load_balanced_view_dict) != 0, 'All clusters have been killed prematurely'

    def create_kwargs_dict_list(self, input_file_name, cluster_id, n_cpus):
        """Build one keyword-argument dict per engine for a single input file.

        Returns:
            A list of ``n_cpus`` independent dicts.  Each is a deep copy of
            ``function_kwargs_dict`` extended with ``input_file_name``,
            ``cluster_output_dir``, ``cluster_id``, ``n_cpus`` and a distinct
            ``cpu_id`` in ``range(n_cpus)``.
        """
        base_kwargs = copy.deepcopy(self.function_kwargs_dict)
        base_kwargs.update({'input_file_name': input_file_name,
                            'cluster_output_dir': self.cluster_output_dir,
                            'cluster_id': cluster_id,
                            'n_cpus': n_cpus})
        kwargs_dict_list = []
        for cpu_id in range(n_cpus):
            per_cpu_kwargs = copy.deepcopy(base_kwargs)
            per_cpu_kwargs['cpu_id'] = cpu_id
            kwargs_dict_list.append(per_cpu_kwargs)
        return kwargs_dict_list

    def check_if_function_in_cluster_failed(self, jth_cluster):
        """Log the error of the cluster's latest job, if any, then drop its results.

        Does nothing when no job has been sent to the cluster yet.  Otherwise
        the stored async results of the cluster are discarded after the check.
        """
        results = self.async_results_dict[jth_cluster]
        if not results: # cluster just started, so it
            return # doesn't have any files sent to the cluster yet
        exception = results[-1].exception()
        if exception is not None:
            detail = exception.args[0] if exception.args else exception
            self.logger_failure.info('{}th cluster has error {} on file {}'.format(
                jth_cluster, detail, self.file_to_cluster_order_dict[jth_cluster][-1]))
        self.async_results_dict.pop(jth_cluster)

    def run_clusters(self):
        """Dispatch every input file to an idle cluster, then wait for completion.

        Cluster 0 takes files from the front of ``input_file_names``; all other
        clusters take files from the back.  Before each dispatch attempt the
        memory use is profiled and the RAM limit enforced.
        """
        small_file_ctr = 1 # determine if you want to have queue or differently ordered queue
        big_file_ctr = 0

        for ith_file in tqdm(range(len(self.input_file_names))):
            while True:
                time.sleep(1) # hard coded delay time; want to do expected log time lag
                self.profile_memory_for_all_clusters()
                self.kill_cluster_if_ram_limit_exceeded()
                jth_cluster = next(self.cluster_indexes)

                pending = self.async_results_dict[jth_cluster]
                if pending and not pending[-1].done():
                    continue # cluster is still busy; poll the next one

                self.clear_memory_on_cluster(jth_cluster)
                self.check_if_function_in_cluster_failed(jth_cluster) # check if previous file failed to process

                if jth_cluster == 0: # Send large files to large cluster (ALWAYS has id == 0)
                    index = big_file_ctr
                    big_file_ctr += 1
                else: # Send small files to small clusters (ALWAYS have id > 0)
                    index = -small_file_ctr
                    small_file_ctr += 1
                input_file_name = self.input_file_names[index]

                kwargs_dict_list = self.create_kwargs_dict_list(
                    input_file_name,
                    jth_cluster,
                    len(self.client_dict[jth_cluster].ids))

                async_result = self.load_balanced_view_dict[jth_cluster].map_async(
                    self.function_to_process, kwargs_dict_list)
                # Look the list up again: the failure check above removes the
                # cluster's entry, so `pending` may be a stale object.
                self.async_results_dict[jth_cluster].append(async_result)
                self.file_to_cluster_order_dict[jth_cluster].append(input_file_name)
                # write status to file--it will only have start times, no end times
                self.logger_status.info(("{} is the {}th file and is sent to "
                    "{}th cluster for processing").format(
                    input_file_name, ith_file, jth_cluster))
                break # file dispatched; move on to the next file

        # wait for all clusters to finish
        while not all(results[-1].done()
                      for results in self.async_results_dict.values() if results):
            time.sleep(1) # hard coded wait time
            self.profile_memory_for_all_clusters()
            self.kill_cluster_if_ram_limit_exceeded()

        for jth_cluster in sorted(self.load_balanced_view_dict):
            self.check_if_function_in_cluster_failed(jth_cluster) # check if last file failed to process
        # async_results_dict; save to disk for later inspection?
        # take a look at whether the results cache takes too much RAM

    def main(self):
        """Run the whole job: output directory, start clusters, process files, shut down."""
        self.create_cluster_output_dir()
        self.start_all_clusters()
        self.run_clusters()
        self.kill_all_clusters()

Overwriting multiple_cluster_engine.py


In [None]:
# Scratch cell: display the per-cluster async results held by an engine
# instance named `mce` (its creation is not shown in this view).
mce.async_results_dict

In [None]:
# time between profiles
# write RAM failure

In [None]:
class MultipleClusterEnginePrototype(object):
    """Placeholder for ideas not yet implemented in the engine."""

    @staticmethod
    def cluster_release_memory():
        """Stub: intended to release engine memory; currently does nothing.

        Declared static because it takes no ``self``; without the decorator,
        calling it on an instance would raise ``TypeError``.
        """
        # after each map/reducer step, use gc.collect()
        pass

In [None]:
# if necessary, recreate engine here if cluster shut down
# figure out queue vs deque; deque is better
# write a lot of documentation
# write shell script for configuration and installation

In [None]:
# weakref
# unittest with a mapper/reducer? after each map/reducer step, use gc.collect()

# MCE works on files. Hence, if you don't have any datafiles, then just create some empty files
# SSD for parallel reading (not HDD); determine if you are IO constrained
# RAM usage is heavier in Python 3 than Python 2; though Python 3 memory management is better
# On function failure: the benefit is that the error type is saved to failure.log; the weakness is
#     that the log doesn't say which line of code failed, so you have to debug your function outside
#     of the MCE instance, as if you were just calling the function by itself on some data.

In [None]:
# Reference: https://github.com/donnemartin/data-science-ipython-notebooks/tree/master/mapreduce

In [1]:
import itertools

class Temp0(object):
    """Scratch demo: replacing ``self.cluster_indexes`` while it is being consumed.

    ``cluster_indexes`` is an ``itertools.cycle`` over the sorted keys of
    ``load_balanced_view_dict``.  Seeing the value 3 swaps the dictionary for
    one with keys 5, 6, 7 and rebuilds the cycle.
    """

    def __init__(self):
        self.load_balanced_view_dict = {i: 1 for i in range(5)}
        self.cluster_indexes = itertools.cycle(sorted(self.load_balanced_view_dict))
        self.cluster_indexes_updated = False

    def mutate_cluster_indexes(self, i):
        """Return ``i``; when ``i == 3`` first replace the dict and rebuild the cycle."""
        if i == 3:
            self.load_balanced_view_dict = {5: 1, 6: 1, 7: 1}
            self.cluster_indexes = itertools.cycle(sorted(self.load_balanced_view_dict))
        return i

    def generator(self):
        """Return the next cluster index, applying the mutation when it is 3.

        Despite its name this returns a single value per call, not a generator.
        ``cluster_indexes_updated`` is cleared at the start of every call and
        set again only by a call that returns 3.
        """
        self.cluster_indexes_updated = False
        temp_value = next(self.cluster_indexes)
        if temp_value == 3: # do a mutation
            self.load_balanced_view_dict = {5: 1, 6: 1, 7: 1}
            self.cluster_indexes = itertools.cycle(sorted(self.load_balanced_view_dict))
            self.cluster_indexes_updated = True
        return temp_value

    def incrementer(self):
        """Print indexes forever from the cycle that was current when the loop began.

        The ``for`` statement keeps the iterator it started with, so rebinding
        ``self.cluster_indexes`` inside ``mutate_cluster_indexes`` does not
        change what this loop yields.  The loop never terminates.
        """
        for i in self.cluster_indexes:
            print(self.mutate_cluster_indexes(i))

In [7]:
# Instantiate the demo class. The call below is left commented out,
# presumably because incrementer() loops over an endless itertools.cycle.
temp0 = Temp0()
#temp0.incrementer()

In [4]:
import itertools

class Temp2(object):
    """Scratch demo of an index generator that restarts when the cluster set changes."""

    def __init__(self):
        self.load_balanced_view_dict = dict.fromkeys(range(5), 1)
        self.cluster_indexes = None
        self.cluster_indexes_updated = False

    def mutate_cluster_indexes(self, i):
        """Return ``i``; for ``i == 3`` also swap the dict and raise the update flag."""
        if i != 3:
            return i
        self.load_balanced_view_dict = {5: 1, 6: 1, 7: 1}
        self.cluster_indexes_updated = True
        return i

    def cluster_indexes_generator(self):
        """Return a generator cycling over the sorted keys of ``load_balanced_view_dict``.

        Whenever ``cluster_indexes_updated`` is set, the generator starts a new
        cycle over the current keys and clears the flag before yielding.
        """
        def fresh_cycle():
            # Snapshot of the keys as they are right now.
            return itertools.cycle(sorted(self.load_balanced_view_dict))

        def restartable():
            ids = fresh_cycle()
            while True:
                if self.cluster_indexes_updated:
                    ids = fresh_cycle()
                    self.cluster_indexes_updated = False
                yield next(ids)

        return restartable()

    def safe_incrementer(self):
        """Print indexes forever, following changes made by ``mutate_cluster_indexes``."""
        indexes = self.cluster_indexes_generator()
        self.cluster_indexes = indexes
        for idx in indexes:
            self.mutate_cluster_indexes(idx)
            print(idx)

In [6]:
# Instantiate the demo class. The call below is left commented out,
# presumably because safe_incrementer() prints indexes in an endless loop.
temp2 = Temp2()
#temp2.safe_incrementer()