In [1]:
import pandas as pd

## load data
cluster_capacity_df = pd.read_csv('./data/tmp_cluster_20201029', names=['cluster_name', 'cluster_total_cpu', 'cluster_total_storage']).drop_duplicates()
table_cpu_cost_df = pd.read_csv('./data/tmp_table_cpu_cost_20201029', names=['project_name','table_name','job_count', 'table_cost_cpu']).drop_duplicates()
table_storage_cost_df = pd.read_csv('./data/tmp_table_storage_20201029', names=['project_name', 'table_name', 'total_storage']).drop_duplicates()
table_job_df = pd.read_csv('./data/tmp_table_input_relevence_20201029', names=['ndTable', 'stTable', 'relevence_size']).drop_duplicates()
table_updown_df = pd.read_csv('./data/tmp_table_output_relevence_20201029', names=['output', 'input', 'relevence_size']).drop_duplicates()

## clean data
cluster_capacity_df['cluster_name'] = cluster_capacity_df['cluster_name'].str.lower()
table_job_df['ndTable'] = table_job_df['ndTable'].str.lower()
table_job_df['stTable'] = table_job_df['stTable'].str.lower()
table_updown_df['output'] = table_updown_df['output'].str.lower()
table_updown_df['input'] = table_updown_df['input'].str.lower()
cluster_capacity_df.fillna(value={'cluster_total_cpu':0}, inplace=True)
table_cpu_cost_df.fillna(value={'table_cost_cpu':0}, inplace=True)


## data info
print("--------------------")
print("ClusterCapacityInfo : size = {}".format(
    cluster_capacity_df.shape[0]))
print(cluster_capacity_df.dtypes)
print(cluster_capacity_df.head())
print("--------------------")
print("TableCpuInfo : size = {}".format(table_cpu_cost_df.shape[0]))
print(table_cpu_cost_df.dtypes)
print(table_cpu_cost_df.head())
print("--------------------")
print("TableStorageInfo : size = {}".format(table_storage_cost_df.shape[0]))
print(table_storage_cost_df.dtypes)
print(table_storage_cost_df.head())
print("--------------------")
print("TableJobInfo : size = {}".format(
    table_job_df.shape[0]))
print(table_job_df.dtypes)
print(table_job_df.head())
print("--------------------")
print("TableUpDownInfo : size = {}".format(
            table_updown_df.shape[0]))
print(table_updown_df.dtypes)
print(table_updown_df.head())
print("--------------------")

--------------------
ClusterCapacityInfo : size = 14
cluster_name              object
cluster_total_cpu        float64
cluster_total_storage      int64
dtype: object
  cluster_name  cluster_total_cpu  cluster_total_storage
0        ay75k       6.126449e+11      90889947808845008
1        ay75a       4.318640e+11      83102415615917488
2        ay98a       9.940431e+11     186109695381537984
3        ay49c       4.420882e+12     516597957439482620
4        ay75i       1.235933e+12     319121595919632770
--------------------
TableCpuInfo : size = 1587059
project_name       object
table_name         object
job_count           int64
table_cost_cpu    float64
dtype: object
  project_name                    table_name  job_count  table_cost_cpu
0         abif     abc_item_behavior_1_d_001          1          1180.0
1         abif  abc_user_behavior_1_d_v2_003        247      29424455.0
2         abif  abc_user_behavior_2_d_v2_004        453      84391710.0
3         abif  abc_user_behavior_3

In [2]:
from tqdm import tqdm

table_size = table_cpu_cost_df.shape[0]
cluster_size = cluster_capacity_df.shape[0]
table_job_size = table_job_df.shape[0]
table_updown_size = table_updown_df.shape[0]

## get Cluster Probability during choosing cluster based on this total cpu
clusterRouletteProbability = cluster_capacity_df['cluster_total_cpu'] / cluster_capacity_df['cluster_total_cpu'].sum()
print('----clusterRouletteProbability----')
print(clusterRouletteProbability)
print('--------')


## inverted index from table to their index
table2index = {}
for index in tqdm(range(table_size), desc='Processing'):
    table2index[table_cpu_cost_df.iloc[index, 0] + '.' + table_cpu_cost_df.iloc[index, 1]] = index

