In [2]:
import random
from deap import base, creator, tools

# Define the fitness function
def evalOneMax(individual):
    return (sum(individual),)  # Sum the number of 1s in the individual

# Set up the environment
creator.create("FitnessMax", base.Fitness, weights=(1.0,))  # Objective: maximize the fitness
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, 10)  # 10 genes per individual
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

toolbox.register("evaluate", evalOneMax)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)  # 10% mutation probability
toolbox.register("select", tools.selTournament, tournsize=3)

# Create a population and evaluate it
population = toolbox.population(n=50)  # 50 individuals in the population
NGEN = 10  # Number of generations

for gen in range(NGEN):
    offspring = toolbox.select(population, len(population))
    offspring = list(map(toolbox.clone, offspring))

    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        toolbox.mate(child1, child2)
        del child1.fitness.values
        del child2.fitness.values

    for mutant in offspring:
        toolbox.mutate(mutant)
        del mutant.fitness.values

    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    for ind in invalid_ind:
        ind.fitness.values = toolbox.evaluate(ind)

    population = offspring

# Extracting and printing the best result
best_ind = tools.selBest(population, 1)[0]
print("Best individual is %s, %s" % (best_ind, best_ind.fitness.values))


Best individual is [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], (10.0,)




In [1]:
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRWeatherAnalysis(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_temps,
                   reducer=self.reducer_get_max_temp),
            MRStep(reducer=self.reducer_find_max_temp_year)
        ]

    def mapper_get_temps(self, _, line):
        # Split the line into components
        parts = line.split(',')
        try:
            year = parts[0]
            temp = float(parts[1])
            yield year, temp
        except ValueError:
            pass  # ignore lines with invalid data

    def reducer_get_max_temp(self, year, temps):
        # Send the max temperature for each year to the same reducer
        yield None, (max(temps), year)

    def reducer_find_max_temp_year(self, _, year_temp_pairs):
        # Find the year with the maximum temperature
        yield max(year_temp_pairs)


In [2]:

# Import necessary modules from mrjob
from mrjob.job import MRJob
from mrjob.step import MRStep

# Define the MapReduce job class
class MRWeatherAnalysis(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_temps,
                   reducer=self.reducer_get_max_temp),
            MRStep(reducer=self.reducer_find_max_temp_year)
        ]

    def mapper_get_temps(self, _, line):
        # Split the line into components assuming the format "Year,Temperature"
        parts = line.split(',')
        if len(parts) >= 2:
            try:
                year = parts[0].strip()
                temp = float(parts[1].strip())
                yield year, temp
            except ValueError:
                pass  # ignore lines with invalid data

    def reducer_get_max_temp(self, year, temps):
        # Yield the max temperature for each year
        yield None, (max(temps), year)

    def reducer_find_max_temp_year(self, _, year_temp_pairs):
        # Find and yield the year with the maximum temperature
        yield 'Hottest Year', max(year_temp_pairs, key=lambda x: x[0])

# Create an instance of the job
mr_job = MRWeatherAnalysis(args=['weather.csv'])  # Ensure 'weather_data.csv' is the path to your data file

# Execute the job
with mr_job.make_runner() as runner:
    runner.run()
    
    # Print results directly
    for line in runner.stream_output():
        key, value = mr_job.parse_output_line(line)
        print(f'{key}: Year {value[1]} with max temp {value[0]}°C')


TypeError: expected str, bytes or os.PathLike object, not NoneType

In [3]:
# Import necessary modules from mrjob
from mrjob.job import MRJob
from mrjob.step import MRStep
import io

# Define the MapReduce job class
class MRWeatherAnalysis(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_temps,
                   reducer=self.reducer_get_max_temp),
            MRStep(reducer=self.reducer_find_max_temp_year)
        ]

    def mapper_get_temps(self, _, line):
        # Split the line into components assuming the format "Year,Temperature"
        parts = line.split(',')
        try:
            year = parts[0].strip()
            temp = float(parts[1].strip())
            yield year, temp
        except ValueError:
            pass  # ignore lines with invalid data

    def reducer_get_max_temp(self, year, temps):
        # Yield the max temperature for each year
        yield None, (max(temps), year)

    def reducer_find_max_temp_year(self, _, year_temp_pairs):
        # Find and yield the year with the maximum temperature
        yield 'Hottest Year', max(year_temp_pairs, key=lambda x: x[0])

