In [7]:
import numpy as np
from scipy.linalg import block_diag
from sympy import Matrix
from functools import reduce

### Generate Combination Matrix

In [8]:
def generate_combination_matrices():
    """
    Output to modeldata.py
    ----------------------
    Data.inbound_combination_matrices: dict
        Dictionary of block diagonal matrices containing the production
        efficiency of a factory {product: associated matrix}. Each
        product corresponds to a matrix with:
            - Columns: ∑|F| (total number of factories across all products)
            - Rows: # Factories (total number of factories)

    Data.outbound_combination_matrices: dict
        Dictionary of block diagonal matrices to apply outbound constraints
        on a per-factory basis. {product: associated matrix}. Each product
        corresponds to a matrix with:
            - Column: ∑|FxC| (total number of factories x customers
            across all products)
            - Rows: # Factories (total
            number of factories)
    """

    # Verify inputs type
    assert isinstance(
        Data.efficiency_per_product,
        dict), 'Efficiency per product must be given in a dictionary'
    assert isinstance(Data.factory_names,
                      dict), 'Factory names must be given in a dictionary'

    # Verify inputs values
    assert len(Data.factory_list
               ) > 0, 'Number of factories to optimize must be positive'
    assert np.all(np.hstack(list(Data.efficiency_per_product.values())) > 0
                  ), 'Efficiency must be positive'
    assert np.all(
        np.array([len(prod) for prod in Data.factory_names.values()]) <= len(
            Data.factory_list)), 'There are more factory names than allowed'

    # Build the combination matrix
    """
    Starting from the inside out, we first iterate over all the product. 
    For a given product "p", we put the corresponding efficiency value  
    in the place of the factories that produce the product and [] in the 
    place of factories that don't. Then, we form each product into a block 
    using the block_diag function. After which, we build the matrix using 
    block_diag on all the previous blocks. Lastly, add a negative sign per
    the model.
    """

    # Generator to iterate over efficiency elements
    efficiency_array = (
        eff_arr
        for eff_arr in np.hstack(list(Data.efficiency_per_product.values())))

    Data.inbound_combination_matrix = -block_diag(*[
        block_diag(*block) for block in [[
            next(efficiency_array)
            if factory in Data.factory_names[product] else []
            for factory in Data.factory_list
        ] for product in Data.product_list]
    ])
    """
    Starting from the inside out, we first iterate over all the product. 
    For a given product "p", we put a list [1 * #customer buying "p"] 
    in the place of the factories that product and [] in the place of 
    factories that don't. Then, we form each product into a block using
    the block_diag function. After which, we build the matrix using 
    block_diag on all the previous blocks.
    """

    Data.outbound_combination_matrix = block_diag(*[
        block_diag(*block)
        for block in [
            [
                [1] * Data.customer_sizes[product]
                if factory in Data.factory_names[product] else []
                for factory in Data.factory_list]
            for product in Data.product_list
        ]
   ])

    # Verify matrix dimension
    # Inbound combination matrix dimension == (Σ|C|, Σ#Fx#P)
    assert Data.inbound_combination_matrix.shape == (
        len(Data.factory_list) * len(Data.product_list), Data.dimF
    ), 'Dimension of inbound combination matrix is incorrect (Σ|C|, Σ#Fx#P)'

    # Outbound combination matrix dimension == (Σ|FxC|, Σ#Fx#P)
    assert Data.outbound_combination_matrix.shape == (
        len(Data.factory_list) * len(Data.product_list),
        Data.dimFC), 'Dimension of combination matrix is incorrect (Σ|FxC|, ' \
                     'Σ#Fx#P)'

    # Split the matrix into dictionary
    # Inbound combination dictionary
    Data.inbound_combination_matrices = dict(
        zip(
            Data.product_list,
            np.split(Data.inbound_combination_matrix,
                     len(Data.product_list),
                     axis=0)))

    # Outbound combination dictionary
    Data.outbound_combination_matrices = dict(
        zip(
            Data.product_list,
            np.split(Data.outbound_combination_matrix,
                     len(Data.product_list),
                     axis=0)))

### Combination Unit Test

In [9]:
class Data:
    pass