## get total Flow
totalFlow = 0
for index in tqdm(range(table_job_size), desc='Processing'):
    totalFlow += table_job_df.iloc[index, 2]    
for index in tqdm(range(table_updown_size), desc='Processing'):
    totalFlow += table_updown_df.iloc[index, 2]

print('----totalFlow----')
print(totalFlow)
print('--------')

Processing:   0%|          | 2260/1587059 [00:00<01:10, 22595.98it/s]

----clusterRouletteProbability----
0     0.020064
1     0.014143
2     0.032554
3     0.144780
4     0.040476
5     0.096798
6     0.085117
7     0.087404
8     0.095132
9     0.070813
10    0.087447
11    0.049736
12    0.041575
13    0.133961
Name: cluster_total_cpu, dtype: float64
--------


Processing: 100%|██████████| 1587059/1587059 [01:10<00:00, 22651.50it/s]
Processing: 100%|██████████| 1708783/1708783 [00:36<00:00, 47176.87it/s]
Processing: 100%|██████████| 801708/801708 [00:17<00:00, 47009.45it/s]

----totalFlow----
305053213.15901905
--------





In [3]:
import random

## evalute roulette Random algorithm
def rouletteRandom(probability):
    sum = 0
    ran = random.random()
    for num, r in zip(range(len(probability)), probability):
        sum += r
        if ran < sum :
            break
    return num

In [4]:
## For each individual, we calculate it's assessment, which is innerFlow and innerFlow.
## Addition, we stores the usedCPUMap(key=cluster_index,value=totalCPUForThisPlancement) and clusterTableMap(key=cluster_index,value=list(table_index))
class Individual:
    
    def __init__(self, entity, usedCPUMap, clusterTableMap, execute):
        self.entity = entity
        self.innerFlow = 0;
        self.crossFlow = 0
        self.usedCPUMap = usedCPUMap
        self.clusterTableMap = clusterTableMap
        if execute:
            self.figureAssessment()
        
    def figureAssessment(self):
        self.figureTableJob()
        self.figureTableUpdown()
        self.printInfo()
        
    def printInfo(self):
        print("----IndividualInfo----")
        print("InnerFlow: " + str(self.innerFlow))
        print("CrossFlow: " + str(self.crossFlow))
        print("CPUUsage: ")
        for index in range(len(self.usedCPUMap)):
            print(str(index) + ":" + str(self.usedCPUMap[index] / cluster_capacity_df.iloc[index, 1]))
        print("--------")
        
    def figureTableJob(self):
        for index in range(table_job_size):
            if table_job_df.iloc[index, 0] in table2index.keys():
                left = table2index[table_job_df.iloc[index, 0]]
            else:
                continue
            if table_job_df.iloc[index, 1] in table2index.keys():
                right = table2index[table_job_df.iloc[index, 1]]
            else:
                continue
            if self.entity[left] != self.entity[right]:
                self.crossFlow += table_job_df.iloc[index, 2]
            else:
                self.innerFlow += table_job_df.iloc[index, 2]
        
    def figureTableUpdown(self):
        for index in range(table_updown_size):
            if table_updown_df.iloc[index, 0] in table2index.keys():
                left = table2index[table_updown_df.iloc[index, 0]]
            else:
                continue
            if table_updown_df.iloc[index, 1] in table2index.keys():
                right = table2index[table_updown_df.iloc[index, 1]]
            else:
                continue
            if self.entity[left] != self.entity[right]:
                self.crossFlow += table_job_df.iloc[index, 2]
            else:
                self.innerFlow += table_job_df.iloc[index, 2]

In [5]:
## random generates a valid ndividual along with it's usedCPUMap and clusterTableMap
def figureInitPopulation():
    usedCPUMap = np.zeros([cluster_size], dtype = np.int64)
    clusterTableMap = {}
    entity = np.zeros([table_size], dtype = np.int8)
    for index in tqdm(range(table_size), desc='Processing'):
        while True:
            choosedCluster = rouletteRandom(clusterRouletteProbability)
            if usedCPUMap[choosedCluster] + table_cpu_cost_df.iloc[index, 3] <= cluster_capacity_df.iloc[choosedCluster, 1]:
                usedCPUMap[choosedCluster] += table_cpu_cost_df.iloc[index, 3]
                if choosedCluster not in clusterTableMap.keys():
                    clusterTableMap[choosedCluster] = []
                clusterTableMap[choosedCluster].append(index)
                entity[index] = choosedCluster
                break
    return Individual(entity, usedCPUMap, clusterTableMap, True)

