# Context
**Background:**

Predicting the age of abalone from physical measurements.  The age of abalone is determined by cutting the shell through the cone, staining it, and counting the number of rings through a microscope -- a boring and time-consuming task.  Other measurements, which are easier to obtain, are used to predict the age.

**Problem:**

Build regression models by ‘sex’ which can predict ‘the number of rings’.

**Fetch data from ML data repository:**

We can fetch data to Greenplum Using following steps.

In [1]:
%load_ext sql
%sql postgresql://gpadmin:***@localhost:7000/postgres

In [2]:
%%sql
-- External Table
DROP EXTERNAL TABLE IF EXISTS abalone_external;
CREATE EXTERNAL WEB TABLE abalone_external(
    sex text
    , length float8
    , diameter float8
    , height float8
    , whole_weight float8
    , shucked_weight float8
    , viscera_weight float8
    , shell_weight float8
    , rings integer -- target variable to predict
) location('http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data')
format 'CSV'
(null as '?');

 * postgresql://gpadmin:***@localhost:7000/postgres
Done.
Done.


[]

In [3]:
%%sql
-- Create abalone table from an external table
DROP TABLE IF EXISTS abalone;
CREATE TABLE abalone AS (
    SELECT ROW_NUMBER() OVER() AS id, *
    FROM abalone_external
) DISTRIBUTED BY (sex);

 * postgresql://gpadmin:***@localhost:7000/postgres
Done.
4177 rows affected.


[]

**Train Test set split**

Before proceeding data exploration, let's split our dataset to train and test set without using MADlib.

Firstly, we fetch a random value between 0 and 1 to each row.
Then we create a percentile table that stores percentile values for each sex.
Finally, we join those 2 tables to obtain our training or test tables.

But since Ordered-Set Aggregate Function is not yet supported with Beta version 1, we will skip this step with GreenplumPython and implement it with SQL.

In [4]:
%%sql
CREATE TEMP TABLE temp_abalone_label AS
    (SELECT *, random() AS __samp_out_label FROM abalone);

CREATE TEMP TABLE train_percentile_disc AS
    (SELECT sex, percentile_disc(0.8) within GROUP (ORDER BY __samp_out_label) AS __samp_out_label
    FROM temp_abalone_label GROUP BY sex);
CREATE TEMP TABLE test_percentile_disc AS
    (SELECT sex, percentile_disc(0.2) within GROUP (ORDER BY __samp_out_label) AS __samp_out_label
    FROM temp_abalone_label GROUP BY sex);

DROP TABLE IF EXISTS abalone_train;
CREATE TABLE abalone_train AS
    (SELECT temp_abalone_label.*
        FROM temp_abalone_label
        INNER JOIN train_percentile_disc
        ON temp_abalone_label.__samp_out_label <= train_percentile_disc.__samp_out_label
        AND temp_abalone_label.sex = train_percentile_disc.sex
    );
DROP TABLE IF EXISTS abalone_test;
CREATE TABLE abalone_test AS
    (SELECT temp_abalone_label.*
        FROM temp_abalone_label
        INNER JOIN test_percentile_disc
        ON temp_abalone_label.__samp_out_label <= test_percentile_disc.__samp_out_label
        AND temp_abalone_label.sex = test_percentile_disc.sex
    )

 * postgresql://gpadmin:***@localhost:7000/postgres
4177 rows affected.
3 rows affected.
3 rows affected.
Done.
3343 rows affected.
Done.
837 rows affected.


[]

Note that these features could be supported by GreenplumPython in future release.

# Import preparation

We connect to Greenplum database named "postgres"

In [5]:
import greenplumpython as gp

In [6]:
db = gp.database(host="localhost", dbname="postgres")

# Data Exploration

Get access to existed table "abalone"

In [7]:
abalone = db.table("abalone")

Take a look on table

In [8]:
# SELECT * FROM abalone ORDER BY id LIMIT 5;

abalone.order_by("id").head(5)

id,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
1,M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
2,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
3,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
4,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
5,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


Observe the distribution of data on different segments

In [9]:
# SELECT gp_segment_id, COUNT(*)
# FROM abalone
# GROUP BY 1
# ORDER BY gp_segment_id;

import greenplumpython.builtin.function as F

abalone.group_by("gp_segment_id").assign(count=lambda _: F.count())

count,gp_segment_id
1307,2
2870,1