In [10]:
class DataTest:
    """Class for generating random test inputs and verify that the combination
    matrix is constructed correctly"""

    no_products = None
    customer_names = None
    no_factories = None
    no_customers = None

    @classmethod
    def generate_random_inputs(cls):
        """Generate random Data inputs"""

        cls.no_customers = np.random.randint(1, 20)
        cls.no_factories = np.random.randint(1, 10)
        cls.no_products = np.random.randint(1, 10)

        if cls.no_customers == 1:
            cls.customer_names = dict(
                zip(range(cls.no_products), [np.array([0])] * cls.no_products))
        else:  # Allow for one customer
            cls.customer_names = {
                prod: np.sort(
                    np.random.choice(np.arange(cls.no_customers),
                                     size=np.random.randint(
                                         1, cls.no_customers),
                                     replace=False))
                for prod in range(cls.no_products)
            }

        if cls.no_factories == 1:
            Data.factory_names = dict(
                zip(range(cls.no_products), [np.array([0])] * cls.no_products))
        else:  # Allow for one factory
            Data.factory_names = {
                prod: np.sort(
                    np.random.choice(np.arange(cls.no_factories),
                                     size=np.random.randint(
                                         1, cls.no_factories),
                                     replace=False))
                for prod in range(cls.no_products)
            }

        # Inputs from Data
        Data.factory_list = list(range(cls.no_factories))
        Data.product_list = list(range(cls.no_products))

        Data.customer_sizes = {
            prod: len(cls.customer_names[prod])
            for prod in range(cls.no_products)
        }

        Data.factory_sizes = {
            prod: len(Data.factory_names[prod])
            for prod in range(cls.no_products)
        }

        Data.efficiency_per_product = dict(
            zip(range(cls.no_products), [
                np.random.uniform(0.8, 1, Data.factory_sizes[prod])
                for prod in range(cls.no_products)
            ]))

        Data.dimF = sum(Data.factory_sizes.values())
        Data.dimC = sum(Data.customer_sizes.values())

        Data.dimFC = sum([
            Data.factory_sizes[prod] * Data.customer_sizes[prod]
            for prod in range(cls.no_products)
        ])

    @classmethod
    def alternative_combination_matrix(cls):
        """Construct the combination matrix in an alternative method"""
        
        # Alternative Inbound Combination Matrix, 
        # Here we start by choosing out the appropriate index and 
        # replace those elements with the efficiency values
        DataTest.inbound_combination_matrix = np.zeros(
            (cls.no_factories * cls.no_products,
             cls.no_factories * cls.no_products))

        matrix_index = np.hstack([
            cls.no_factories * prod + Data.factory_names[prod]
            for prod in Data.product_list
        ])

        DataTest.inbound_combination_matrix[matrix_index, matrix_index] = np.hstack(
            list(Data.efficiency_per_product.values()))

        DataTest.inbound_combination_matrix = -DataTest.inbound_combination_matrix[:, ~np.all(
            DataTest.inbound_combination_matrix == 0, axis=0)]

        # Alternative Outbound Combination Matrix
        # Here we just iterate over all the products
        # and find out which factories produce and which 
        # don't. Then we either put [1, ..., 1] or [] as
        # appropriate 

        block_list = []

        for prod in Data.product_list:

            subblock_list = []

            for fac in Data.factory_list:

                if fac in Data.factory_names[prod]:
                    subblock_list.append([1] * Data.customer_sizes[prod])
                else:
                    subblock_list.append([])

            block_list.append(block_diag(*subblock_list))

        DataTest.outbound_combination_matrix = block_diag(*block_list)

In [12]:
# Generate Random Inputs
DataTest.generate_random_inputs()

# Generate the combination matrix
generate_combination_matrices()

# Generate the alternative combination matrix
DataTest.alternative_combination_matrix()

In [13]:
# Check inbound equality
np.array_equal(Data.inbound_combination_matrix, DataTest.inbound_combination_matrix)

# Check outbound equality
np.array_equal(Data.outbound_combination_matrix, DataTest.outbound_combination_matrix)

True

### Generate Capacity Matrix