In [6]:
## init cpu_num - 1 individuals for first population using alomost all cpu process to accelerate 
import numpy as np
import multiprocessing

population = []
population_size = multiprocessing.cpu_count() - 1

results = []
pool = multiprocessing.Pool(population_size)

for i in range(population_size):
    results.append(pool.apply_async(func=figureInitPopulation))

pool.close()
pool.join()

for result in results:
    population.append(result.get())
        

Processing: 100%|██████████| 1587059/1587059 [04:06<00:00, 6432.41it/s]
Processing: 100%|██████████| 1587059/1587059 [04:07<00:00, 6417.31it/s]
Processing: 100%|██████████| 1587059/1587059 [04:07<00:00, 6416.14it/s]
Processing: 100%|██████████| 1587059/1587059 [04:07<00:00, 6404.45it/s]
Processing: 100%|██████████| 1587059/1587059 [04:07<00:00, 6404.53it/s]
Processing: 100%|██████████| 1587059/1587059 [04:07<00:00, 6403.62it/s]
Processing: 100%|██████████| 1587059/1587059 [04:08<00:00, 6398.36it/s]
Processing: 100%|██████████| 1587059/1587059 [04:08<00:00, 6387.65it/s]
Processing: 100%|██████████| 1587059/1587059 [04:09<00:00, 6372.28it/s]
Processing: 100%|██████████| 1587059/1587059 [04:09<00:00, 6365.17it/s]
Processing: 100%|██████████| 1587059/1587059 [04:09<00:00, 6362.37it/s]


----IndividualInfo----
InnerFlow: 6631745.745196534
CrossFlow: 64285963.040695064
CPUUsage: 
0:0.9308601534522722
1:0.7759400640483624
2:0.827718805884811
3:0.7353533877669575
4:0.6667907020876743
5:0.769346210905032
6:0.7278863274636168
7:0.6408767819497337
8:0.8045350469679611
9:0.5635677160293014
10:0.6364328998556473
11:0.8132625834024327
12:0.8669729884273214
13:0.8584904782859965
--------
----IndividualInfo----
InnerFlow: 8213191.457417835
CrossFlow: 62704517.3284736
CPUUsage: 
0:0.5959155451726074
1:0.730533113385975
2:0.6448135245785184
3:0.826951039745757
4:0.7796576661139895
5:0.7498311396943466
6:0.6194145090120358
7:0.7489699243138668
8:0.70334494841168
9:0.5876522729591453
10:0.601485913208933
11:0.9999999999972106
12:0.8989794500619384
13:0.8399757514462934
--------
----IndividualInfo----
InnerFlow: 7011786.8061866565
CrossFlow: 63905921.979704194
CPUUsage: 
0:0.8226781082364479
1:0.6245271680522638
2:0.5364763258994071
3:0.717018423342744
4:0.7538592645862471
5:0.7022472

In [7]:
## select two individuals for population based on their assessment and probability
def selection(population):
    sum = 0
    populationProbability = []
    for individual in population:
        sum += individual.innerFlow
    for individual in population:
        populationProbability.append(individual.innerFlow / sum)
    individual_1 = rouletteRandom(populationProbability)
    individual_2 = rouletteRandom(populationProbability)
    while individual_1 == individual_2:
            individual_2 = rouletteRandom(populationProbability)
    return individual_1, individual_2

## generate one individuals base on their parents to store good gene
def crossover(individual_1, individual_2):
    stage = 1000
    entity = np.zeros([table_size], dtype = np.int8)
    usedCPUMap = np.zeros([cluster_size], dtype = np.int64)
    clusterTableMap = {}
    for index in range(table_size):
        if index % stage == 0:
            if index + stage <= table_size:
                end = index + stage
            else:
                end = table_size
            if random.randint(0,1) == 0:
                for i in range(index, end):
                    entity[i] = population[individual_1].entity[i]
            else:
                for i in range(index, end):
                    entity[i] = population[individual_2].entity[i]
        usedCPUMap[entity[index]] += table_cpu_cost_df.iloc[index, 3]
        if entity[index] not in clusterTableMap.keys():
            clusterTableMap[entity[index]] = []
        clusterTableMap[entity[index]].append(index)
    return Individual(entity, usedCPUMap, clusterTableMap, False)

