# Locating Warehouses to Minimize Costs Case 1 - No Uncertainty

This notebook demonstrates how to use decision optimization to solve a common business problem in supply chain design: where should a company locate its distribution warehouses in order to minimize its supply costs. Typically, such a problem arises as part of an annual planning process in which the company forecasts the sales of its products at various retail stores it supplies and decides how to configure its distribution network to meet those demands. This notebook is the first of a series which considers several cases of this business problem. This case addresses the situation in which the decision maker has no uncertainty about those demands.
<p>
In this notebook, you will learn how to set up the optimization problem using IBM's OPL modeling language and how to solve it using IBM's Decision Optimization on Cloud service. The notebook also shows you how access data from a source in IBM's Object Store service and how to use Apache Spark to manage the data input to and output from the optimization service. In addition, the notebook shows how to visualize the data and solution on a map.
<p>
>This notebook is part of [IBM Decision Optimization on Cloud service with the Python Client ](https://developer.ibm.com/docloud/documentation/docloud/python-api/).

>You will need a valid subscription to Decision Optimization on Cloud ([here](https://developer.ibm.com/docloud)). 

Some familiarity with Python is recommended. This notebook runs on Python 2 with Spark 2.0.

## Table of Contents

- [The Business Problem](#The-Business-Problem)
- [The Data](#The-Data)
    - [Accessing the Data in the Object Store](#Accessing-the-Data-in-the-Object-Store)
- [The Application Data Model and the OPLCollector Class](#The-Application-Data-Model-and-the-OPLCollector-Class)
    - [The OPLCollector Class](#The-OPLCollector-Class)
    - [The Application Data Model for the Warehouse Location Application](#The-Application-Data-Model-for-the-Warehouse-Location-Application)
    - [Loading Data into the Application Data Model](#Loading-Data-into-the-Application-Data-Model)
- [Visualizing the Input Data and the Maps Class](#Visualizing-the-Input-Data-and-the-Maps-Class)
    - [The Maps Class](#The-Maps-Class)
    - [Visualizing the Input Data](#Visualizing-the-Input-Data)
- [The Decision Model and the Optimizer Class](#The-Decision-Model-and-the-Optimizer-Class)
    - [The OPL Model](#The-OPL-Model)
    - [The Optimizer Class](#The-Optimizer-Class)
- [Solving the Warehousing Model](#Solving-the-Warehousing-Model)
    - [Get Your Credentials for IBM Decision Optimization on Cloud](#Get-Your-Credentials-for-IBM Decision-Optimization-on-Cloud)
    - [Setting Up and Submit the Solve Job](#Setting-Up-and-Submit-the-Solve-Job)
    - [Retrieving the Optimal Solution](#Retrieving-the-Optimal-Solution)
    - [Visualizing the Results on a Map](#Visualizing-the-Results-on-a-Map)
- [Summary](#Summary)
- [Author](#Author)
- [References](#References)


### A Note on the Code

This notebook introduces three substantial code blocks, the classes <code>OPLCollector</code>, <code>Optimizer</code>, and <code>Maps</code>. The first two are general purpose APIs. <code>OPLCollector</code> manages the data for an optimization application using Apache Spark.  <code>Optimizer</code> manages interactions with the IBM Decision Optimization on Cloud solver. These two are general purpose APIs for optimization, not tied to this Warehousing example, that do not need to be customized. The notebook shows how to configure them through their public methods. You do not need to understand how the underlying code works in order to use them, and you can copy them into other notebooks to use them for other optimization problems. They have been placed into hidden cells to streamline the presentation of this example, but you can see them by going into edit mode if you want to know about their internal logic.

The third code block, <code>Maps</code>, provides capabilities to visualize the data for the Warehousing example on maps using Folium. Unlike the first two, this one is specific to the warehousing problem, although it can be used without modification for other instances of this problem with different data. It could serve as a template for visualizing other geographically-based optimization problems, but you would probably have to modify the code. It has also been placed into a hidden cell to streamline the presentation of this example, but you can see it by going into edit mode if you want to know about its internal logic.

## The Business Problem

A consumer packaged goods supplier needs to decide where to locate its warehouses to serve a set of retail stores at different locations. At the same time, it also needs to determine how much capacity each warehouse should have. The cost of opening a warehouse has a fixed component, related to the acquisition of land and designing the facility, and a variable component proportional to the capacity of the warehouse. The cost to ship the goods from a warehouse to a store depends on the distance between them. The objective is to minimize the cost of opening the warehouses and shipping the goods. Such an optimization application would typically be used as part of an annual planning process in which the company’s management would decide on sales targets and the capital investments needed to support them. This notebook assumes that all data, in particular the demands at the stores, are known with certainty before the decisions to open the wareshouses are made. The case 2 notebook addresses the case in which the demands are uncertain when the warehouse decisions are made.

## The Data

The data for the warehouse location problem consist of two JSON files. The first contains the characteristics of the distribution network, the potential warehouse locations and their capital costs, the store locations, and the transportation routes between the stores and the warehouses and their shipping costs. The second contains the demands at each store. These two constitute all the data required to optimize the warehouse network.
- <code>Warehousing-data.json</code> contains the <code>warehouses</code>, <code>stores</code>, <code>routes</code>, and <code>mapCoordinates</code>
- <code>Warehousing-sales_data-nominal_scenario.json</code> contains the <code>scenarios</code> and  <code>demands</code></li>

Extracts of these two files are displayed below:

In [1]:
Warehousing_data_json='''
{
  "routes" : [ {
    "location" : "Brockton, MA",
    "store" : "Malden, MA",
    "shippingCost" : 42.94
  }, {
    "location" : "Brockton, MA",
    "store" : "Medford, MA",
    "shippingCost" : 41.43
  }, ...
  } ],
  "stores" : [ {
    "storeId" : "Malden, MA"
  }, {
    "storeId" : "Medford, MA"
  }, {
    "storeId" : "Quincy, MA"
  }, ...
  } ],
  "warehouses" : [ {
    "location" : "Brockton, MA",
    "fixedCost" : 550000.0,
    "capacityCost" : 148.0
  }, {
    "location" : "Bristol, CT",
    "fixedCost" : 600000.0,
    "capacityCost" : 148.0
  }, ...
  } ],
  "mapCoordinates" : [ {
    "location" : "Malden, MA",
    "lon" : -71.06,
    "lat" : 42.42
  }, ...
  } ]
}
'''

In [2]:
Warehousing_sales_data_json='''
{
  "demands" : [ {
    "store" : "Malden, MA",
    "scenarioId" : "Nominal",
    "amount" : 104.0
  }, {
    "store" : "Medford, MA",
    "scenarioId" : "Nominal",
    "amount" : 50.0
  }, ...
  } ],
  "scenarios" : [ {
    "id" : "Nominal",
    "totalDemand" : 18065.0,
    "periods" : 1
    } ]  
}
'''

### Accessing the Data in the Object Store

In order to access the data, you need to use the "Insert to Code" function of DSX. This creates the credentials object called <code>credentials_1</code> in the following cell. The credentials are used in calls to the <code>getFromObjectStorage</code> method in the <code>OPLCollector</code> object described below. 
<p>
Make sure to rename the credentials object if it does not come out as <code>credentials_1</code>. You only need to create one credentials object for one of the files in the object store; it can be used to retreive all the other files using the <code>getFromObjectStorage</code> method by specifying its <code>filename=</code> and /or <code>container=</code> keyword parameters.

In [3]:
# @hidden_cell
credentials_1 = {
}

## The Application Data Model and the OPLCollector Class

The application data model is the schema of the data input to and output from the optimization. It is typically realized as the table schema of a relational database, and it has a corresponding representation as the tuple structure in the optimization model. In order to work with the application data, we use a couple of objects (one for input, the other for output) of a class called the <code>OPLCollector</code>. These objects hold the data as Spark datasets. 
<p>
The <code>OPLCollector</code> class itself does not require customization for each application. Instead, it is configured by specifying the schemas of the tables it contains, using a builder method, as will be shown below. The schemas themselves are instances of the Spark <code>StructType</code> class. The design of the <code>OPLCollector</code> class minimizes the amount of custom coding required to build an optimization-based application. (Note that, despite its name, <code>OPLCollector</code> has no dependence on the OPL modeling language and can be used with the DOCplex Python modeling language as well.) Here is the Python code for <code>OPLCollector</code> and some related functions (go to edit mode to see the contents of this hidden cell).

### The OPLCollector Class

In [4]:
# @hidden_cell
'''
Created on Feb 8, 2017

@author: bloomj
'''
import sys
import os
import json
import requests

try:
    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
except ImportError as e:
    print ("Error importing Spark Modules", e)
    sys.exit(1)

SPARK_CONTEXT = sc #sc is predefined
SQL_CONTEXT = sqlContext #sqlContext is predefined
SPARK_SESSION = SparkSession(SPARK_CONTEXT)

class OPLCollector(object):
    '''
    Represents an OPL data model in Spark.
    Note: Use of this class does not depend on OPL, and in particular, it can be used with the DOcplex Python API.
    An application data model (ADM) consists of a set of tables (OPL Tuplesets), each with its own schema.
    An ADM is represented by a dictionary in which the keys are the table names and the values are the table schemas.
    A builder is provided to create the ADM.

    The OPLCollector holds the actual data in Spark Datasets. There are several ways to populate
    the data.
    - Spark SQL operations can transform tables into other tables.
    - A builder is provided when the data is generated programmatically.
    - JSON deserialization and serialization are provided when data is exchanged with external applications or stores.

    The design of the OPLCollector class aims to reduce the amount of data that must be
    manipulated outside of Spark. Where possible, data is streamed among applications without
    creating auxiliary in-memory structures or files.

    The design of OPLCollector also aims to minimize the amount
    of custom coding required to build an application. Collectors are configured
    by specifying their schemas through builders rather than by extending with subclasses.
    '''

    def __init__(self, collectorName, applicationDataModel={}, sparkData={}):
        '''
        Creates a new OPLCollector instance.

        :param collectorName: the name of this collector.
        :type collectorName: String
        :param applicationDataModel: holds the table schemas for this collector. Each schema is a Spark StructType.
        Note that each collector has one and only one application data model.
        :type applicationDataModel: dict<String, StructType>
        :param sparkData: holds the actual data tables of this collector as a set of Spark datasets.
        :type sparkData: dict<String, Dataframe>
        '''
        self.name = collectorName
        self.applicationDataModel = applicationDataModel
        self.sparkData = sparkData
        self.size = {name: None for name in applicationDataModel.keys()}
        self.jsonDestination = None
        self.jsonSource = None

    def copy(self, name):
        """
        Creates a new OPLCollector instance with copies of the application data model and Spark datasets of this collector.
        The ADM copy is immutable. The Spark datasets themselves are immutable, but the copy supports the addTable, addData, and replaceTable methods.  
        Does not copy the JSONSource or JSONDestination fields.
         
        :param name of the new collector 
        :param tableNames tables to be copied (all tables in this collector, if absent)
        :return a new OPLCollector instance
        """
        result= OPLCollector(name, self.applicationDataModel, self.sparkData.copy())
        result.size= self.size.copy()
        return result
    
    def copy(self, name, *tables):
        result= OPLCollector(name)
        admBuilder= ADMBuilder(result);
        for table in tables:
            admBuilder.addSchema(table, self.getSchema(table))
        admBuilder.build()
        dataBuilder= DataBuilder(result.applicationDataModel, collector=result)
        for table in tables:
            dataBuilder.addTable(table, self.getTable(table))
            result.size[table]= self.size[table]
        dataBuilder.build();
        return result
  
    def getName(self):
        """
        Returns the name of this collector.
         
        :return collector name as a string
        """
        return self.name

    def addTables(self, other):
        """
        Adds a set of tables of data from another collector.
        An individual table can be set only once.

        :param other: another collector
        :type other: OPLCollector
        :raise ValueError: if the other ADM is empty or if a table name duplicates a name already present in this collector.
        """

        if not other.applicationDataModel:  # is empty
            raise ValueError("empty collector")
        for tableName in other.applicationDataModel.viewkeys():
            if tableName in self.applicationDataModel:
                raise ValueError("table " + tableName + " has already been defined")
        self.applicationDataModel.update(other.applicationDataModel)
        self.sparkData.update(other.sparkData)
        self.size.update(other.size)
        return self

    def addTable(self, tableName, table, size=None):
        """
        Adds an individual table of data.
        A table can be added only once.

        :param tableName:
        :type String
        :param table:
        :type Spark Dataframe
        :param size: number of rows in table (None if omitted)
        :return: this collector
        :raise ValueError: if table name duplicates a name already present in this collector.
        """
        if tableName in self.applicationDataModel:
            raise ValueError("table " + tableName + " has already been defined")
        self.applicationDataModel[tableName] = table.schema()
        self.sparkData[tableName] = table
        self.size[tableName] = size
        return self

    def replaceTable(self, tableName, table, size=None):
        """
        Replaces an individual table of data.

        :param tableName:
        :type String
        :param table:
        :type Spark Dataframe
        :param size: number of rows in table (None if omitted)
        :return: this collector
        :raise ValueError: if the table is not already defined in the ADM
        """
        if tableName not in self.applicationDataModel:
            raise ValueError("table " + tableName + "has not been defined")
        self.sparkData[tableName] = table
        if size is not None:
            self.size[tableName] = size
        else:
            self.size[tableName] = table.count()
        return None

    def addData(self, tableName, table, size=None):
        """
        Adds data to an existing table.
        Use when a table has several input sources.
        Does not deduplicate the data (i.e. allows duplicate rows).

        :param tableName:
        :type String
        :param table:
        :type Spark Dataframe
        :param size: number of rows in table (None if omitted)
        :return: this collector
        :raise ValueError: if the table is not already defined in the ADM
        """
        if tableName in self.applicationDataModel:
            raise ValueError("table " + tableName + " has already been defined")
        self.sparkData[tableName] = self.sparkData[tableName].union(table)
        count = (self.size[tableName] + size) if (self.size[tableName] is not None and size is not None) else None
        self.size[tableName] = count
        return self

    def setADM(self, applicationDataModel):
        """
        Sets the application data model for this OPLCollector.
        The ADM cannot be changed once set.
        """
        if (self.applicationDataModel): #is not empty or None
            raise ValueError("ADM has already been defined")
        self.applicationDataModel = applicationDataModel
        return self

    def getTable(self, tableName):
        return self.sparkData[tableName]

    def getSchema(self, tableName):
        return self.applicationDataModel[tableName]

    def selectSchemas(self, *tableNames):
        """
        Returns a subset of the application data model.
        """
        return {tableName: self.applicationDataModel[tableName] for tableName in tableNames}

    def selectTables(self, collectorName, *tableNames):
        """
        Creates a new OPLCollector from a subset of the tables in this collector.
        The tables in the new collector are copies of the tables in the original.
        """
        adm = self.selectSchemas(tableNames)
        data = {tableName: SPARK_SESSION.createDataFrame(self.sparkData[tableName], self.getSchema(tableName))
                for tableName in tableNames}
        size = {tableName: self.size[tableName] for tableName in tableNames}
        return OPLCollector(collectorName, adm, data, size)

    def getSize(self, tableName):
        """
        Returns the number of rows in a table.
        Note: the Spark data set count method is fairly expensive,
        so it is used only if there is no other way to count the number of rows.
        It is best to count the rows as the table is being deserialized, as is done in the fromJSON method.
        Once counted, the number is stored in the size map for future use.
        """
        if tableName not in self.size:
            raise ValueError("size not defined for table " + tableName)
        if self.size[tableName] is None:
            self.size[tableName] = self.sparkData[tableName].count()
        return self.size[tableName]

    def buildADM(self):
        """
        Creates the application data model for this collector
        """
        if(self.applicationDataModel): # is not empty
            raise ValueError("application data model has already been defined")
        return ADMBuilder(self)

    def buildData(self):
        """
        Creates a builder for the data tables for this collector.
        Uses this collector's application data model.

        :return: a new DataBuilder instance
        :raise ValueError: if the application data model has not been defined or if data tables have already been loaded
        """
        if not self.applicationDataModel:  # is empty
            raise ValueError("application data model has not been defined")
        if self.sparkData: # is not empty
            raise ValueError("data tables have already been loaded")
        return DataBuilder(self.applicationDataModel, collector=self)

    def setJsonSource(self, source):
        """
        Sets the source for the JSON text that populates the collector.
        There is a one-to-one correspondence between an OPLCollector instance and its JSON representation;
        that is, the JSON source file must fully include all the data tables to be populated in the collector instance.
        Thus, it makes no sense to have more than on JSON source for a collector or to change JSON sources.

        :param source: a file-like object containing the JSON text.
        :return: this collector instance
        :raise ValueError: if JSON source has already been set
        """
        if self.jsonSource is not None:
            raise ValueError("JSON source has already been set")
        self.jsonSource = source
        return self

    def fromJSON(self):
        """
        Provides a means to create a collector from JSON.
        You must first set the destination (an output stream, file, url, or string) where the JSON will be read.
        Then you call the deserializer fromJSON method.
        The application data model for the collector must already have been created.

        There is a one-to-one correspondence between an OPLCollector instance and its JSON representation;
        that is, the JSON source file must fully include all the data tables to be populated in the collector instance.
        Methods are provided to merge two collectors with separate JSON sources (addTables),
        add a data set to a collector (addTable), and to add data from a data set to an existing table in a collector.

        :return: this collector with its data tables filled
        :raise ValueError: if the data tables have already been loaded
        """
        if self.sparkData:  # is not empty
            raise ValueError("data tables have already been loaded")
        # data: dict {tableName_0: [{fieldName_0: fieldValue_0, ...}, ...], ...}
        data = json.load(self.jsonSource)
        builder= self.buildData()
        for tableName, tableData in data.viewitems():
            count = len(tableData)
            tableRows = (Row(**fields) for fields in tableData)
            builder= builder.addTable(tableName,
                                     SPARK_SESSION.createDataFrame(tableRows),
                                     count) # would like to count the rows as they are read instead,
                                            # but don't see how
        builder.build()
        return self

    def setJsonDestination(self, destination):
        """
        Sets the destination for the JSON serialization.
        Replaces an existing destination if one has been set previously.

        :param destination: an output string, stream, file, or URL
        :return: this collector
        """
        self.jsonDestination = destination
        return self

    def toJSON(self):
        """
        Provides a means to write the application data as JSON.
        You must first set the destination (an output stream, file, url, or string) where the JSON will be written.
        Then you call the serializer toJSON method.
        """
        self.jsonDestination.write("{\n")
        firstTable = True
        for tableName in self.sparkData:
            if not firstTable:
                self.jsonDestination.write(",\n")
            else:
                firstTable = False
            self.jsonDestination.write('"'+tableName+'" : [\n')
            firstRow = True
            for row in self.sparkData[tableName].toJSON().toLocalIterator():
                if not firstRow:
                    self.jsonDestination.write(",\n")
                else:
                    firstRow= False
                self.jsonDestination.write(row)
            self.jsonDestination.write("\n]")
        self.jsonDestination.write("\n}")

    def displayTable(self, tableName, out):
        """
        Prints the contents of a table.

        :param out: a file or other print destination where the table will be written
        """
        out.write("collector: " + self.getName() + "\n")
        out.write("table: " + tableName + "\n")
        self.getTable(tableName).show(self.getSize(tableName), truncate=False)

    def display(self, out):
        """
        Prints the contents of all tables in this collector.

        :param out: a file or other print destination where the tables will be written
        """
        for tableName in self.sparkData:
            self.displayTable(tableName, out)


# end class OPLCollector

def getFromObjectStorage(credentials, container=None , filename=None):
    """
    Returns a stream containing a file's content from Bluemix Object Storage.

    :param credentials a dict generated by the Insert to Code  service of the host Notebook
    :param container the name of the container as specified in the credentials (defaults to the credentials entry)
    :param filename the name of the file to be accessed (note: if there is more than one file in the container, 
    you might prefer to enter the names directly; otherwise, defaults to the credentials entry) 
    """

    if not container:
        container= credentials['container']
    if not filename:
        filename= credentials['filename']

    url1 = ''.join([credentials['auth_url'], '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
            'password': {'user': {'name': credentials['username'],'domain': {'id': credentials['domain_id']},
            'password': credentials['password']}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()
    for e1 in resp1_body['token']['catalog']:
        if(e1['type']=='object-store'):
            for e2 in e1['endpoints']:
                        if(e2['interface']=='public'and e2['region']==credentials['region']):
                            url2 = ''.join([e2['url'],'/', container, '/', filename])
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.get(url=url2, headers=headers2, stream=True)
    return resp2.raw       

class DataBuilder(object):
    """
        Builds the Spark datasets to hold the application data.
        Used when the data are created programmatically.
    """

    def __init__(self, applicationDataModel, collector=None):
        """
        Creates a builder for loading the Spark datasets.

        :param applicationDataModel
        :param collector: if present, loads the data tables and their sizes directly into the collector;
        if not present or null, the Spark data dict is returned directly
        :return: a new DataBuilder instance
        :raise ValueError: if the application data model has not been defined
        """
        if not applicationDataModel:  # is empty
            raise ValueError("application data model has not been defined")
        self.applicationDataModel = applicationDataModel
        self.collector = collector
        self.result = {}
        self.length = {}

    def addTable(self, tableName, data, size=None):
        """
        Get the external data and create the corresponding application dataset.
        Assumes that the schema of this table is already present in the ADM.

        :param data: a Spark dataset
        :param size: length number of rows in table (null if omitted)
        :return this builder instance
        :raise ValueError: if the table is not included in the ADM or if the table has already been loaded
        """
        if tableName not in self.applicationDataModel:
            raise ValueError("table " + tableName + "has not been defined")
        if tableName in self.result:
            raise ValueError("table " + tableName + "has already been loaded")
        self.result[tableName] = data
        self.length[tableName] = size
        return self
    
    def copyTable(self, tableName, data):
        return self.addTable(tableName, SPARK_SESSION.createDataFrame(data.rdd(), 
                             self.applicationDataModel[tableName]));

    def build(self):
        """
        Completes building the Spark data.
        Registers the application data sets as Spark SQL tables.
        If an OPLCollector has been supplied in the constructor, loads the data tables and their sizes into it.

        :return a dict of table names to Spark data sets containing the application data
        :raise ValueError:  if a table in the ADM has no associated data or if data tables have already been loaded into the collector
        """
        for tableName in self.applicationDataModel:
            if tableName not in self.result:
                raise ValueError("table " + tableName + "has no data")
        for tableName in self.result:
            self.result[tableName].createOrReplaceTempView(tableName)
        if self.collector is not None:
            if self.collector.sparkData: #is not empty
                raise ValueError("data tables have already been loaded")
            self.collector.sparkData= self.result
            self.collector.size = self.length
        return self.result

    def retrieveSize(self):
        """
        :return the size dict created by this builder
        Note: calling this method before the build method could return an inaccurate result
        """
        return self.length

# end class DataBuilder

class ADMBuilder(object):
    """
    Builds an Application Data Model that associates a set of Spark Datasets with their schemas.
    Usage:

    adm= ADMBuilder()\
        .addSchema("warehouse", buildSchema(
            ("location", StringType()),
            ("capacity", DoubleType()))\
        .addSchema("route", buildSchema(
            ("from", StringType()),
            ("to", StringType()),
            ("capacity", DoubleType()))\
        .build();
    """

    def __init__(self, collector=None):
        """
        Creates a new builder.
        :param collector if present, loads the application data model directly into the collector;
        if not present or null, the ADM map is returned directly
        """
        self.collector = collector
        self.result = {}

    def addSchema(self, tableName, tupleSchema):
        """
        Adds a new table schema to the ADM.

        :param tupleSchema can be built with the buildSchema function
        :return this builder
        :raise ValueError: if a schema for tableName has already been defined
        """
        if tableName in self.result:
            raise ValueError("tuple schema " + tableName + " has already been defined")
        self.result[tableName] = tupleSchema
        return self

    def build(self):
        """
        Completes building the application data model.
        If an OPLCollector has been supplied in the constructor, loads the ADM into it.

        :return the ADM
        :raise ValueError: if the ADM for the collector has already been defined
        """
        if self.collector is not None:
            if self.collector.applicationDataModel: #is not empty
                raise ValueError("application data model has already been defined")
            self.collector.applicationDataModel = self.result
        return self.result

# end class ADMBuilder

def buildSchema(*fields):
    """
    Creates a schema from a list field tuples
    The resulting schema is an instance of a Spark StructType.
    The fields in the schema are sorted in dictionary order so that they are consistent with the order generated by the JSON deserialization.
    :param fields:
    :type fields: tuple<String, DataType>
    :return:
    :rtype: StructType
    """
    return StructType((StructField(fieldName, fieldType, False, None) for (fieldName, fieldType) in sorted(fields)))
# end buildSchema

### The Application Data Model for the Warehouse Location Application

Using the tools in the <code>OPLCollector</code> class, the application data model (ADM) is defined in three collector objects, each of which has a corresponding representation in OPL and an associated JSON data file. The ADM is defined as follows:

In [5]:
networkDataModel = ADMBuilder()\
    .addSchema("warehouses", buildSchema(
        ("location",     StringType()),
        ("fixedCost",    DoubleType()),
        ("capacityCost", DoubleType())))\
    .addSchema("routes", buildSchema(
        ("location",     StringType()),
        ("store",        StringType()),
        ("shippingCost", DoubleType())))\
    .addSchema("stores", buildSchema(
        ("storeId",      StringType())))\
    .addSchema("mapCoordinates", buildSchema(
        ("location", StringType()),
        ("lon",      DoubleType()),
        ("lat",      DoubleType())))\
    .build()

demandDataModel = ADMBuilder()\
    .addSchema("demands", buildSchema(
        ("store",        StringType()),
        ("scenarioId",   StringType()),
        ("amount",       DoubleType())))\
    .addSchema("scenarios", buildSchema(
        ("id",           StringType()),
        ("totalDemand",  DoubleType()),
        ("periods",      DoubleType())))\
    .build()

resultDataModel= ADMBuilder()\
    .addSchema("objectives", buildSchema(
            ("problem",        StringType()),
            ("dExpr",            StringType()),
            ("scenarioId",   StringType()),
            ("iteration",        IntegerType()),
            ("value",            DoubleType())))\
    .addSchema("openWarehouses", buildSchema(
            ("location",        StringType()),
            ("scenarioId",   StringType()),
            ("iteration",        IntegerType()),
            ("open",            IntegerType()),
            ("capacity",        DoubleType())))\
    .addSchema("shipments", buildSchema(
            ("location",        StringType()),
            ("store",            StringType()),
            ("scenarioId",   StringType()),
            ("iteration",        IntegerType()),
            ("amount",            DoubleType())))\
    .build();
# Note: the "scenarioId" and "iteration" fields are not used in this notebook but are included for use in other contexts.

The corresponding OPL definitions are as follows:

In [6]:
 warehousing_data_dotmod = '''
 //Input data
 
 tuple Warehouse {
 	key string location;
 	float fixedCost;	// $/yr
 	float capacityCost;	// $/pallet/yr
 }
 
 tuple Store {
 	key string storeId; 
 }
 
 tuple Route {
 	key string location;
 	key string store;
 	float shippingCost;	// $/pallet
 }
 
 //Note: the mapCoordinates table is not used in the optimization and so is not sent to the optimizer

 tuple Demand {
 	key string store;
 	key string scenarioId;
 	float amount;		// pallets/period
 }
 
  tuple Scenario {
 	key string id;
 	float totalDemand;
 	float periods; 	//the number of periods per year during which this scenario prevails; periods = scenario probability * total periods/year
 }
 
 //Output data
 
  tuple Objective {
	key string problem;
 	key string dExpr;
 	key string scenarioId;
	key int iteration;
	float value;   
 }
 
 tuple Shipment {
  	key string location;
 	key string store;
 	key string scenarioId;
 	key int iteration;
 	float amount; 
 }
 
 tuple OpenWarehouse {
 	key string location;
 	key string scenarioId;
 	key int iteration;
 	int open;
 	float capacity;		// pallets
 }
 '''

### Loading Data into the Application Data Model

Using the <code>OPLCollector</code> class and its adjuncts, one can read the input data and create the Spark datasets with which to populate the decision model: 

In [7]:
networkDataSource = getFromObjectStorage(credentials_1, filename="Warehousing-data.json")      
demandDataSource =  getFromObjectStorage(credentials_1, filename="Warehousing-sales_data-nominal_scenario.json")

warehousingData= OPLCollector("warehousingData", networkDataModel).setJsonSource(networkDataSource).fromJSON()
warehousingData.addTables(OPLCollector("demandData", demandDataModel).setJsonSource(demandDataSource).fromJSON())

warehousingData.displayTable("warehouses", sys.stdout)

collector: warehousingData
table: warehouses
+------------+---------+------------------+
|capacityCost|fixedCost|location          |
+------------+---------+------------------+
|148.0       |550000.0 |Brockton, MA      |
|148.0       |600000.0 |Bristol, CT       |
|148.0       |600000.0 |Union City, NJ    |
|148.0       |500000.0 |New York, NY      |
|148.0       |500000.0 |Philadelphia, PA  |
|148.0       |550000.0 |Parkville, MD     |
|148.0       |500000.0 |Greensboro, NC    |
|148.0       |500000.0 |Goose Creek, SC   |
|148.0       |450000.0 |Lawrenceville, GA |
|148.0       |550000.0 |Jacksonville, FL  |
|148.0       |450000.0 |Birmingham, AL    |
|148.0       |450000.0 |Memphis, TN       |
|148.0       |500000.0 |Frankfort, KY     |
|148.0       |500000.0 |Akron, OH         |
|148.0       |500000.0 |Dayton, OH        |
|148.0       |500000.0 |West Lafayette, IN|
|148.0       |500000.0 |Taylor, MI        |
|148.0       |400000.0 |Dubuque, IA       |
|148.0       |500000.0 |Beloit,

The data set in this example has 28 potential locations for warehouses, 296 stores, and 8288 shipping routes. You can certainly print the tables comprising this data, using the command
<code>
warehousingData.display(sys.stdout)
</code>
which is commented out the preceding code cell. However, be prepared to look at some very large tables. The next section discusses more effective visualizations.

## Visualizing the Input Data and the Maps Class

Since the Warehouse Location model has an inherently geographic character, maps are the most effective way to visualize the data and the results. The following <code>Maps</code> class, which builds on the folium package https://folium.readthedocs.io/en/latest/index.html, creates maps designed for the warehousing data model. 

The <code>Maps</code> class does not require customization for different instances of the warehousing location model, say for example a supply chain in Europe instead of the United States as in this example, provided that the data follows the schema specified in this notebook. The <code>Maps</code> class does require an additional input dataset, which is otherwise not used for the optimization model, that specifies the geographic coordinates of the warehouse and store locations. The schema of that dataset is included in the <code>warehousingData</code> collector as follows:

<code>
mapCoordinatesDataModel = ADMBuilder()\
            .addSchema("mapCoordinates", buildSchema(
                ("location", StringType()),
                ("lon",      DoubleType()),
                ("lat",      DoubleType())))\
            .build()
</code>
Note: <code>"location"</code> can be either a warehouse location or a store location.

### The Maps Class

Here is the code for the <code>Maps</code> class (go to edit mode to see the contents of this hidden cell).

In [8]:
# @hidden_cell
try:
    import folium
except:
    if hasattr(sys, 'real_prefix'):
        #we are in a virtual env.
        !pip install folium 
    else:
        !pip install --user folium
        
    import folium

from pyspark.sql.types import StringType, IntegerType, DoubleType

class Maps(object):
    """
    Displays a map showing warehouses and shipping routes for the Warehousing application.
    """
    
    def __init__(self, stores=None, warehouses=None, routes=None, routesToShow=None, mapCoordinates=None):
        """
        Creates a new Maps instance.
        Matches the warehouse locations and stores with their map coordinates.
        Creates a base map for adding location and shipment information.
        Note: If your data has more than one scenario, you should select the scenario when creating the dataframes used in this map, e.g.
        routes= warehousingResult.getTable("shipments").select('*').where(shipments.scenarioId == "nominal")
        
        @param stores: a Spark dataframe containing store data (note: must contain a column "store" containing the store locations and may contain other columns as well)
        @param warehouses: a Spark dataframe containing warehouse data (note: must contain a column "location" containing the warehouse locations and may contain other columns as well)
        @param routes: a Spark dataframe containing route data (note: must contain columns "location" and "store" containing the end-point locations and may contain other columns as well)
        @param routesToShow: a Spark dataframe containing (location, store) pairs specifying the subset of routes to show (default=None shows all routes)
        @param mapCoordinates: a Spark dataframe containing the geographic coordinates of the warehouse and store locations
        """
                
        #Match map coordinates with store and warehouse locations      
        self.warehousesWithCoord= warehouses.join(mapCoordinates, "location")
        coord= mapCoordinates.withColumnRenamed("location", "store")
        self.storesWithCoord= stores.join(coord, "store")
                
        if routesToShow is not None:
            selectedRoutes= routes.join(routesToShow, [routes.location == routesToShow.location, routes.store == routesToShow.store] )\
                .select("*")\
                .drop(routesToShow.location).drop(routesToShow.store)
        else:
            selectedRoutes= routes
            
        routesWithCoord= selectedRoutes.join(mapCoordinates, "location")\
            .withColumnRenamed("lon", "locationLon")\
            .withColumnRenamed("lat", "locationLat")
        self.routesWithCoord= routesWithCoord.join(coord, "store")\
            .withColumnRenamed("lon", "storeLon")\
            .withColumnRenamed("lat", "storeLat")
            
        # Determine map center and range
        self.mapCenter= self.storesWithCoord.agg({"lat" : "avg", "lon" : "avg"}).first().asDict()
        self.mapCenter.update(self.storesWithCoord.agg({"lat" : "min", "lon" : "min"}).first().asDict())
        self.mapCenter.update(self.storesWithCoord.agg({"lat" : "max", "lon" : "max"}).first().asDict())     
    
    def getBasicMap(self):
        """
        Returns a basic map with no data displayed.
        Use it to add data markers.
        """
        return folium.Map(location=[self.mapCenter["avg(lat)"], self.mapCenter["avg(lon)"]], 
                          min_lat=self.mapCenter["min(lat)"], max_lat=self.mapCenter["max(lat)"],
                          min_lon=self.mapCenter["min(lon)"], max_lon=self.mapCenter["max(lon)"],
                          zoom_start=4)
        
    @staticmethod
    def makeLabel(label):
        """
        Used in formatCaption method
        
        @param label: a (possibly empty) string
        @return: a string
        """
        if len(label) >0:
            return label+": " 
        else: 
            return label
    
    @staticmethod
    def formatCaption(row, labelColumns, dataColumns):
        """
        Creates a caption for a popup on a map item from a Row in a Spark dataframe
        
        @param row: the current Row in a Spark dataframe
        @param labelColumns: the names of the columns with the identifier information to be displayed in a popup caption 
            (dictionary with string keys representing the label column names and string values representing the labels to use in the caption (the data themselves are strings))
        @param dataColumns: the names of the columns with the data to be displayed in a popup caption 
            (dictionary with string keys representing the data column names and string values representing the labels to use in the caption (the data themselves are numbers))
        @return an html text string
        """
        text= ""
        first= True
        if labelColumns: #is not empty
            for col, label in labelColumns.iteritems():
                if first:
                    text= Maps.makeLabel(label) + row[col]
                    first= False
                else:
                    text= text + "<br />" + Maps.makeLabel(label) + row[col]            
        if dataColumns: #is not empty
            for col, label in dataColumns.iteritems():
                if first:
                    text= Maps.makeLabel(label) + str(row[col])
                    first= False
                else:
                    text= text + "<br />" + Maps.makeLabel(label) + str(row[col])
                    
        return text
    
    def showWarehouses(self, tableMap=None, labelColumns={}, dataColumns={}):
        """
        Displays data from a table of warehouses in markers on a map. The table must include columns for the coordinates of each store.
        
        @param tableMap: the map to which this data is to be added (a folium.Map, which defaults to basicMap)
        @param dataColumns: the names of the columns with the data to be displayed in a popup caption 
            (dictionary with string keys representing the data column names and string values representing the labels to use in the caption (the data themselves are numbers))
        @return tableMap: the map with markers added for the new data
        """
        if tableMap is None:
            tableMap= self.getBasicMap()
            
        table= self.warehousesWithCoord
            
        for r in table.collect():
            row= r.asDict()
            text= Maps.formatCaption(row, labelColumns, dataColumns)
            caption= folium.Popup(folium.element.IFrame(html= text, width=200, height=75), max_width=2650)
            folium.Marker([row["lat"], row["lon"]], popup= caption).add_to(tableMap)
        return tableMap   
    
    def showStores(self, tableMap=None, labelColumns={}, dataColumns={}):
        """
        Displays data from a table of stores in markers on a map. The table must include columns for the coordinates of each store.
        
        @param tableMap: the map to which this data is to be added (a folium.Map, which defaults to basicMap)
        @param dataColumns: the names of the columns with the data to be displayed in a popup caption 
            (dictionary with string keys representing the data column names and string values representing the labels to use in the caption (the data themselves are numbers))
        @return tableMap: the map with markers added for the new data
        """
        if tableMap is None:
            tableMap= self.getBasicMap()
            
        table=self.storesWithCoord
            
        for r in table.collect():
            row= r.asDict()
            text= Maps.formatCaption(row, labelColumns, dataColumns)
            caption= folium.Popup(folium.element.IFrame(html= text, width=200, height=75), max_width=2650)
            folium.CircleMarker([row["lat"], row["lon"]], popup= caption, radius= 20, color= '#FF0000', fill_color= '#FF0000')\
                .add_to(tableMap)
        return tableMap
    
    def showRoutes(self, tableMap=None, labelColumns={}, dataColumns={}, color='#FF0000'):
        """
        Displays data from a table of routes in markers on a map. The table must include columns for the coordinates of each end of the route.
        
        @param tableMap: the map to which this data is to be added (a folium.Map, which defaults to basicMap)
        @param dataColumns: the names of the columns with the data to be displayed in a popup caption 
            (dictionary with string keys representing the data column names and string values representing the labels to use in the caption (the data themselves are numbers))
        @return tableMap: the map with markers added for the new data
        """
        if tableMap is None:
            tableMap= self.getBasicMap()
            
        table=self.routesWithCoord
            
        for r in table.collect():
            row= r.asDict()
            text= Maps.formatCaption(row, labelColumns, dataColumns)
            caption= folium.Popup(folium.element.IFrame(html= text, width=200, height=75), max_width=2650)
            tableMap.add_children(folium.PolyLine([[row["locationLat"], row["locationLon"]], [row["storeLat"], row["storeLon"]]], color, popup= caption))
    
# end class Maps

### Visualizing the Input Data

The next code cell creates a map showing the input data for the warehouses, stores and routes. Because of the huge number of routes (many of which are not used in the solution), the map would be extremely crowded if it were to show all of them. The example map shows the routes from all of the potential warehouse locations to the store in Olathe, KS. You can select the routes you want to show by changing the variable <code>showRoutes</code> in the next cell. Click on an icon to see the data associated with it. The large blue icons represent warehouses, the small red dots represent the stores, and the red lines represent the routes. Use the +/- buttons to zoom in or out.

In [9]:
routes= warehousingData.getTable("routes")
showRoutes= routes.select("location", "store").where(routes.store == "Olathe, KS") #Any relevant query should work here
mapDisplay= Maps(warehouses=warehousingData.getTable("warehouses"), 
                 stores=warehousingData.getTable("demands"), 
                 routes=warehousingData.getTable("routes"), 
                 routesToShow=showRoutes, 
                 mapCoordinates=warehousingData.getTable("mapCoordinates"))
inputsMap= mapDisplay.showWarehouses(labelColumns= {"location": ""}, dataColumns={"fixedCost": "Fixed Cost", "capacityCost": "Capacity Cost"})
inputsMap= mapDisplay.showStores(tableMap=inputsMap, labelColumns= {"store": ""}, dataColumns={"amount": "Demand"})
mapDisplay.showRoutes(tableMap=inputsMap, dataColumns={"shippingCost": "Shipping Cost"})
inputsMap

## The Decision Model and the Optimizer Class

### The OPL Model

Here is the statement of the Warehousing optimization model in IBM's Optimization Programming Language (OPL):

In [10]:
warehousing_inputs='''
 //Input Data
  
 {Warehouse} warehouses= ...;	//Denotes reading from a data source
 
 {Store} stores= ...;
 
 {Route} routes= ...;
 
 {Demand} demands= ...;
 float demand[routes]= [r: d.amount | r in routes,  d in demands: r.store==d.store]; //demand at the store at the end of route r
 
 {Scenario} scenarios= ...;
 Scenario scenario= first(scenarios); //scenarios is a singleton set
'''

In [11]:
warehousing_dotmod='''
 //Optimization Model
 
 //Decision variables
 dvar boolean open[warehouses];
 dvar float+ capacity[warehouses];		//pallets
 dvar float+ ship[routes] in 0.0..1.0;	//percentage of each store's demand shipped on each route
 
 dexpr float capitalCost= sum(w in warehouses) (w.fixedCost*open[w] + w.capacityCost*capacity[w]);
 dexpr float operatingCost= sum(r in routes) r.shippingCost*demand[r]*ship[r];
 
 constraint ctCapacity[warehouses];
 constraint ctDemand[stores];
 constraint ctSupply[routes];
 
 minimize capitalCost + scenario.periods*operatingCost;	// $/yr
 subject to {
 	 
 	forall(w in warehouses)
//	  Cannot ship more out of a warehouse than its capacity
 	  ctCapacity[w]: capacity[w] >= sum(r in routes: r.location==w.location) demand[r]*ship[r];
 	 
	forall(s in stores)
//    Must ship at least 100% of each store's demand
	  ctDemand[s]: sum(r in routes: r.store==s.storeId) ship[r] >= 1.0;
   	   
	forall(r in routes, w in warehouses: w.location==r.location)
//	  Can only ship along a supply route if its warehouse is open	  
	  ctSupply[r]: -ship[r] >= -open[w];	//ship[r] <= open[w]
   
 }
'''

In [12]:
warehousing_outputs= '''
 //Output Data
 
 {Objective} objectives= {
  <"Warehousing", "capitalCost", scenario.id, 0, capitalCost>,
  <"Warehousing", "operatingCost", scenario.id, 0, operatingCost>};
 
 {Shipment} shipments= {<r.location, r.store, scenario.id, 0, ship[r]*d.amount> | r in routes, d in demands: r.store==d.store && ship[r]>0.0};
 
 {OpenWarehouse} openWarehouses= {<w.location, scenario.id, 0, open[w], capacity[w]> | w in warehouses};
'''

The decision model takes a *declarative* form; that is, it specifies the logical conditions (*constraints*) that must hold among the decision variables at the optimal solution without specifying how to compute their values. 

The general form of an optimization problem, or *model*, has five components:
<p>
-  *Index sets* specify the scale of the model, that is, the distinct entities for which decisions are to be made or upon which constraints are to be placed. In the Warehousing model, the index sets consist of the warehouse locations, stores, and routes connecting them. In the OPL code above, these index sets are specified by tuple sets read as input.
-  *Decision variables* represent the decisions to be made. In the Warehousing model, the decision variables consist of the discrete (binary) choice whether or not to open a warehouse at each location, the capacity of each open warehouse, and amount to ship on each route from a warehouse location to a store. The latter two types of decisions are continuous amounts.
-  *Objective function* allows the optimization to rank alternative solutions. In the Warehousing model, the objective function consists of three components, or *decision expressions*: the cost of opening the warehouses which is fixed, the cost of capacity at the warehouses, which varies linearly with the amount of capacity, and the cost of shipping from the warehouses to the stores, which varies linearly with the amount shipped.
-  *Constraints* specify restrictions on the decisions, since not all combinations of decisions are allowed. The Warehousing model has three classes of constraints:
    -  Capacity constraints restrict the amount that can be shipped out of each warehouse to be less than or equal to its capacity.
    -  Demand constraints require that the total shipments to each store from all the warehouses must be at least the demand at that store.
    -  Supply constraints require that no shipments can occur from a warehouse unless it it open.
<p>

This latter constraint requires a bit of further explanation. The <code>shipment</code> variable on the left is continuous, meaning it can take any value between zero and one. On the other hand, the <code>open</code> variable on the right is discrete, meaning it can take only the values zero or one. Thus, if <code>open</code> is zero, <code>shipment</code> will be forced to zero. One the other hand, is <code>shipment</code> is positive, <code>open</code> will be forced to one. Notice also, that <code>shipment</code> for a route is defined as the *fraction* of a store's demand that is shipped on that route. While it is not obvious from OPL code, using this definition significantly improves the performance of the optimization solver algorithm for technical reasons beyond the scope of this notebook; this formulation of the facility location model is call the *tight* formulation.
<p>
The final component of the optimization model is
-  *Data*, which in the this Warehousing model is specified by the *application data model*. It is important to realize that, except for the application data model, the formulation of the optimization model is completely independent of the specific instance data used in the notebook. The principle of model-data separation enables a decision support application to scale nicely. The input data not only provides the concrete parameters of the model but also its scale in terms of such factors such as:
    - The number and characteristics of the warehouses
    - The number and characteristics of the stores
    - The number and characteristics of the shippijng routes

Thus, the key mathematical relationships need to be specified only once in order for the application to scale as the scope of the problem changes.


### The Optimizer Class

The actual computation uses a complex algorithm that is implemented in a solver engine, in this case IBM Decision Optimization on Cloud, which exposes IBM's CPLEX through a cloud-based interface. In order to simplify applications built on this cloud platform, such as the Warehousing model discussed in this notebook, the calls to the solver have been abstracted as the Optimizer class, shown below. This class is independent of the actual decision model and instance data, and so it can be reused in other decision optimization applications without modification. Here is the Python code for <code>Optimizer</code> class and some related functions (go to edit mode to see the contents of this hidden cell).

In [13]:
# @hidden_cell
'''
Created on Feb 9, 2017

@author: bloomj
'''
try:
    import docloud
except:
    if hasattr(sys, 'real_prefix'):
        #we are in a virtual env.
        !pip install docloud 
    else:
        !pip install --user docloud

from docloud.job import JobClient
from docloud.status import JobSolveStatus, JobExecutionStatus

from urlparse import urlparse

import fileinput
import urllib
import cStringIO
from pprint import pprint

class Optimizer(object):
    '''
     Handles the actual optimization task.
     Creates and executes a job builder for an optimization problem instance.
     Encapsulates the DOCloud API.
     This class is designed to facilitate multiple calls to the optimizer, such as would occur in a decomposition algorithm,
     although it transparently supports single use as well.
     In particular, the data can be factored into a constant data set that does not vary from run to run (represented by a JSON or .dat file)
     and a variable piece that does vary (represented by a Collector object).
     The optimization model can also be factored into two pieces, a best practice for large models and multi-models:
     A data model that defines the tuples and tuple sets that will contain the input and output data.
     An optimization model that defines the decision variables, decision expressions, objective function, 
     constraints, and pre- and post-processing data transformations.
     Factoring either the data or the optimization model in this fashion is optional.
     
     The problem instance is specified by the OPL model and input data received from the invoking (e.g. ColumnGeneration) instance.
     Input and output data are realized as instances of OPLCollector, which in turn are specified by their respective schemas.
     This class is completely independent of the specific optimization problem to be solved.
    '''

    def __init__(self, problemName, model=None, resultDataModel=None, credentials=None, *attachments):
        '''
         Constructs an Optimizer instance.
         The instance requires an optimization model as a parameter.
         You can also provide one or more data files as attachments, either in OPL .dat or in JSON format. This data does not
         change from solve to solve. If you have input data that does change, you can provide it to the solve method as an OPLCollector object.
         :param problemName: name of this optimization problem instance
         :type problemName: String    
         :param model: an optimization model written in OPL
         :type model: Model.Source object or String
         :param resultDataModel: the application data model for the results of the optimization
         :type resultDataModel: dict<String, StructType>
         :param credentials: DOcplexcloud url and api key
         :type credentials: {"url":String, "key":String}
         :param attachments: URLs for files representing the data that does not vary from solve to solve
         :type attachments: list<URL>
        '''
        self.name= problemName
        self.model= model
        self.resultDataModel= resultDataModel
        self.attachData(attachments)
        self.streamsRegistry= []
        self.history= []
        
        self.credentials= credentials
 
        self.jobclient= JobClient(credentials["url"], credentials["key"]);
        self.solveStatus= JobSolveStatus.UNKNOWN;
        
    def getName(self):
        """
        Returns the name of this problem
        """
        return self.name
    
    def setOPLModel(self, name, dotMods=None, modelText=None):
        '''
         Sets the OPL model.
         This method can take any number of dotMod arguments, but
         there are two common use cases:
         First, the optimization model can be composed of two pieces: 
             A data model that defines the tuples and tuple sets that will contain the input and output data.
             An optimization model that defines the decision variables, decision expressions, objective function, 
             constraints, and pre- and post-processing data transformations.
             The two are concatenated, so they must be presented in that order.
             If such a composite model is used, you do not need to import the data model into the optimization model using an OPL include statement.
         Second, you do not have to use a separate data model, in which case a single dotMod must be provided 
         which encompasses both the data model and the optimization model.  
        @param name: the name assigned to this OPL model (should have the format of a file name with a .mod extension)
        @type name: String
        @param dotMods: URLs pointing to OPL .mod files, which will be concatenated in the order given
        @type dotMods: List<URL>
        @param modelText: the text of the OPL model, which will be concatenated in the order given
        @type modelText: List<String>
        @return this optimizer
        @raise ValueError if a model has already been defined or if dotMods or modelText is empty
        '''
        if self.model is not None:
            raise ValueError("model has already been set")
        self.model= ModelSource(name=name, dotMods=dotMods, modelText=modelText)
        return self
    
    def setResultDataModel(self, resultDataModel):
        '''
        Sets the application data model for the results of the optimization
        @param resultDataModel: the application data model for the results of the optimization
        @type resultDataModel: dict<String, StructType>
        '''
        if self.resultDataModel is not None:
            raise ValueError("results data model has already been defined")        
        self.resultDataModel = resultDataModel
        return self
    
    def attachData(self, attachments):
        '''
        Attaches one or more data files, either in OPL .dat or in JSON format. This data does not
        change from solve to solve. If you have input data that does change, you can provide it as a Collector object.
        @param attachments: files representing the data that does not vary from solve to solve
        @type attachments: list<URL>
        @return this optimizer
        @raise ValueError if an item of the same name has already been attached
        '''
        self.attachments= {}
        if attachments is not None:
            for f in attachments:
                fileName= os.path.splitext(os.path.basename(urlparse(f)))[0]
                if fileName in self.attachments:
                    raise ValueError(fileName+ " already attached")
                self.attachments[fileName]= f
        return self;
    
    def solve(self, inputData=None, solutionId=""):
        '''
        Solves an optimization problem instance by calling the DOCloud solve service (Oaas).
        Creates a new job request, incorporating any changes to the variable input data, 
        for a problem instance to be processed by the solve service. 
        Once the problem is solved, the results are mapped to an instance of an OPL Collector.
        Note: this method will set a new destination for the JSON serialization of the input data.
        @param inputData: the variable, solve-specific input data
        @type inputData: OPLCollector
        @param solutionId: an identifier for the solution, used in iterative algorithms (set to empty string if not needed)
        @type solutionId: String
        @return: a solution collector
        '''
        inputs= []
        if self.model is None:
            raise ValueError("A model attachment must be provided to the optimizer")
        if self.model: #is not empty
            stream= self.model.toStream()
            inputs.append({"name":self.model.getName(), "file":stream})
            self.streamsRegistry.append(stream)
        if self.attachments: #is not empty
            for f in self.attachments:
                stream= urllib.FancyURLopener(self.attachments[f])
                inputs.append({"name":f, "file":stream})
                self.streamsRegistry.append(stream)
        if inputData is not None:
            outStream = cStringIO.StringIO()
            inputData.setJsonDestination(outStream).toJSON()
            inStream = cStringIO.StringIO(outStream.getvalue())
            inputs.append({"name": inputData.getName()+".json", "file": inStream})
            self.streamsRegistry.extend([outStream, inStream])
       
        response= self.jobclient.execute(
            input= inputs, 
            output= "results.json", 
            load_solution= True, 
            log= "solver.log", 
            gzip= True,
            waittime= 300,  #seconds
            delete_on_completion= False)
         
        self.jobid= response.jobid
        
        status= self.jobclient.get_execution_status(self.jobid)
        if status==JobExecutionStatus.PROCESSED:
            results= cStringIO.StringIO(response.solution)
            self.streamsRegistry.append(results)
            self.solveStatus= response.job_info.get('solveStatus') #INFEASIBLE_SOLUTION or UNBOUNDED_SOLUTION or OPTIMAL_SOLUTION or...
            solution= (OPLCollector(self.getName()+"Result"+solutionId, self.resultDataModel)).setJsonSource(results).fromJSON()
            self.history.append(solution)
        elif status==JobExecutionStatus.FAILED:
            # get failure message if defined
            message= ""
            if (response.getJob().getFailureInfo() != None):
                message= response.getJob().getFailureInfo().getMessage()
            print("Failed " +message)
        else:
            print("Job Status: " +status)
        
        for s in self.streamsRegistry:
            s.close();
        self.jobclient.delete_job(self.jobid);
        
        return solution
    
    def getSolveStatus(self):
        """
        @return the solve status as a string
        Attributes:
            UNKNOWN: The algorithm has no information about the solution.
            FEASIBLE_SOLUTION: The algorithm found a feasible solution.
            OPTIMAL_SOLUTION: The algorithm found an optimal solution.
            INFEASIBLE_SOLUTION: The algorithm proved that the model is infeasible.
            UNBOUNDED_SOLUTION: The algorithm proved the model unbounded.
            INFEASIBLE_OR_UNBOUNDED_SOLUTION: The model is infeasible or unbounded.
        """
        return self.solveStatus
    
# end class Optimizer        
    
class ModelSource(object):
    '''
     This class manages the OPL source code for an optimization model.
     It can use an OPL model specified by one or more files, indicated by their URLs, or
     it can use an OPL model specified by one or more Strings. 
     Use of one OPL component is the norm, but this class also
     enables factoring an OPL model into a data model and an optimization model.
     Using such a two-piece factorization is a best practice for large models and multi-models:
     The data model defines the tuples and tuple sets that will contain the input and output data.
     The optimization model defines the decision variables, decision expressions, objective function, 
     constraints, and pre- and post-processing data transformations.
     
     When the OPL model consists of multiple components, ModelSource concatenates them in the order
     presented, and it is not necessary to use OPL include statements to import the components.
     The multiple model files need not be located in the same resource folder.
     
     Note: developers generally need not use this class directly. Instead, it is recommended
     to use the setOPLModel method of the Optimizer class.
    '''
    
    def __init__(self, name= "OPL.mod", dotMods= None, modelText= None):
        '''
         Creates a new ModelSource instance from URLs pointing to OPL .mod files.
         This method can take any number of URL arguments, but
         there are two common use cases:
         First, the optimization model can be composed of two pieces: 
         A data model that defines the tuples and tuple sets that will contain the input and output data.
         An optimization model that defines the decision variables, decision expressions, objective function, 
         constraints, and pre- and post-processing data transformations.
         The two are concatenated, so they must be presented in that order.
         If such a composite model is used, you do not need to import the data model into the optimization model using an OPL include statement.
         Second, you do not have to use a separate data model, in which case a single model URL must be provided 
         which encompasses both the data model and the optimization model.
        
        @param name: the name assigned to this OPL model (should have the format of a file name with a .mod extension)
        @type name: String
        @param dotMods: URLs pointing to OPL .mod files, which will be concatenated in the order given
        @type dotMods: List<URL>
        @param modelText: the text of the OPL model, which will be concatenated in the order given
        @type modelText: List<String>
        @raise: ValueError if dotMods or modelText is empty
        '''
        
        self.name= name;
        if dotMods is not None and not dotMods: #is empty
            raise ValueError("argument cannot be empty");
        self.dotMods= dotMods;
        if modelText is not None and not modelText: #is empty
            raise ValueError("argument cannot be empty");
        self.modelText= modelText;
        
    def getName(self):
        '''
         @return:  the name assigned to this OPL model
         @type String 
        '''
        return self.name
    
    def isEmpty(self):
        '''
         @return true if both dotMods and modelText are null; false otherwise
        '''
        return self.dotMods is None and self.modelText is None
    
    def toStream(self):
        '''
         Concatenates the model components and creates an input file for reading them.
         
         @return a file
        '''
        if self.dotMods: #is not empty        
            result= fileinput.input((urllib.FancyURLopener(f) for f in self.dotMods))
            return result           
        if self.modelText: #is not empty
            result= cStringIO.StringIO("".join(self.modelText)) 
            return result           
        raise ValueError("model source is empty")
 
# end class ModelSource

## Solving the Warehousing Model

### Get Your Credentials for IBM Decision Optimization on Cloud

In order to use the IBM Decision Optimization on Cloud service, you need to have or obtain credentials at __[Decision Optimization on Cloud Free Trial](http://apps.admin.ibmcloud.com/manage/trial/docloud.html?RelayState=dropsolve_force_login-oaas)__. Insert them in the code below:

In [14]:
url = "" # ENTER YOUR URL HERE
key = "" # ENTER YOUR KEY HERE

### Setting Up and Submitting the Solve Job

Using the <code>Optimizer</code> class, the following solves the Warehousing problem:

In [15]:
problem= Optimizer("Warehousing", credentials={"url":url, "key":key})\
        .setOPLModel("Warehousing.mod", modelText= [warehousing_data_dotmod, warehousing_inputs, warehousing_dotmod, warehousing_outputs])\
        .setResultDataModel(resultDataModel)
warehousingResult= problem.solve(warehousingData.copy("warehousingDataNoCoord", "warehouses", "routes", "stores", "demands", "scenarios"))
# Note: the mapCoordinates table is not used in the optimization and so is not sent to the optimizer
problem.getSolveStatus()

u'OPTIMAL_SOLUTION'

**A note on debugging**: If the solve fails, the notebook will often not show debugging information that would enable diagnosing the problem. To see such information, go to  __[Drop Solve](https://dropsolve-oaas.docloud.ibmcloud.com/dropsolve)__ and click "log" or "info".

In [16]:
# This cell and the immediately following one print the objectives and openWarehouses tables
# as lists of Rows that can be exported by copy and paste.
# To use them, simply uncomment them and rerun the notebook.

warehousingResult.getTable("objectives").collect()


[Row(dExpr=u'capitalCost', iteration=0, problem=u'Warehousing', scenarioId=u'Nominal', value=6373620.0),
 Row(dExpr=u'operatingCost', iteration=0, problem=u'Warehousing', scenarioId=u'Nominal', value=4580688.489999998)]

In [17]:
warehousingResult.getTable("openWarehouses").collect()

[Row(capacity=0.0, iteration=0, location=u'Brockton, MA', open=0, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Bristol, CT', open=0, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Union City, NJ', open=0, scenarioId=u'Nominal'),
 Row(capacity=3961.0, iteration=0, location=u'New York, NY', open=1, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Philadelphia, PA', open=0, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Parkville, MD', open=0, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Greensboro, NC', open=0, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Goose Creek, SC', open=0, scenarioId=u'Nominal'),
 Row(capacity=1146.0, iteration=0, location=u'Lawrenceville, GA', open=1, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Jacksonville, FL', open=0, scenarioId=u'Nominal'),
 Row(capacity=0.0, iteration=0, location=u'Birmingham, AL', open=0, scenar

### Retrieving the Optimal Solution

The <code>objectives</code> table shows the optimal cost of the solution, and the <code>openWarehouses</code> table shows the optimal locations of the warehouses to open and their optimal capacities.

In [18]:
warehousingResult.displayTable("objectives", sys.stdout);

collector: WarehousingResult
table: objectives
+-------------+---------+-----------+----------+-----------------+
|dExpr        |iteration|problem    |scenarioId|value            |
+-------------+---------+-----------+----------+-----------------+
|capitalCost  |0        |Warehousing|Nominal   |6373620.0        |
|operatingCost|0        |Warehousing|Nominal   |4580688.489999998|
+-------------+---------+-----------+----------+-----------------+



In [19]:
openWarehouses= warehousingResult.getTable("openWarehouses").select('*').where("open == 1")
print("collector: WarehousingResult")
print("table: openWarehouses")
openWarehouses.show(openWarehouses.count())

collector: WarehousingResult
table: openWarehouses
+--------+---------+-----------------+----+----------+
|capacity|iteration|         location|open|scenarioId|
+--------+---------+-----------------+----+----------+
|  3961.0|        0|     New York, NY|   1|   Nominal|
|  1146.0|        0|Lawrenceville, GA|   1|   Nominal|
|  2720.0|        0|      Chicago, IL|   1|   Nominal|
|  1395.0|        0|       Dallas, TX|   1|   Nominal|
|   874.0|        0|       Denver, CO|   1|   Nominal|
|  5581.0|        0|  Los Angeles, CA|   1|   Nominal|
|  2388.0|        0|San Francisco, CA|   1|   Nominal|
+--------+---------+-----------------+----+----------+



### Visualizing the Results on a Map

Again, because of the large number of routes used in the optimal solution, the results are more effectively displayed on a map. Click on an icon to see the data associated with it. The large blue icons represent open warehouses, the small red dots represent the stores, and the red lines represent the shipments. Use the +/- buttons to zoom in or out.

In [20]:
mapCoordinatesDataSource = getFromObjectStorage(credentials_1, filename="MapCoordinates.json")
mapDisplay2= Maps(warehouses=warehousingResult.getTable("openWarehouses").select('*').where("open==1"), 
                 stores=warehousingData.getTable("demands"), 
                 routes=warehousingResult.getTable("shipments"), 
                 mapCoordinates=warehousingData.getTable("mapCoordinates"))
resultsMap= mapDisplay2.showWarehouses(labelColumns= {"location": ""}, dataColumns={"capacity": "Capacity"})
resultsMap= mapDisplay2.showStores(tableMap=resultsMap, labelColumns= {"store": ""}, dataColumns={"amount": "Demand"})
mapDisplay2.showRoutes(tableMap=resultsMap, dataColumns={"amount": "Shipment"})
resultsMap

## Summary

This notebook illustrates how to use Apache Spark and IBM Decision Optimization on Cloud to solve a common business problem in supply chain network design: where to locate and how to size warehouses that serve as distribution centers for multiple retail stores. It introduces two APIs, <code>OPLCollector</code> and <code>Optimizer</code> that simplify data handling and flow control of optimation models, and it uses IBM's Optimization Programming Language (OPL) to specify the optimization problem in a clear and natural way. It also introduces a <code>Maps</code> API to visualize the data and results for this application in a straightforward display. 

## Author

**Dr. Jeremy Bloom** is an offering manager for IBM Data Science Experience and an expert on decision optimization. In the course of his more than 35-year career, he has lead research programs for energy companies and developed software products using operation research to solve practical business problems. Dr. Bloom has a bachelor's degree in Electrical Engineering from Carnegie-Mellon University and a master's degree and doctorate from Massachusetts Institute of Technology in Operations Research.

## References

 - __[Decision Optimization on Cloud](http://dropsolve-oaas.docloud.ibmcloud.com/software/analytics/docloud)__
 - __[Decision Optimization on Cloud Python API](https://developer.ibm.com/docloud/documentation/docloud/python-api/)__
 - __[IBM Optimization Programming Language (OPL)](http://www.ibm.com/support/knowledgecenter/SSSA5P_12.7.1/ilog.odms.ide.help/OPL_Studio/maps/groupings/opl_Language.html)__

**Copyright © 2017 IBM. This notebook and its source code are released under the terms of the MIT License.**