In [42]:
def generate_capacity_matrix():
    """
    Output to modeldata.py
    ----------------------
    Data.inbound_capacity_matrix: numpy.ndarray
        All zero matrix representing the inbound 
        section of the joint capacity constraints.
        #Rows: Depends on the number constraints and the 
        intersection of factories in each of the joint combination
        #Columns: ∑|F| (total number of factories across all products)
        
    Data.outbound_capacity_matrix: numpy.ndarray
        Sum of the outbound combination matrices, use to represent the 
        joint constraints on capacity of factories. For example, to
        limit the capacity of bulk + bag, we add the bulk matrix to
        the bag matrix.
        #Rows: Depends on the number constraints and the 
        intersection of factories in each of the joint combination,
        but will be the same as the inbound capacity matrix.
        #Columns: ∑|FxC| (total number of factories x customers 
        across all products)

    Data.capacity_matrix: numpy.ndarray
        Capacity matrix to realize the factories' production capacity,
        made by concatenating the inbound and outbound capacity matrix.
        
    """

    # Verify inputs types
    assert isinstance(Data.capacity_constraints,
                      list), 'Capacity constraints must be in a list'

    assert isinstance(
        Data.inbound_combination_matrices,
        dict), 'Inbound combination matrices must be in a dictionary'

    assert isinstance(
        Data.outbound_combination_matrices,
        dict), 'Outbound combination matrices must be in a dictionary'

    # Verify for the case cap_cons = []
    assert len(Data.capacity_constraints
               ) > 0, 'At least one capacity constraints must be defined'

    # Verify for the case cap_cons = [[1, 2], []] (The [] is not allowed)
    assert np.all(
        np.array([len(cons) for cons in Data.capacity_constraints]) > 0
    ), 'Capacity constraints cannot be empty'

    # Verify for the case caps_cons = [[1, 2], [1, 2]] (the [1, 2] cannot repeat)
    # and also that order doesn't matter ([1, 2] is equivalent to [2, 1])
    orderless_capacity_constraints = [
        sorted(cons) for cons in Data.capacity_constraints
    ]

    assert len({
        tuple(cons)
        for cons in orderless_capacity_constraints
    }) == len(orderless_capacity_constraints
              ), 'Capacity constraints (in any order) cannot repeat'

    # Verify for the case cap_cons = [[1, 2], [0, 0]] (the [0, 0] is not allowed)
    assert np.all(
        np.array([len(set(cons)) for cons in Data.capacity_constraints]) ==
        np.array([len(cons) for cons in Data.capacity_constraints])
    ), 'A single product combination cannot appear more than once in one constraints'

    # Verify that the capacity combinations are in the original product_list
    assert set(reduce(lambda a, b: a + b, Data.capacity_constraints)).issubset(
        set(Data.product_list)), (
            'Capacity combinations list not valid. ' +
            'Some products are not in the defined product list')

    # Build the capacity matrix
    # First build the outbound matrix by adding up all the capacity constraints
    Data.outbound_capacity_matrix = np.vstack([
        np.sum(
            [Data.outbound_combination_matrices[prod] for prod in combination],
            axis=0) for combination in Data.capacity_constraints
    ])
    # Strip away all-zeros rows
    Data.outbound_capacity_matrix = Data.outbound_capacity_matrix[
        ~np.all(Data.outbound_capacity_matrix == 0, axis=1)]

    # Then we build the all zero inbound matrix with the same number of rows
    # as the outbound matrix
    Data.inbound_capacity_matrix = np.zeros(
        (Data.outbound_capacity_matrix.shape[0], Data.dimF))

    # Verify output dimensions
    # Find the number of distinct factories over all the capacity constraints
    row_dim = sum([
        len(
            reduce(lambda a, b: a.union(b),
                   [set(Data.factory_names[prod]) for prod in cons]))
        for cons in Data.capacity_constraints
    ])

    # Outbound capacity dimension == (row_dim, Σ|FxC|)
    assert Data.outbound_capacity_matrix.shape == (
        row_dim, Data.dimFC
    ), 'Dimension of outbound capacity matrix is incorrect (row_dim, Σ|FxC|)'

In [43]:
Data.capacity_constraints = [list(cons) for cons in {tuple(sorted(np.random.choice(Data.product_list,
                     size=np.random.randint(1,
                                            len(Data.product_list) + 1),
                     replace=False).tolist())) for i in range(20)}]

print(Data.capacity_constraints)

generate_capacity_matrix()

Data.outbound_capacity_matrix.shape

[[0, 1], [1, 3], [2], [0], [1, 2], [0, 1, 3], [1, 2, 3], [0, 2, 3], [0, 3], [2, 3], [1], [0, 2], [0, 1, 2, 3], [0, 1, 2], [3]]