Since we already have table "abalone_train" ad "abalone_test" in the database, we can get access to them.

In [10]:
abalone_train = db.table("abalone_train")
abalone_test = db.table("abalone_test")

# Execute the OLS Linear Regression Function by 'sex'

## Training Process

**Creation of training function**

In [11]:
from typing import List

# CREATE TYPE linreg_type AS (
#    col_nm text[]
#    , coef float8[]
#    , intercept float8
#    , serialized_linreg_model bytea
#    , created_dt text
# );


class LinregType:
    col_nm: List[str]
    coef: List[float]
    intercept: float
    serialized_linreg_model: bytes
    created_dt: str


# -- Create function
# -- Need to specify the return type -> API will create the corresponding type in Greenplum to return a row
# -- Will add argument to change language extensions, currently plpython3u by default


@gp.create_array_function
def linreg_func(length: List[float], shucked_weight: List[float], rings: List[int]) -> LinregType:
    from sklearn.linear_model import LinearRegression
    import numpy as np

    X = np.array([length, shucked_weight]).T
    y = np.array([rings]).T

    # OLS linear regression with length, shucked_weight
    linreg_fit = LinearRegression().fit(X, y)
    linreg_coef = linreg_fit.coef_
    linreg_intercept = linreg_fit.intercept_

    # Serialization of the fitted model
    import six
    import datetime

    pickle = six.moves.cPickle
    serialized_linreg_model = pickle.dumps(linreg_fit, protocol=3)

    return {
        "col_nm": ["length", "shucked_weight"],
        "coef": linreg_coef[0],
        "intercept": linreg_intercept[0],
        "serialized_linreg_model": serialized_linreg_model,
        "created_dt": str(datetime.datetime.now()),
    }

**Apply "linreg_fitted" function to our train set**

In [12]:
# DROP TABLE IF EXISTS plc_linreg_fitted;
# CREATE TABLE plc_linreg_fitted AS (
#    SELECT
#        a.sex
#        , (plc_linreg_func(
#            a.length_agg
#            , a.shucked_weight_agg
#            , a.rings_agg)
#        ).*
#    FROM (
#        SELECT
#            sex
#            , ARRAY_AGG(length) AS length_agg
#            , ARRAY_AGG(shucked_weight) AS shucked_weight_agg
#            , ARRAY_AGG(rings) AS rings_agg
#        FROM abalone_split
#        WHERE split = 1
#        GROUP BY sex
#    ) a
# ) DISTRIBUTED BY (sex);

linreg_fitted = (
   abalone_train.group_by("sex").assign(
      result = lambda t: linreg_func(
         t["length"],
         t["shucked_weight"],
         t["rings"]
      )
   )
).assign(
        col_nm=lambda t: t["result"]["col_nm"],
        coef=lambda t: t["result"]["coef"],
        intercept=lambda t: t["result"]["intercept"],
        serialized_linreg_model=lambda t: t["result"]["serialized_linreg_model"],
        created_dt=lambda t: t["result"]["created_dt"]
    )


**Take a look at models built**

In [13]:
linreg_fitted[["sex", "col_nm", "coef", "intercept", "created_dt"]]

sex,col_nm,coef,intercept,created_dt
F,"['length', 'shucked_weight']","[26.137109355672255, -8.424186514283774]",-0.2382484440311447,2022-10-27 08:58:52.204444
M,"['length', 'shucked_weight']","[22.38069869528873, -6.071067605694741]",0.7650399388740272,2022-10-27 08:58:52.204623
I,"['length', 'shucked_weight']","[15.321569717681122, 0.4916985308303924]",1.2825426862371936,2022-10-27 08:58:52.206658


**Get summary of parameter's coefficient for three sex**


- Using ARRAY_APPEND

In [14]:
# SELECT
#   sex,
#   UNNEST(ARRAY_APPEND(col_nm, 'intercept')) AS col_nm2,
#   UNNEST(ARRAY_APPEND(coef, intercept)) AS coef2
# from linreg_fitted;

unnest = gp.function("unnest")
array_append = gp.function("array_append")

linreg_fitted.assign(
    col_nm2=lambda t: unnest(array_append(t["col_nm"], "intercept")),
    coef2=lambda t: unnest(array_append(t["coef"], t["intercept"])),
)[["sex", "col_nm2", "coef2"]]

