### Map functions: map_row() and map_partition()

Example use cases:
* Group grade processing with map_row().
* Simple micromodeling example: model training and scoring of housing data. The data are partitioned by the home style feature, and a separate model is trained in the Advanced SQL Engine for each home style. Test data are then scored with the corresponding models. Both operations are invoked from the client through map_partition().

Example goals:
* Using the teradataml DataFrame methods map_row() and map_partition() to apply a Python function to each row or group of rows (partition).

Notes:
* Map functions use the SCRIPT Table Operator (STO) Database object in the background. Therefore, to use map functions, the STO **must be enabled** in your target Advanced SQL Engine, and the Teradata Packages for In-nodes Analytics must be installed on its nodes. Specifically, the required packages for Python are **teradata-python** and **teradata-python-addons**.
* For the map functions to operate correctly, the **Python** version and the version of the Python *dill* add-on library on the client must be **the same** as those in the distribution installed on the nodes.
* This notebook utilizes several Python packages in addition to **teradataml** which you may need to install on your client.
* For this example, no additional data files are needed; this example utilizes teradataml built-in data.
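
Because the client and in-nodes versions must match, it can help to record the client-side versions before running the map functions. The following is a minimal sketch using only the standard library; the helper name `client_versions` is hypothetical, and the in-nodes versions still have to be checked separately on the Vantage system.

```python
import platform
from importlib import metadata


def client_versions(packages=("dill",)):
    """Return the client's Python version and the versions of the given packages.

    A package that is not installed on the client is reported as None.
    """
    versions = {"python": platform.python_version()}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Print the versions to compare them with the in-nodes installation.
print(client_versions())
```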

Notebook workflow:
1. Setup environment.
2. Illustrate map_row() examples with a few different ways to use map_row().
3. Illustrate map_partition() examples. We use map_partition() for micromodeling, that is:
   * in a first example, we train a different model for each partition in a Database table; then,
   * in a second example, we score partitions of test data in a Database table with the corresponding trained models.

#### Import Statements

In [None]:
import sys
import getpass
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf

from collections import OrderedDict
from teradataml import create_context, remove_context, load_example_data, DataFrame
from teradataml.dataframe.sql_functions import case
from teradatasqlalchemy.types import FLOAT, CLOB
from sqlalchemy import func
from sqlalchemy.sql import literal_column
from base64 import b64encode, b64decode
from dill import dumps, loads
from numpy import asarray
from pandas import isna, concat, read_csv, Series

# For formatting and displaying the output for better readability (only for demo - not required otherwise)
from IPython.display import HTML, display

#### Create context

In [None]:
# Specify a Vantage system to connect to. Specify default database accordingly, or delete the argument.
host = input("Host: ")
username = input("Username: ")
password = getpass.getpass()
# Specify a database name and the database argument, if desired to connect to another than the default.
database = "xxxxx"
con = create_context(host = host, username = username, password = password, database = database)

### A. map_row(): Load data

In [None]:
# The map_row() examples use the 'admissions_train' dataset. In these examples, the 'gpa'
# of each student is increased by a given percentage. Load the example data.
#
load_example_data("dataframe", "admissions_train")
df = DataFrame('admissions_train')
df.head(5)

#### map_row(): Example 1

Create the user defined function to increase the 'gpa' by the percentage provided. Note that both the input to and the output from the function are pandas Series objects.

In [None]:
def increase_gpa(row, p=20):
    row['gpa'] = row['gpa'] + row['gpa'] * p/100
    return row

In [None]:
# Apply the user defined function to the DataFrame.
# Note that since the output of the user defined function has the same
# columns with the same types as the input, we can skip passing the 'returns' argument.
increase_gpa_20 = df.map_row(increase_gpa)

# Print the result.
increase_gpa_20.head(5)

#### map_row(): Example 2

Use the same user defined function with a lambda notation to pass the percentage 'p = 40'.

In [None]:
increase_gpa_40 = df.map_row(lambda row: increase_gpa(row, p = 40))

increase_gpa_40.head(5)

#### map_row(): Example 3

Use the same user defined function with functools.partial to pass the percentage 'p = 50'.

In [None]:
from functools import partial
increase_gpa_50 = df.map_row(partial(increase_gpa, p = 50))

increase_gpa_50.head(5)
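
Examples 2 and 3 pass the percentage in two different ways. The sketch below checks locally that both produce the same callable behavior; it uses a plain dict as a stand-in for the pandas Series row so that it runs without a database connection, and `sample_row` is made-up illustration data.

```python
from functools import partial


def increase_gpa(row, p=20):
    # Work on a copy so the input row is left unchanged in this local sketch.
    row = dict(row)
    row["gpa"] = row["gpa"] + row["gpa"] * p / 100
    return row


# Made-up row; a dict stands in for the pandas Series that map_row() passes.
sample_row = {"id": 1, "gpa": 3.0}

# Two ways of fixing the percentage before handing the function over.
via_lambda = (lambda row: increase_gpa(row, p=50))(sample_row)
via_partial = partial(increase_gpa, p=50)(sample_row)

print(via_lambda == via_partial)
print(via_partial["gpa"])
```

Either form works; functools.partial avoids an extra wrapper function and keeps the bound argument visible on the resulting object.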

#### map_row(): Example 4

Use a lambda function to increase the 'gpa' by 100 percent, and return a numpy ndarray.

In [None]:
from numpy import asarray
increase_gpa_lambda = lambda row, p=20: asarray([row['id'], row['masters'], row['gpa'] + row['gpa'] * p/100,
                                                 row['stats'], row['programming'], row['admitted']])

increase_gpa_100 = df.map_row(lambda row: increase_gpa_lambda(row, p=100))

increase_gpa_100.head(5)

#### map_row(): Example 5

Using non-default chunk_size.

In [None]:
# Using chunk_size = 5
out_df = df.map_row(increase_gpa, chunk_size=5)

out_df.head(5)
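
The following is a local sketch of the idea behind chunked processing, under the assumption that chunk_size controls how many rows are read per iteration before the function is applied to each row. The helpers `iter_chunks` and `apply_in_chunks` are hypothetical and are not part of teradataml; the point is only that the chunk size changes how rows are batched, not the per-row result.

```python
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield successive lists of at most chunk_size rows from an iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def apply_in_chunks(func, rows, chunk_size):
    """Apply func to every row, reading the rows chunk by chunk."""
    results = []
    for chunk in iter_chunks(rows, chunk_size):
        results.extend(func(row) for row in chunk)
    return results


# Made-up gpa values for illustration.
gpas = [3.0, 3.5, 4.0, 2.5, 3.2, 3.8, 2.9]
bump = lambda gpa: gpa + gpa * 20 / 100

small_chunks = apply_in_chunks(bump, gpas, chunk_size=2)
one_chunk = apply_in_chunks(bump, gpas, chunk_size=1000)
print(small_chunks == one_chunk)
```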

### B. map_partition(): Load data

In [None]:
# Load the example data and create the input DataFrames.
#
print("Loading data")
load_example_data("GLMPredict", ["housing_test","housing_train"])

print("Creating dataframes")
train = DataFrame('housing_train')
test = DataFrame('housing_test')

In [None]:
# Print a preview of the DataFrames
#
print("Train dataset")
display(train.to_pandas().head(5))
print("\n")
print("Test dataset")
display(test.to_pandas().head(5))

#### map_partition(): Example 1

Model training.

In [None]:
# Define the function that we want to use to fit multiple GLM models, one for each home style.
# We will be using the statsmodels package here.

def glm_fit(rows):
    """
    DESCRIPTION:
        Function that accepts an iterator on a pandas DataFrame (TextFileReader) created using
        'chunk_size' with pandas.read_csv(), and fits a GLM model to the corresponding data.
        The underlying data is the housing data with 12 independent variables (including the home style)
        and one dependent variable (price).
    
    RETURNS:
        A numpy.ndarray object with two elements:
        * The homestyle value (type: str)
        * The GLM model that was fit to the corresponding data, which is serialized using dill
          and base64 encoded. We use decode() to make sure it is of type str, and not bytes.
    """
    # Read the entire partition/group of rows into a pandas DataFrame ('data').
    data = rows.read()

    # Add the 'intercept' column along with the features.
    data['intercept'] = 1.0

    # We will not process the partition if there are no rows here.
    if data.shape[0] > 0:
        # Fit the model using R-style formula to specify categorical variables as well.
        # We use 'disp=0' to prevent stderr output.
        model = smf.glm('price ~ C(recroom) + lotsize + stories + garagepl + C(gashw) +'
                        ' bedrooms + C(driveway) + C(airco) + C(homestyle) + bathrms +'
                        ' C(fullbase) + C(prefarea)',
                        family=sm.families.Gaussian(), data=data).fit(disp=0)

        # We serialize and base64 encode the model in preparation to output it.
        modelSer = b64encode(dumps(model))
        
        # The user function can either return a value of supported type
        # (numpy array, pandas Series, or pandas DataFrame),
        # or just print it to find its way to the output.
        # Here we return it as a numpy ndarray object.
        
        # Note that we use decode for the serialized model so that it is
        # represented in the ascii form (which is what base64 encoding does),
        # instead of bytes.
        return asarray([data.loc[0]['homestyle'], modelSer.decode('ascii')])
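
The serialize, base64 encode, and decode-to-str steps used in glm_fit can be tried in isolation. This minimal sketch swaps in the standard library's pickle for dill (both expose dumps() and loads()) and uses a small dict as a stand-in for the fitted model, so the names and values here are illustrative only.

```python
import pickle
from base64 import b64decode, b64encode

# Stand-in for a fitted model; any picklable object works for this sketch.
fitted = {"coefficients": [0.5, -1.25], "family": "gaussian"}

# Serialize to bytes, base64 encode, then decode to an ASCII str
# so the value can be emitted as text.
encoded = b64encode(pickle.dumps(fitted)).decode("ascii")

# Reverse the steps to recover the object.
restored = pickle.loads(b64decode(encoded))

print(type(encoded).__name__, restored == fitted)
```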


In [None]:
# Apply the 'glm_fit' function defined above to create a model for every homestyle in
# the training dataset.
print("Fitting the model to the training data...")

# We specify the output column names and their types here with the 'returns'
# argument since the output is not similar to the input.
model = train.map_partition(glm_fit, data_partition_column = 'homestyle',
                            returns = OrderedDict([('homestyle', train.homestyle.type),
                                                   ('model', CLOB())]))

# The model table has been created successfully.
print("Model table has been created!")
display(model.to_pandas().head())

#### map_partition(): Example 2

In this example, we score observations on the basis of the models trained in Example 1 above.

We use the window function 'row_number()' to assign row numbers to each subset of data corresponding to a particular homestyle. The idea is to extend the table so that the model corresponding to the homestyle is added as the last column value of the first row in the partition. This makes it easier for the scoring function to read the model and then score the input records based on it.
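
The idea can be sketched locally with plain Python structures, without a database connection. In this illustration the rows, the `models` dict, and the helper `attach_model_to_first_row` are all hypothetical; rows are numbered within each homestyle by descending 'sn', and only the row with row_id 1 carries the model.

```python
from itertools import groupby
from operator import itemgetter

# Made-up test rows and serialized models, keyed by homestyle.
test_rows = [
    {"sn": 1, "homestyle": "Classic"},
    {"sn": 2, "homestyle": "Eclectic"},
    {"sn": 3, "homestyle": "Classic"},
]
models = {"Classic": "model-A", "Eclectic": "model-B"}


def attach_model_to_first_row(rows, models):
    """Number rows within each homestyle and put the model on row 1 only."""
    # Sort by homestyle, then by 'sn' descending within each homestyle.
    ordered = sorted(rows, key=lambda r: (r["homestyle"], -r["sn"]))
    out = []
    for _, group in groupby(ordered, key=itemgetter("homestyle")):
        for row_id, row in enumerate(group, start=1):
            # Every other row in the partition gets None (NULL) instead of the model.
            modeldata = models[row["homestyle"]] if row_id == 1 else None
            out.append({**row, "row_id": row_id, "modeldata": modeldata})
    return out


for row in attach_model_to_first_row(test_rows, models):
    print(row)
```

Carrying the model on a single row per partition avoids repeating a large serialized value on every input row.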

In [None]:
# Create row number column ('row_id') in the 'test' DataFrame.
test_with_row_num = test.assign(row_id = func.row_number().over(partition_by=test.homestyle.expression, order_by=test.sn.expression.desc()))

# Join it with the model we created based on the value of homestyle.
temp = test_with_row_num.join(model, on = [(test_with_row_num.homestyle == model.homestyle)], rsuffix='r', lsuffix='l')

# Set the model column to NULL when row_id is not 1.
temp = temp.assign(modeldata = case([(temp.row_id == 1, literal_column(temp.model.name))], else_ = None))

# Drop the extraneous columns created in the processing.
temp = temp.assign(homestyle = temp.l_homestyle).drop('l_homestyle', axis=1).drop('r_homestyle',axis=1).drop('model', axis=1)

# Reorder the columns to have the housing data columns positioned first, followed by the row_id and modeldata.
new_test = temp.select(test.columns + ['row_id', 'modeldata'])

In [None]:
DELIMITER = '\t'
QUOTECHAR = None

def glm_score(rows):
    """
    DESCRIPTION:
        Function that accepts an iterator on a pandas DataFrame (TextFileReader) created using
        'chunk_size' with pandas.read_csv(), and scores it based on the model found in the data.
        The underlying data is the housing data with 12 independent variables (including the home style)
        and one dependent variable (price).
        
        The function outputs the values itself, rather than returning objects of supported type.
    
    RETURNS:
        None.
    """
    model = None
    for chunk in rows:
        # We process data only if there is any, i.e. only when the chunk read has any rows.
        if chunk.shape[0] > 0:
            if model is None:
                # We read the model once (it is found only once) per partition.
                model = loads(b64decode(chunk.loc[0].iloc[-1]))

            # Exclude the row_id and modeldata columns from the scoring dataset as they are no longer required.
            chunk = chunk.iloc[:,:-2]
            # For prediction, exclude the first two columns ('sn' - not relevant, and 'price' - the dependent variable).
            prediction = model.predict(chunk.iloc[:,2:])
            
            # We now concat the chunk with the prediction column (pandas Series) to form a DataFrame.
            outdf = concat([chunk, prediction], axis=1)
                            
            # We just cannot return this DataFrame yet as we have more chunks to process.
            # In such scenarios, we can either:
            #   1. print the output here, or
            #   2. keep concatenating the results of each chunk to create a final resultant pandas DataFrame to return.
            # We are opting for option #1 here.
            for _, row in outdf.iterrows():
                if QUOTECHAR is not None:
                    # A NULL value should not be enclosed in quotes.
                    # The CSV module has no support for such output with writer, and hence the custom formatting.
                    values = ['' if isna(s) else "{}{}{}".format(QUOTECHAR,
                                                                 str(s), QUOTECHAR) for s in row]
                else:
                    values = ['' if isna(s) else str(s) for s in row]
                print(DELIMITER.join(values), file=sys.stdout)
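
The custom row formatting at the end of glm_score can be isolated into a small helper for local testing. This is a sketch only: `format_row` and `is_null` are hypothetical names, and `is_null` is a simplified stand-in for pandas.isna that covers just None and float NaN.

```python
import math


def is_null(value):
    """Treat None and float NaN as NULL (a simplified stand-in for pandas.isna)."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def format_row(values, delimiter="\t", quotechar=None):
    """Join values into one delimited line, leaving NULLs empty and unquoted."""
    fields = []
    for value in values:
        if is_null(value):
            # A NULL value is emitted as an empty field, never quoted.
            fields.append("")
        elif quotechar is not None:
            fields.append("{0}{1}{0}".format(quotechar, value))
        else:
            fields.append(str(value))
    return delimiter.join(fields)


print(format_row([1, None, "Classic"], quotechar='"'))
print(format_row([1, float("nan"), 2.5]))
```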

In [None]:
print("Applying glm_score function to the test data...")
# Note that here the output of the function has one more column than the input,
# so we must specify the output columns with the 'returns' argument.
returns = OrderedDict([(col.name, col.type) for col in test._metaexpr.c] + [('prediction', FLOAT())])

# Note that we are using the 'data_order_column' argument here to order by the 'row_id'
# column so that the model is read before any data that need to be scored.
prediction = new_test.map_partition(glm_score,
                                    returns = returns,
                                    data_partition_column = 'homestyle',
                                    data_order_column = 'row_id')
print("Scoring complete!")

# Print a sample of the scoring result.
prediction.to_pandas().head(5)

In [None]:
# Finally, remove_context() to mark the end of the session and drop all temporary objects created.
remove_context()