(24, 4)

### Unit Testing

In [49]:
class DataTest:
    """Class for generating random test inputs and verify that the capacity
    matrix is constructed correctly"""
    @classmethod
    def generate_random_inputs(cls):
        """Generate random Data inputs"""

        cls.no_customers = np.random.randint(1, 20)
        cls.no_factories = np.random.randint(1, 10)
        cls.no_products = np.random.randint(1, 10)

        if cls.no_customers == 1:
            cls.customer_names = dict(
                zip(range(cls.no_products), [np.array([0])] * cls.no_products))
        else:  # Allow for one customer
            cls.customer_names = {
                prod: np.sort(
                    np.random.choice(np.arange(cls.no_customers),
                                     size=np.random.randint(
                                         1, cls.no_customers),
                                     replace=False))
                for prod in range(cls.no_products)
            }

        if cls.no_factories == 1:
            Data.factory_names = dict(
                zip(range(cls.no_products), [np.array([0])] * cls.no_products))
        else:  # Allow for one factory
            Data.factory_names = {
                prod: np.sort(
                    np.random.choice(np.arange(cls.no_factories),
                                     size=np.random.randint(
                                         1, cls.no_factories),
                                     replace=False))
                for prod in range(cls.no_products)
            }

        # Inputs from Data
        Data.factory_list = list(range(cls.no_factories))
        Data.product_list = list(range(cls.no_products))

        Data.customer_sizes = {
            prod: len(cls.customer_names[prod])
            for prod in range(cls.no_products)
        }

        Data.factory_sizes = {
            prod: len(Data.factory_names[prod])
            for prod in range(cls.no_products)
        }

        Data.efficiency_per_product = dict(
            zip(range(cls.no_products), [
                np.random.uniform(0.8, 1, Data.factory_sizes[prod])
                for prod in range(cls.no_products)
            ]))

        Data.dimF = sum(Data.factory_sizes.values())
        Data.dimC = sum(Data.customer_sizes.values())

        Data.dimFC = sum([
            Data.factory_sizes[prod] * Data.customer_sizes[prod]
            for prod in range(cls.no_products)
        ])

        Data.capacity_constraints = [
            list(cons) for cons in {
                tuple(
                    sorted(
                        np.random.choice(Data.product_list,
                                         size=np.random.randint(
                                             1,
                                             len(Data.product_list) + 1),
                                         replace=False).tolist()))
                for i in range(20)
            }
        ]

    @classmethod
    def alternative_capacity_matrix(cls):
        """Construct the capacity matrix in an alternative method"""

        # Alternative Outbound capacity Matrix
        # Here we just iterate over all the products
        # and find out which factories produce and which
        # don't. Then we either put [1, ..., 1] or [] as
        # appropriate

        block_list = []

        for prod in Data.product_list:

            subblock_list = []

            for fac in Data.factory_list:

                if fac in Data.factory_names[prod]:
                    subblock_list.append([1] * Data.customer_sizes[prod])
                else:
                    subblock_list.append([])

            block_list.append(block_diag(*subblock_list))

        DataTest.outbound_capacity_matrix = block_diag(*block_list)

In [56]:
Data.capacity_constraints = [list(cons) for cons in {tuple(sorted(np.random.choice(Data.product_list,
                     size=np.random.randint(1,
                                            len(Data.product_list) + 1),
                     replace=False).tolist())) for i in range(20)}]

print(Data.capacity_constraints)

generate_capacity_matrix()

Data.outbound_capacity_matrix.shape

[[1, 2], [0, 1, 2], [0, 1, 3], [1, 2, 3], [0, 3], [2, 3], [0, 2], [0, 1, 2, 3], [0], [1, 3]]


(18, 4)

In [67]:
for cons in Data.capacity_constraints[:2]:
    
    block_list = []
    
    # Iterate over the product    
    for prod in cons:

        subblock_list = []

        # Iterate over the factory the list
        for fac in Data.factory_list:

            if fac in Data.factory_names[prod]:
                subblock_list.append([1] * Data.customer_sizes[prod])
            else:
                subblock_list.append([])

        block_list.append(block_diag(*subblock_list))

DataTest.outbound_capacity_matrix = block_diag(*block_list)