## mutate this individuals to extent search space
def mutate(individual):
    mutateProbability = 0.001
    num = int(table_size * mutateProbability)
    for _ in range(num):
        chooseTable = random.randint(0, table_size)
        newCluster = rouletteRandom(clusterRouletteProbability)
        if newCluster != individual.entity[chooseTable]:
            individual.usedCPUMap[individual.entity[chooseTable]] -= table_cpu_cost_df.iloc[chooseTable, 3]
            individual.usedCPUMap[newCluster] += table_cpu_cost_df.iloc[chooseTable, 3]
            individual.clusterTableMap[individual.entity[chooseTable]].remove(chooseTable)
            individual.clusterTableMap[newCluster].append(chooseTable)
            individual.entity[chooseTable] = newCluster
            

## repair this individual to make it feasible after crossover and mutate
def repair(individual):
    while True:
        valid = True
        for index in range(cluster_size):
            while individual.usedCPUMap[index] > cluster_capacity_df.iloc[index, 1]:
                valid = False
                chooseTable = random.choice(individual.clusterTableMap[index])
                newCluster = rouletteRandom(clusterRouletteProbability)
                if newCluster != individual.entity[chooseTable]:
                    individual.usedCPUMap[individual.entity[chooseTable]] -= table_cpu_cost_df.iloc[chooseTable, 3]
                    individual.usedCPUMap[newCluster] += table_cpu_cost_df.iloc[chooseTable, 3]
                    individual.clusterTableMap[individual.entity[chooseTable]].remove(chooseTable)
                    individual.clusterTableMap[newCluster].append(chooseTable)
                    individual.entity[chooseTable] = newCluster
        if valid:
            break

In [8]:
## generate a feasible individual base on parent population
def run():
    individual_1, individual_2 = selection(population)
    individual = crossover(individual_1,individual_2)
    mutate(individual)
    repair(individual)
    individual.figureAssessment()
    return individual

In [9]:
import pickle

breeding_rate = multiprocessing.cpu_count() - 1
# one for 7 minutes
iterations = 200
minCrossFlows = []

In [None]:
## perform <iterations> iterations
## generation <breeding_rate> individuals for each iteration using <breeding_rate> process to accelerate 
    
import os

for times in tqdm(range(iterations), desc='Processing'):
    results = []
    pool = multiprocessing.Pool(breeding_rate)

    for i in range(breeding_rate):
        results.append(pool.apply_async(func=run))

    pool.close()
    pool.join()

    for result in results:
        population.append(result.get())
        
    minInnerFlow = population[0].innerFlow
    minInnerFlowIndex = 0;
    minCrossFlow = population[0].crossFlow
    while len(population) > population_size:
        minInnerFlow = population[0].innerFlow
        minInnerFlowIndex = 0;
        for index in range(len(population)):
            if population[index].innerFlow < minInnerFlow:
                minInnerFlowIndex = index
            if population[index].crossFlow < minCrossFlow:
                minCrossFlow = population[index].crossFlow
        del population[minInnerFlowIndex]
    
    minCrossFlows.append(minCrossFlow)
    print("----" + str(times + 1) + " iteration----")
    print("minCrossFlow: " +  str(minCrossFlow))
    print("CrossFlows: ")
    for i in population:
        print(i.crossFlow)
    print("--------")
    
    isExists= os.path.exists("./placement")
    if not isExists:
        os.mkdir("./placement")
    for index in range(len(population)):
        pickle.dump(population[index], open("./placement/data" + str(index), 'wb'))

    pickle.dump(minCrossFlows,open("./placement/flows", 'wb'))
    
print("----itertions for minCrossFlow----")
print(minCrossFlows)
print("--------")

Processing:   0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
first = pickle.load(open("./placement/data" + str(0),'rb'))
first.entity

In [None]:
flows = pickle.load(open("./placement/flows",'rb'))
flows