# Generate sample data
data = [
    '1990,15.6',
    '1991,15.9',
    '1990,16.2',
    '1992,15.2',
    '1991,17.5',
    '1992,18.1',
    '1993,14.3'
]

# Create an instance of the job
mr_job = MRWeatherAnalysis(args=['-'])

# Simulate the input file as in-memory string stream
with mr_job.make_runner() as runner:
    # Start the context for the MapReduce job
    runner.run(stdin=io.StringIO('\n'.join(data)))
    
    # Print results directly
    for line in runner.stream_output():
        key, value = mr_job.parse_output_line(line)
        print(f'{key}: Year {value[1]} with max temp {value[0]}°C')


TypeError: expected str, bytes or os.PathLike object, not NoneType

In [5]:
import numpy as np
import random

# Constants
NUM_PARAMETERS = 4  # Number of parameters (e.g., temperature, feed rate, etc.)
POPULATION_SIZE = 50  # Number of individuals in the population
MAX_GENERATIONS = 100  # Number of generations
MUTATION_RATE = 0.01  # Probability of mutation
PARAMETER_RANGES = np.array([[100, 300],  # Inlet air temperature range
                             [0.1, 1.0],  # Feed rate range
                             [200, 500],  # Air flow rate range
                             [1000, 3000]])  # Atomizer speed range

# Neural network simulation
def predict_nn(parameters):
    # Simulate NN predictions. In practice, you would replace this with actual model predictions.
    # For simplicity, let's assume better performance is when parameters are closer to the middle of their range.
    ideal = np.mean(PARAMETER_RANGES, axis=1)
    return -np.sum((parameters - ideal) ** 2, axis=1)  # Negative squared distance from the ideal point

# Genetic Algorithm Components
def initialize_population(size):
    return np.random.uniform(low=PARAMETER_RANGES[:, 0], high=PARAMETER_RANGES[:, 1], size=(size, NUM_PARAMETERS))

def evaluate_fitness(population):
    return predict_nn(population)

def select(population, fitness, num_parents):
    # Tournament selection
    parents = np.zeros((num_parents, NUM_PARAMETERS))
    for i in range(num_parents):
        random_ids = np.random.randint(0, len(population), 4)
        best_id = random_ids[np.argmax(fitness[random_ids])]
        parents[i] = population[best_id]
    return parents

def crossover(parents):
    offspring = np.zeros((POPULATION_SIZE, NUM_PARAMETERS))
    crossover_point = np.random.randint(1, NUM_PARAMETERS-1)
    for i in range(POPULATION_SIZE):
        parent1_index = i % parents.shape[0]
        parent2_index = (i+1) % parents.shape[0]
        offspring[i, :crossover_point] = parents[parent1_index, :crossover_point]
        offspring[i, crossover_point:] = parents[parent2_index, crossover_point:]
    return offspring

def mutate(offspring):
    for idx in range(offspring.shape[0]):
        for jdx in range(NUM_PARAMETERS):
            if np.random.rand() < MUTATION_RATE:
                random_value = np.random.uniform(PARAMETER_RANGES[jdx, 0], PARAMETER_RANGES[jdx, 1])
                offspring[idx, jdx] = random_value
    return offspring

def genetic_algorithm():
    population = initialize_population(POPULATION_SIZE)
    for generation in range(MAX_GENERATIONS):
        fitness = evaluate_fitness(population)
        parents = select(population, fitness, POPULATION_SIZE // 2)
        offspring = crossover(parents)
        population = mutate(offspring)
    best_index = np.argmax(evaluate_fitness(population))
    return population[best_index]

# Run the genetic algorithm
best_parameters = genetic_algorithm()
print("Optimized Parameters:", best_parameters)


Optimized Parameters: [2.00137684e+02 5.40950559e-01 3.45301289e+02 1.99384196e+03]
