In [2]:
# Mount Google Drive at /content/drive (Colab only) so the project folder
# stored on Drive can be reached from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Work from the project folder: the data/ and results/ paths used further down are relative.
%cd /content/drive/MyDrive/Colab Notebooks/planning_optimization/Planning_optimization

/content/drive/MyDrive/Colab Notebooks/planning_optimization/Planning_optimization


In [4]:
!pip install ortools

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [5]:
!pip install numpyencoder

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [4]:
from ortools.sat.python import cp_model
import time
import json
from numpyencoder import NumpyEncoder

In [5]:
def import_data(input_file):
    """Load one problem instance from a JSON file.

    Args:
        input_file: path of the JSON file to read.

    Returns:
        The decoded JSON document.
    """
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with open(input_file, encoding="utf-8") as f:
        return json.load(f)

In [6]:
def CP(data, time_limit):
    """Build and solve the harvest-scheduling model with OR-Tools CP-SAT.

    Each field is assigned exactly one harvest day inside its own window.
    The total harvested on a day is capped at data['M'], and a day with a
    non-zero total must reach at least data['m'].  The objective minimises
    the gap between the largest daily total and the smallest total among the
    days on which at least one field is harvested.

    Args:
        data: problem instance, a dict with
            'N'      - number of fields,
            's', 'e' - per-field first and last allowed harvest day,
            'd'      - per-field yield,
            'm', 'M' - lower / upper bound on a working day's total.
            Values are presumably integers (CP-SAT works on integers) --
            confirm against the data generator.
        time_limit: falsy for no limit; a positive number for a limit of that
            many seconds; True (or any other truthy non-number) for the
            default limit of 120 seconds.

    Returns:
        dict with
            "obj"   - objective value, or the string "No Solution",
            "time"  - wall-clock seconds spent building and solving the model,
            "field" - only when a solution was found: the harvest day of each
                      field, in field order.
    """
    default_limit_seconds = 120

    start = time.time()
    model = cp_model.CpModel()

    start_day = min(data['s'])
    end_day = max(data['e'])
    fields = range(data['N'])
    days = range(start_day, end_day + 1)

    # x[i, j] == 1 iff field i is harvested on day j.
    x = {}
    for i in fields:
        for j in days:
            x[i, j] = model.NewIntVar(0, 1, f'x[{i},{j}]')

    # y[j] == 1 iff at least one field is harvested on day j.
    y = {}
    for j in days:
        y[j] = model.NewIntVar(0, 1, f'y[{j}]')

    # product[j]: total amount harvested on day j (never more than M).
    product = {}
    for j in days:
        product[j] = model.NewIntVar(0, data['M'], f'day[{j}]')

    max_product = model.NewIntVar(0, data['M'], 'max_product')
    min_product = model.NewIntVar(0, data['M'], 'min_product')

    # Every field is harvested exactly once, on a single day.
    for i in fields:
        model.Add(sum(x[i, j] for j in days) == 1)

    # Field i is harvested within [s_i, e_i]: because exactly one x[i, j] is 1,
    # sum(j * x[i, j]) is the chosen day itself.
    for i in fields:
        model.AddLinearConstraint(sum(x[i, j] * j for j in days),
                                  data['s'][i], data['e'][i])

    # The amount harvested on day j is the total yield of the fields assigned to it.
    for j in days:
        model.Add(product[j] == sum(x[i, j] * data['d'][i] for i in fields))

    # A non-zero daily total must reach at least m (the upper bound M is
    # already enforced by the domain of product[j]).
    for j in days:
        # One indicator per day, each with its own name for readable model dumps.
        b = model.NewBoolVar(f'b[{j}]')
        model.Add(product[j] != 0).OnlyEnforceIf(b)
        model.Add(product[j] == 0).OnlyEnforceIf(b.Not())
        model.Add(product[j] >= data['m']).OnlyEnforceIf(b)

    # Link y[j] to the assignment variables of day j.
    for j in days:
        model.AddMaxEquality(y[j], [x[i, j] for i in fields])

    # max_product / min_product; an idle day contributes M to the minimum so
    # that it cannot pull min_product down to 0.
    model.AddMaxEquality(max_product, [product[j] for j in days])
    model.AddMinEquality(min_product,
                         [(product[j] + data['M'] * (1 - y[j])) for j in days])

    # Objective: balance the daily totals.
    model.Minimize(max_product - min_product)

    solver = cp_model.CpSolver()

    if time_limit:
        # Booleans are ints in Python, so exclude them explicitly: True keeps
        # meaning "use the default limit", as it always did.
        is_seconds = (isinstance(time_limit, (int, float))
                      and not isinstance(time_limit, bool))
        solver.parameters.max_time_in_seconds = (
            float(time_limit) if is_seconds else default_limit_seconds)
    status = solver.Solve(model)

    solution = {}
    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        solution["obj"] = solver.ObjectiveValue()
        solution["time"] = time.time() - start
        solution["field"] = []
        for i in fields:
            for j in days:
                if solver.Value(x[i, j]) == 1:
                    solution["field"].append(j)
                    break
    else:
        solution["obj"] = "No Solution"
        solution["time"] = time.time() - start
    return solution