sex,col_nm2,coef2
F,length,26.13710935567225
F,shucked_weight,-8.424186514283774
F,intercept,-0.2382484440311447
M,length,22.38069869528873
M,shucked_weight,-6.071067605694741
M,intercept,0.7650399388740272
I,length,15.321569717681122
I,shucked_weight,0.4916985308303924
I,intercept,1.2825426862371936


## Prediction

**Creation of prediction function**

In [15]:
@gp.create_function
def linreg_pred_func(serialized_model: bytes, length: float, shucked_weight: float) -> float:
    # Deserialize the serialized model
    import six

    pickle = six.moves.cPickle
    model = pickle.loads(serialized_model)

    features = [length, shucked_weight]

    # Predict the target variable
    y_pred = model.predict([features])

    return y_pred[0][0]

**Join model table and test set table**

In [16]:
linreg_test_fit = linreg_fitted.inner_join(
    abalone_test,
    cond=lambda t1, t2: t1["sex"] == t2["sex"],
    self_columns=["col_nm", "coef", "intercept", "serialized_linreg_model", "created_dt"],
    other_columns=["*"],
)

**Predict test set**

In [17]:
# CREATE TABLE plc_linreg_pred_dot AS (
#    SELECT
#        test.id
#        , test.sex
#        , test.rings
#        , plc_linreg_pred_dot_func(
#            model.coef
#            , model.intercept
#            , ARRAY[length, shucked_weight]
#        ) AS y_pred
#    FROM
#        (SELECT * FROM abalone_split WHERE split=0) AS test
#        , plc_linreg_fitted AS model
#    WHERE test.sex = model.sex
# ) DISTRIBUTED BY (sex);


linreg_pred = linreg_test_fit.assign(
    rings_pred=lambda t:
        linreg_pred_func(
            t["serialized_linreg_model"],
            t["length"],
            t["shucked_weight"],
        ),
)[["id", "sex", "rings", "rings_pred"]]

In [18]:
# SELECT * FROM plc_linreg_pred WHERE sex='I' ORDER BY id LIMIT 5;

linreg_pred[lambda t: t["sex"] == "I"].order_by("id").head(5)

id,sex,rings,rings_pred
5,I,7,6.382667711581284
46,I,7,7.30097849758049
59,I,4,5.056978605363945
177,I,5,6.1383390591565705
178,I,8,6.134897169440757


## Evaluation of model

**Creation of evaluation return type**

In [19]:
# CREATE TYPE plc_linreg_eval_type AS (
#    mae float8
#    , mape float8
#    , mse float8
#    , r2_score float8
# );


class linreg_eval_type:
    mae: float
    mape: float
    mse: float
    r2_score: float

**Creation of evaluation function**

In [20]:
@gp.create_array_function
def linreg_eval(y_actual: List[float], y_pred: List[float]) -> linreg_eval_type:
    from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
    import numpy as np

    mae = mean_absolute_error(y_actual, y_pred)
    mse = mean_squared_error(y_actual, y_pred)
    r2_score = r2_score(y_actual, y_pred)

    y_pred_f = np.array(y_pred, dtype=float)
    mape = 100 * sum(abs(y_actual - y_pred_f) / y_actual) / len(y_actual)

    # eval_score = [mae, mape, mse, r2_score]

    return {"mae": mae, "mape": mape, "mse": mse, "r2_score": r2_score}

Evaluate our model

In [21]:
# SELECT
#    sex
#    , (linreg_eval(rings_agg, y_pred_agg)).*
# FROM (
#    SELECT
#        sex
#        , ARRAY_AGG(rings) AS rings_agg
#        , ARRAY_AGG(y_pred) AS y_pred_agg
#    FROM plc_linreg_pred
#    GROUP BY sex
# ) a


linreg_pred.group_by("sex").assign(result=lambda t: linreg_eval(t["rings"], t["rings_pred"])).assign(
        mae=lambda t: t["result"]["mae"],
        mape=lambda t: t["result"]["mape"],
        mse=lambda t: t["result"]["mse"],
        r2_score=lambda t: t["result"]["r2_score"],
)[["mae", "mape", "mse", "r2_score", "sex"]]

mae,mape,mse,r2_score,sex
1.98380905186679,17.929775625631404,6.825198413375387,0.1113883891158246,F
2.083924421181288,19.24815026065281,7.770403748522946,0.1852820980108832,M
1.325978986090538,15.628214597897044,3.5806270466689174,0.4548776278576368,I
