Getting DEAP working with Spark #268

ryanpeach opened this issue Mar 9, 2018 · 11 comments

@ryanpeach

Goal

I'm trying to get DEAP to parallelize across a Spark cluster. I have seen this requested by other users, since Spark integrates easily with existing server architecture via YARN. I have followed several tutorials online, cited in the references. Below is working (serial) DEAP code, followed by the code I have attempted to transform to use Spark. The error that usually occurs is "Can't get attribute 'Individual' on module deap.creator".

Working Code

    import numpy as np
    import random

    from deap import base
    from deap import creator
    from deap import tools
    from deap import algorithms

    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    creator.create("Individual", list, fitness=creator.FitnessMax)

    def evalOneMax(individual):
        return sum(individual),

    toolbox = base.Toolbox()
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.Individual,
        toolbox.attr_bool, 100)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", evalOneMax)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
    toolbox.register("select", tools.selTournament, tournsize=3)

    # Define parallelism outside main
    if __name__=="__main__":

        pop = toolbox.population(n=300)
        hof = tools.HallOfFame(1)
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)

        pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                       stats=stats, halloffame=hof, verbose=True)

Ran:

python3 test-deap.py

Output

    gen nevals  avg     std     min max
    0   300     50.5533 4.78893 38  63
    1   184     54.2433 3.85627 44  65
    2   191     57.2667 3.38756 47  65
    3   182     60.0333 3.15154 51  69
    4   179     62.35   2.95762 53  71
    5   176     64.3267 2.77968 55  73
    6   183     66.2467 2.80223 58  75
    7   182     68.1733 2.66019 59  79
    8   186     69.8367 2.83255 62  77
    9   179     71.7233 2.70557 59  78
    10  187     73.3667 2.63165 63  80
    11  169     74.7933 2.43255 65  80
    12  182     76.0967 2.42363 65  82
    13  204     77.3    2.36995 67  83
    14  203     78.6267 2.31818 70  84
    15  182     79.8933 2.29535 72  84
    16  183     81.02   2.29483 72  86
    17  185     81.87   2.41242 73  87
    18  190     83.0633 2.13057 74  87
    19  182     84.06   2.16096 75  89
    20  194     84.8167 2.41724 77  91
    21  174     85.9633 2.22755 79  91
    22  180     86.8033 2.23263 79  92
    23  177     87.7533 2.3831  78  93
    24  174     88.61   2.34334 79  93
    25  171     89.6167 2.36144 78  95
    26  195     90.57   2.4695  81  95
    27  169     91.5233 2.23072 82  96
    28  173     92.3733 2.16347 83  97
    29  203     93.1    2.13151 85  97
    30  179     93.6067 2.41356 84  98
    31  169     94.3067 2.23293 86  99
    32  184     94.8933 2.49706 85  99
    33  175     95.8733 2.14413 88  99
    34  168     96.2167 2.30428 88  99
    35  173     96.88   2.22537 87  100
    36  171     97.33   2.29951 87  100
    37  184     97.89   2.05375 91  100
    38  175     98.0333 2.52565 88  100
    39  176     98.6667 2.07579 90  100
    40  175     98.6867 2.32562 91  100

Not Working Code

    from pyspark import SparkContext

    import numpy as np
    import random

    from deap import base
    from deap import creator
    from deap import tools
    from deap import algorithms

    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    creator.create("Individual", list, fitness=creator.FitnessMax)

    def evalOneMax(individual):
        return sum(individual),

    toolbox = base.Toolbox()
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.Individual,
        toolbox.attr_bool, 100)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", evalOneMax)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
    toolbox.register("select", tools.selTournament, tournsize=3)

    # Define parallelism outside main
    if __name__=="__main__":
        sc = SparkContext(appName="DEAP")

        def sparkMap(algorithm, population):
            return sc.parallelize(population).map(algorithm).collect()

        toolbox.register("map", sparkMap)

        pop = toolbox.population(n=300)
        hof = tools.HallOfFame(1)
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)

        pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                       stats=stats, halloffame=hof, verbose=True)

Ran:

spark-submit --master local test-deap.py

Output

Full Text (pastebin)

Highlights:

    Traceback (most recent call last):
      File "/Users/ryapeach/Documents/Workspace/relay-death/test-deap.py", line 45, in <module>
        stats=stats, halloffame=hof, verbose=True)
      File "/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/algorithms.py", line 150, in eaSimple
        fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
      File "/Users/ryapeach/Documents/Workspace/relay-death/test-deap.py", line 32, in sparkMap
        return sc.parallelize(population).map(algorithm).collect()
    AttributeError: Can't get attribute 'Individual' on <module 'deap.creator' from '/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/creator.py'>
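
One driver-side workaround sometimes suggested for this error is to hand plain lists to Spark so the executors never have to unpickle the dynamically created creator.Individual class. The sketch below slots into the script above in place of sparkMap; sparkMapPlain is an illustrative name, and it assumes evalOneMax only needs list-like access to each individual:

    def sparkMapPlain(evaluate, population):
        # Strip the creator.Individual type before shipping to the executors;
        # only the returned fitness tuples are needed back on the driver.
        plain = [list(ind) for ind in population]
        return sc.parallelize(plain).map(evaluate).collect()

    toolbox.register("map", sparkMapPlain)

eaSimple assigns the collected fitness values back to the original individuals on the driver, so the class information dropped in the conversion is never needed on the executors.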

References:

@cmd-ntrf
Member

Hi Ryan,

First, thanks for the detailed issue, it was a pleasant read. Second, we do have a solution for this. It has been sitting in the pull-request list for almost three years, and I am the one to blame: I have taken too long a DEAP hiatus, during which I have been teaching Spark, among other things.

Here is the PR : #76

There are some conflicts since the patch is a bit old, but you should be able to merge it without too much effort and then test it with Spark. It should solve your issue. Let us know the outcome. The patch was originally meant for DEAP 2.0; this could give us the necessary motivation to assemble a new release.

Cheers,
Felix

@ryanpeach
Author

ryanpeach commented Mar 19, 2018

Great, thanks @cmd-ntrf.

I've been working with DEAP parallelization for a while now. Unfortunately I'm unable to publish some of my company's tutorials on how to do it xD

Needless to say I've become somewhat acquainted with the source code and the like.

I think it would be a good idea to expand the documentation on the parallelization options! My one hint is that I've had luck running DEAP on AWS cfncluster using SCOOP (the DEAP side of that setup is sketched below). I would like to make it automatic with boto, but unfortunately I wouldn't be able to publish that either without corporate approval :(

Just saying, it was damn hard! If anyone not bound by an agreement has examples, they should publish them (and PM me for advice on bug fixes and directions).
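
For reference, a minimal sketch of the DEAP side of that SCOOP setup, assuming SCOOP is installed on every node and reusing the toolbox from the scripts earlier in this issue:

    # Sketch only: register SCOOP's distributed map with the toolbox.
    from scoop import futures

    toolbox.register("map", futures.map)

    # The script is then launched through the SCOOP runner instead of
    # spark-submit, e.g.:
    #   python -m scoop test-deap.py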

@cmd-ntrf
Member

Do you have specific examples of parallelization that would be of value to you? We thought we had it covered, but clearly there are some issues if you found it "damn hard" ;).

Also, I am curious, who do you work for?

@ryanpeach
Author

Well, I think a specific parallelization example with AWS (as opposed to something very general like "use SCOOP") would be helpful to some people. The problem isn't so much with DEAP as with combining three sets of documentation: DEAP's, SCOOP's, and that of whatever cloud platform you are working with.

I work for Keysight Technologies; I use DEAP mostly for hyperparameter optimization. I'll see after a while whether we can publish some of my tutorials.

@jbrant

jbrant commented May 16, 2018

I'm having the same issue and looked at the pull request referenced above, but it appears to address problems with pickling when creator.create is called outside the global scope. However, in my case (and, it appears, in the OP's example as well), I am calling that function in the global scope and still getting a similar exception:

  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\brantj\AppData\Local\Continuum\anaconda3\envs\utility\lib\site-packages\pyspark\rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 145, in load_stream
    yield self._read_with_length(stream)
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\brantj\AppData\Local\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'Individual' on <module 'deap.creator' from 'C:\\Users\\brantj\\AppData\\Local\\Continuum\\anaconda3\\lib\\site-packages\\deap\\creator.py'>

That being said, I'll try merging the pull request and test.

@karan10111

karan10111 commented May 16, 2018

I just tried merging the PR. The code that wasn't working for @ryanpeach now works.
I couldn't quite follow what @jbrant is saying; all the tests seem to pass fine.

@karan10111

karan10111 commented May 16, 2018

There was a minor issue though: that pull request uses dct.iteritems(), which I had to change to dct.items() in the MetaCreator class for Python 3.6.4.
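
For context, this is just the standard Python 2 → 3 dictionary change; dct below is a stand-in for whatever dictionary the MetaCreator code iterates over:

    dct = {"weights": (1.0,)}  # stand-in for the dict MetaCreator iterates over

    # Python 2 dictionaries expose iteritems(); it was removed in Python 3,
    # so `for name, value in dct.iteritems():` raises AttributeError on 3.x.

    # Python 3 replacement (also valid on Python 2, where items() builds a list):
    for name, value in dct.items():
        print(name, value)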

@jbrant

jbrant commented May 16, 2018

I just meant that the OP's example called "creator.create" in the global scope, but the PR fix was to allow that call within a local scope, which originally didn't work due to an issue with visibility of pickled objects by worker nodes during distributed execution.

That being said, I was incorrect in assuming that it wouldn't fix the problem: merging PR #76 also got it working for me. I've created PR #280, which incorporates the fix into the latest baseline, and it seems to be passing the CI checks.

@omarcr

omarcr commented Mar 12, 2019

Please see rsteca/sklearn-deap#59.

I am having an issue with this as well.

Thanks,
Omar

@jfrfonseca

jfrfonseca commented Aug 22, 2019

I created a workaround so that DEAP can work with the reusable-process framework loky.

The example below rewrites DEAP's onemax_mp.py example:

    import array
    import random
    import sys

    if sys.version_info < (2, 7):
        print("mpga_onemax example requires Python >= 2.7.")
        exit(1)

    import numpy

    from deap import algorithms
    from deap import base
    from deap import creator
    from deap import tools

    # NEW IMPORT
    from loky import get_reusable_executor

    # NEW FUNCTION
    # We define the "creator preparation" procedure to be reused by each process
    def prepare_creator():
        creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        creator.create("Individual", array.array, typecode='b', fitness=creator.FitnessMax)

    # We RUN the creator preparation procedure so the root process is prepared
    prepare_creator()

    # The following code is the original preparation from DEAP's example
    toolbox = base.Toolbox()

    # Attribute generator
    toolbox.register("attr_bool", random.randint, 0, 1)

    # Structure initializers
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, 100)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)

    def evalOneMax(individual):
        return sum(individual),

    toolbox.register("evaluate", evalOneMax)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
    toolbox.register("select", tools.selTournament, tournsize=3)

    if __name__ == "__main__":
        random.seed(64)

        # The following code was altered to use loky's reusable processes:
        # a process pool of 4 workers
        pool = get_reusable_executor(4, reuse=True)

        # The following code is the mentioned workaround.
        # After initiating the pool, we define a "supermap" function that
        # guarantees that every process will execute the creator preparation
        # procedure and then run the mapped function
        def supermap(*args, **kwargs):
            prepare_creator()
            return pool.map(*args, **kwargs)

        # We then register our "supermap" function as the parallel directive for DEAP
        toolbox.register("map", supermap)

        # The following code is the same as DEAP's example
        pop = toolbox.population(n=100)
        hof = tools.HallOfFame(1)

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", numpy.mean)
        stats.register("std", numpy.std)
        stats.register("min", numpy.min)
        stats.register("max", numpy.max)

        algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40, stats=stats, halloffame=hof)

I believe this approach could also work with PySpark, due to its reusable execution environment architecture (an untested adaptation is sketched below).

The problem seems to arise from DEAP's unusual design for object scope and memory space: it reuses several global objects (for example, the creator module) that are hard to share among different processes.
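
An untested sketch of that PySpark adaptation, reusing prepare_creator and the toolbox from the script above: sparkSupermap and eval_partition are made-up names, sc is assumed to be an existing SparkContext, and the trick relies on PySpark deserializing partition data lazily, i.e. after prepare_creator has run in the executor:

    def sparkSupermap(func, population):
        def eval_partition(iterator):
            # Rebuild the creator classes inside the executor process before
            # the individuals in this partition are unpickled and evaluated.
            prepare_creator()
            return [func(ind) for ind in iterator]
        return sc.parallelize(list(population)).mapPartitions(eval_partition).collect()

    toolbox.register("map", sparkSupermap)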

@berndtlindner

berndtlindner commented Sep 11, 2019

Any further feedback on getting this issue resolved?
Perhaps by fixing the problems stopping PR #76 from being merged?