In [7]:
def export(output_file, solution):
    """Write a solver result to `output_file` as JSON.

    The file holds "Time" and "Result", plus "Solution" when the result
    carries a schedule.

    Args:
        output_file: destination path; missing parent directories are created.
        solution: dict with "time" and "obj", and "field" when a schedule
            was found (the shape returned by CP()).
    """
    # Local import so this cell does not depend on which import cells have run.
    import os

    report = {"Time": solution["time"], "Result": solution["obj"]}
    # Test for the schedule itself rather than for len(solution) == 3, which
    # would silently change meaning if a key were ever added to the result.
    if "field" in solution:
        report["Solution"] = solution["field"]

    # Do not lose a finished solve to a missing results folder.
    parent = os.path.dirname(output_file)
    if parent:
        os.makedirs(parent, exist_ok=True)

    with open(output_file, "w") as f:
        json.dump(report, f, cls=NumpyEncoder)


In [8]:
def process(input_file, output_file, time_limit):
    """Run the pipeline for one instance: read it, solve it, save the result.

    Args:
        input_file: path of the JSON instance to load.
        output_file: path the JSON result is written to.
        time_limit: passed unchanged to CP().
    """
    export(output_file, CP(import_data(input_file), time_limit))

TEST CONSTRAINT PROGRAMMING

In [9]:
import os
import glob

In [None]:
# Solve every HomologousField instance and store one result file per instance.
# glob returns matches in arbitrary order, so sort for a reproducible run order.
# (No recursive=True here, so `**` matches like a single `*`.)
for path in sorted(glob.glob("data/data_v2/HomologousField/**.json")):
    # Named `category`, not `type`: rebinding the builtin at notebook scope
    # would break every later call to type().
    category, name = path.split("/")[-2:]
    print("Type: ", category, " | Name: ", name)
    input_file = f"data/data_v2/{category}/{name}"
    output_file = f"results/data_v2/{category}/constraint_programming/result_{name}"
    time_limit = True
    process(input_file, output_file, time_limit)

Type:  HomologousField  | Name:  sample_1000_10_50.json
Type:  HomologousField  | Name:  sample_1000_40_85.json
Type:  HomologousField  | Name:  sample_1000_50_100.json
Type:  HomologousField  | Name:  sample_2000_10_50.json


In [None]:
# Solve a single instance on its own.
# Named `category`, not `type`, so the builtin type() is not rebound.
category = "HomologousField"
name = "sample_2000_50_100.json"
input_file = f"data/data_v2/{category}/{name}"
output_file = f"results/data_v2/{category}/constraint_programming/result_{name}"
time_limit = True
try:
    process(input_file, output_file, time_limit)
except Exception as e:
    # Best effort: keep the notebook running, but show the exception class too
    # (str() of e.g. a KeyError is only the bare key).
    print(repr(e))

In [None]:
# Print the current working directory; the relative data/ and results/ paths
# used above are resolved against it.
!pwd

/content
