# Data Science in Python with GreenplumPython
**Background:**

Predicting the age of abalone from physical measurements. The age of abalone is determined by cutting the shell through the cone, staining it, and counting the number of rings through a microscope, a tedious and time-consuming task. Other measurements, which are easier to obtain, are used to predict the age.

**Problem:**

Build one regression model per ‘sex’ that predicts ‘the number of rings’ (`rings`) of an abalone.

# Fetch data from the UCI Machine Learning Repository

The two SQL cells below create the table `abalone` in the database. They can be skipped if the table already exists.

In [None]:
%%sql
-- External Table
DROP EXTERNAL TABLE IF EXISTS abalone_external;
CREATE EXTERNAL WEB TABLE abalone_external(
    sex text
    , length float8
    , diameter float8
    , height float8
    , whole_weight float8
    , shucked_weight float8
    , viscera_weight float8
    , shell_weight float8
    , rings integer -- target variable to predict
) location('http://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data')
format 'CSV'
(null as '?');

In [None]:
%%sql
-- Create abalone table from an external table
DROP TABLE IF EXISTS abalone;
CREATE TABLE abalone AS (
    SELECT ROW_NUMBER() OVER() AS id, *
    FROM abalone_external
) DISTRIBUTED BY (sex);
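As a local illustration of what these two cells do, the sketch below parses raw lines in the same nine-column CSV layout (the sample lines are the first rows displayed later in this notebook, without `id`), maps `?` to a missing value as `(null as '?')` does, and numbers the rows in the spirit of `ROW_NUMBER() OVER()`. `parse_abalone` is a hypothetical helper. Note that in the database `ROW_NUMBER() OVER()` without an `ORDER BY` assigns numbers in no guaranteed order, whereas this sketch simply follows file order.

```python
import csv
import io
from typing import List, Optional

# Column order of the external table definition above
COLUMNS = ["sex", "length", "diameter", "height", "whole_weight",
           "shucked_weight", "viscera_weight", "shell_weight", "rings"]


def parse_abalone(text: str) -> List[dict]:
    """Parse abalone CSV text into records with a 1-based `id` column."""
    records = []
    reader = csv.reader(io.StringIO(text))
    for row_id, fields in enumerate(reader, start=1):
        # '?' marks a missing value, like (null as '?') in the external table
        values: List[Optional[str]] = [None if f == "?" else f for f in fields]
        record: dict = {"id": row_id}
        for name, value in zip(COLUMNS, values):
            if value is None or name == "sex":
                record[name] = value          # keep text / NULL as-is
            elif name == "rings":
                record[name] = int(value)     # integer target column
            else:
                record[name] = float(value)   # float8 measurement columns
        records.append(record)
    return records


sample = (
    "M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15\n"
    "M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7\n"
    "F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9\n"
)
for rec in parse_abalone(sample):
    print(rec)
```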

# Import preparation

In [1]:
import greenplumpython as gp

The display functionality is not implemented in GreenplumPython yet, so for the moment we define a helper function in this cell.

In [2]:
from tabulate import tabulate
def display(table: gp.Table):
    return tabulate(table.fetch(), headers="keys", tablefmt="html")

## Connect to the database

In [3]:
db = gp.database(host="localhost", dbname="postgres", user="gpadmin", password="....")

# Data Exploration

Get access to the existing table `abalone`:

In [4]:
abalone = gp.table("abalone", db)

Take a look at the first five rows of the table, ordered by `id`:

In [5]:
# SELECT * FROM abalone ORDER BY id LIMIT 5;

display(abalone.top(5, ["id"]))

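| id | sex | length | diameter | height | whole_weight | shucked_weight | viscera_weight | shell_weight | rings |
|---|---|---|---|---|---|---|---|---|---|
| 1 | M | 0.455 | 0.365 | 0.095 | 0.514 | 0.2245 | 0.101 | 0.15 | 15 |
| 2 | M | 0.35 | 0.265 | 0.09 | 0.2255 | 0.0995 | 0.0485 | 0.07 | 7 |
| 3 | F | 0.53 | 0.42 | 0.135 | 0.677 | 0.2565 | 0.1415 | 0.21 | 9 |
| 4 | M | 0.44 | 0.365 | 0.125 | 0.516 | 0.2155 | 0.114 | 0.155 | 10 |
| 5 | I | 0.33 | 0.255 | 0.08 | 0.205 | 0.0895 | 0.0395 | 0.055 | 7 |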


Check how the rows are distributed across segments by applying the `count` aggregate grouped by `gp_segment_id`:

In [6]:
# SELECT gp_segment_id, COUNT(*) 
# FROM abalone
# GROUP BY 1
# ORDER BY gp_segment_id;

count = gp.aggregate("count", db) # -- Get access to existing aggregate in Greenplum
display(count(abalone["id"], group_by=["gp_segment_id"]).to_table())

| count | gp_segment_id |
|---|---|
| 1307 | 2 |
| 2870 | 1 |
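The output shows only two segments holding rows. That follows from `DISTRIBUTED BY (sex)`: Greenplum places each row by hashing its distribution key, so all rows with the same `sex` land on the same segment. With only three distinct values, at most three segments can receive data, and two values may share a segment. The toy simulation below illustrates this; CRC32 is a stand-in for Greenplum's internal hash, and `assign_segment` / `rows_per_segment` are hypothetical helpers.

```python
import zlib
from collections import Counter
from typing import Iterable


def assign_segment(key: str, n_segments: int) -> int:
    """Map a distribution-key value to a segment (CRC32 stands in for Greenplum's hash)."""
    return zlib.crc32(key.encode("utf-8")) % n_segments


def rows_per_segment(keys: Iterable[str], n_segments: int) -> Counter:
    """Count how many rows each segment would receive."""
    return Counter(assign_segment(k, n_segments) for k in keys)


# Toy data: 12 rows whose distribution key has only three distinct values
sex_keys = ["M"] * 5 + ["F"] * 3 + ["I"] * 4
by_sex = rows_per_segment(sex_keys, n_segments=8)

# Distributing by a unique key such as id tends to spread rows more evenly
by_id = rows_per_segment([str(i) for i in range(1, 13)], n_segments=8)

print("by sex:", dict(by_sex))
print("by id: ", dict(by_id))
```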


# Train/test split

Some tasks usually done with MADlib, such as splitting data into training and test sets, can also be done with GreenplumPython.
Here is one way to do it.

First, we assign each row a random value between 0 and 1.

In [7]:
# SELECT *, random() AS __samp_out_label FROM abalone

temp_abalone_label = abalone[["*", "random() AS __samp_out_label"]]

Then we create percentile tables that store, for each sex, the 80th and the 20th percentile of that random value.

In [8]:
# SELECT sex, percentile_disc(0.8) WITHIN GROUP (ORDER BY __samp_out_label) AS __samp_out_label
# FROM temp_abalone_label GROUP BY sex

percentile_disc = gp.ordered_aggregate("percentile_disc", db=db)
temp_abalone_train_perc = percentile_disc(
    0.8,
    group_by=["sex"],
    order_by=[temp_abalone_label["__samp_out_label"]],
    as_name="__samp_out_label",
).to_table()

In [9]:
temp_abalone_test_perc = percentile_disc(
    0.2,
    group_by=["sex"],
    order_by=[temp_abalone_label["__samp_out_label"]],
    as_name="__samp_out_label",
).to_table()
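For reference, `percentile_disc` returns an actual input value: the first value in the ordering whose cumulative fraction of rows is at least the requested fraction. The pure-Python sketch below mirrors that definition (it is not GreenplumPython code, and unlike SQL, which returns NULL for empty input, it raises on an empty list). With distinct random labels, exactly 80% of rows have a label at or below the 0.8 cut.

```python
import math
import random
from typing import List


def percentile_disc(fraction: float, values: List[float]) -> float:
    """Return the first value in sorted order whose cumulative fraction of rows
    is at least `fraction`, following the SQL percentile_disc definition."""
    if not values:
        raise ValueError("empty input (SQL would return NULL)")
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be between 0 and 1")
    ordered = sorted(values)
    # Position ceil(fraction * n) in 1-based terms, clamped to the first row
    index = max(math.ceil(fraction * len(ordered)) - 1, 0)
    return ordered[index]


random.seed(0)
labels = [random.random() for _ in range(1000)]  # like random() AS __samp_out_label
cut_80 = percentile_disc(0.8, labels)
cut_20 = percentile_disc(0.2, labels)
print(cut_80, cut_20)
print(sum(label <= cut_80 for label in labels))  # rows at or below the 0.8 cut
```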

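Finally, we join the labelled table with each percentile table, matching on `sex`, to obtain the training and test tables: a row goes to the training table if its random value is at or below the 80th percentile for its sex, and to the test table if it is at or below the 20th percentile.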

In [10]:
# SELECT * 
# FROM temp_abalone_label
# INNER JOIN temp_abalone_train_perc
# ON temp_abalone_label.__samp_out_label <= temp_abalone_train_perc.__samp_out_label
# AND temp_abalone_label.sex = temp_abalone_train_perc.sex

abalone_train = temp_abalone_label.inner_join(
    temp_abalone_train_perc,
    (temp_abalone_label["__samp_out_label"] <= temp_abalone_train_perc["__samp_out_label"]) 
    & (temp_abalone_label["sex"] == temp_abalone_train_perc["sex"]),
    targets=[temp_abalone_label["*"]]
)
display(count(abalone_train["*"], group_by=["sex"]).to_table())

| count | sex |
|---|---|
| 1012 | F |
| 1208 | M |
| 1092 | I |


In [11]:
abalone_test = temp_abalone_label.inner_join(
    temp_abalone_test_perc,
    (temp_abalone_label["__samp_out_label"] <= temp_abalone_test_perc["__samp_out_label"])
    & (temp_abalone_label["sex"] == temp_abalone_test_perc["sex"]),
    targets=[temp_abalone_label["*"]]
)
display(count(abalone_test["*"], group_by=["sex"]).to_table())

| count | sex |
|---|---|
| 297 | F |
| 324 | M |
| 293 | I |
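As written, the test condition (value at or below the 20th percentile of its sex) implies the training condition (value at or below the 80th percentile), so for a given set of labels every test row also qualifies as a training row. A disjoint 80/20 split would instead send the rows *above* the 80th-percentile cut to the test set. The pure-Python sketch below shows that per-group logic; `split_by_group` is a hypothetical helper, not part of GreenplumPython.

```python
import math
import random
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, str, float]  # (id, sex, random label)


def percentile_disc(fraction: float, values: List[float]) -> float:
    ordered = sorted(values)
    return ordered[max(math.ceil(fraction * len(ordered)) - 1, 0)]


def split_by_group(rows: List[Row], train_fraction: float = 0.8) -> Tuple[List[Row], List[Row]]:
    """Split each sex separately: label <= cut goes to train, label > cut to test."""
    labels_by_sex: Dict[str, List[float]] = defaultdict(list)
    for _, sex, label in rows:
        labels_by_sex[sex].append(label)
    # One cut per sex, like the percentile table grouped by sex
    cuts = {sex: percentile_disc(train_fraction, labels)
            for sex, labels in labels_by_sex.items()}
    train = [row for row in rows if row[2] <= cuts[row[1]]]
    test = [row for row in rows if row[2] > cuts[row[1]]]  # complement, not a subset
    return train, test


random.seed(1)
rows = [(i, random.choice("MFI"), random.random()) for i in range(1, 1001)]
train, test = split_by_group(rows)
print(len(train), len(test))
```

In the notebook, the analogous change would be to build `abalone_test` by comparing against `temp_abalone_train_perc` with `>` instead of `<=` against `temp_abalone_test_perc`, assuming the column expressions support `>` as they do `<=`.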


**`gp.ordered_aggregate` is not yet included in beta version 2.**

# Execute the OLS Linear Regression Function by 'sex'

In [12]:
from typing import List

#CREATE TYPE plc_linreg_type AS (
#    col_nm text[]
#    , coef float8[]
#    , intercept float8
#    , serialized_linreg_model bytea
#    , created_dt text
#);

class PlcLinregType:
    col_nm: List[str]
    coef: List[float]
    intercept: float
    serialized_linreg_model: bytes
    created_dt: str

# -- Create function
# -- Need to specify the return type -> API will create the corresponding type in Greenplum to return a row
# -- Will add argument to change language extensions, currently plpython3u by default

@gp.create_array_function
def plc_linreg_func(length: List[float], shucked_weight: List[float], rings: List[int]) -> PlcLinregType:
    # Imports go inside the body: the body is what runs in the database (plpython3u)
    import datetime
    import pickle

    import numpy as np
    from sklearn.linear_model import LinearRegression

    # Feature matrix (n_rows x 2) and target column vector (n_rows x 1)
    X = np.array([length, shucked_weight]).T
    y = np.array([rings]).T

    # OLS linear regression with length, shucked_weight
    linreg_fit = LinearRegression().fit(X, y)
    linreg_coef = linreg_fit.coef_            # shape (1, 2) because y is 2-D
    linreg_intercept = linreg_fit.intercept_  # shape (1,)

    # Serialize the fitted model so it can be stored in a bytea column
    serialized_linreg_model = pickle.dumps(linreg_fit, protocol=2)

    return {
        'col_nm': ['length', 'shucked_weight'],
        'coef': linreg_coef[0].tolist(),           # plain Python floats for float8[]
        'intercept': float(linreg_intercept[0]),
        'serialized_linreg_model': serialized_linreg_model,
        'created_dt': str(datetime.datetime.now())
    }
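With its default settings, `LinearRegression().fit(X, y)` performs an ordinary-least-squares fit with an intercept. For the two features used here, that amounts to solving the 3×3 normal equations `X^T X b = X^T y`, where each row of `X` is `[1, length, shucked_weight]`. The pure-Python sketch below (hypothetical `ols_two_features`) makes that computation concrete and checks it on noise-free data; scikit-learn solves the same least-squares problem with a different, more numerically robust solver.

```python
from typing import List, Tuple


def ols_two_features(x1: List[float], x2: List[float], y: List[float]) -> Tuple[float, float, float]:
    """Fit y = b0 + b1*x1 + b2*x2 by solving the normal equations (X^T X) b = X^T y."""
    rows = [[1.0, a, b] for a, b in zip(x1, x2)]  # design matrix with intercept column
    # Build the 3x3 matrix X^T X and the vector X^T y
    xtx = [[sum(r[i] * r[j] for r in rows) for j in range(3)] for i in range(3)]
    xty = [sum(r[i] * t for r, t in zip(rows, y)) for i in range(3)]
    # Gauss-Jordan elimination with partial pivoting on the augmented matrix
    aug = [xtx[i] + [xty[i]] for i in range(3)]
    for col in range(3):
        pivot = max(range(col, 3), key=lambda r: abs(aug[r][col]))
        if abs(aug[pivot][col]) < 1e-12:
            raise ValueError("features are collinear; normal equations are singular")
        aug[col], aug[pivot] = aug[pivot], aug[col]
        p = aug[col][col]
        aug[col] = [v / p for v in aug[col]]
        for r in range(3):
            if r != col:
                factor = aug[r][col]
                aug[r] = [v - factor * w for v, w in zip(aug[r], aug[col])]
    b0, b1, b2 = (aug[i][3] for i in range(3))
    return b0, b1, b2


# Noise-free toy data generated from y = 1 + 2*x1 - 3*x2
x1 = [0.1, 0.2, 0.3, 0.4, 0.5]
x2 = [0.05, 0.3, 0.1, 0.2, 0.4]
y = [1 + 2 * a - 3 * b for a, b in zip(x1, x2)]
intercept, coef_x1, coef_x2 = ols_two_features(x1, x2, y)
print(intercept, coef_x1, coef_x2)
```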

In [13]:
# DROP TABLE IF EXISTS plc_linreg_fitted;
# CREATE TABLE plc_linreg_fitted AS (
#    SELECT
#        a.sex
#        , (plc_linreg_func(
#            a.length_agg
#            , a.shucked_weight_agg
#            , a.rings_agg)
#        ).*
#    FROM (
#        SELECT
#            sex
#            , ARRAY_AGG(length) AS length_agg
#            , ARRAY_AGG(shucked_weight) AS shucked_weight_agg
#            , ARRAY_AGG(rings) AS rings_agg
#        FROM abalone_train
#        GROUP BY sex
#    ) a
#) DISTRIBUTED BY (sex);

plc_linreg_fitted = plc_linreg_func(
                                abalone_train["length"],
                                abalone_train["shucked_weight"],
                                abalone_train["rings"], group_by=["sex"]
).to_table()
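The commented SQL shows what this call amounts to: each argument column is aggregated into an array per `sex` with `ARRAY_AGG`, and the function runs once per group on those arrays. The plain-Python sketch below (hypothetical `apply_array_function`) mimics that shape on in-memory rows taken from the table preview above; it does not describe how GreenplumPython generates its SQL.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Sequence


def apply_array_function(rows: Sequence[dict], group_key: str,
                         columns: List[str], func: Callable) -> Dict[object, object]:
    """Collect each column into a list per group (like ARRAY_AGG ... GROUP BY),
    then call `func` once per group with those lists."""
    grouped: Dict[object, Dict[str, list]] = defaultdict(lambda: {c: [] for c in columns})
    for row in rows:
        bucket = grouped[row[group_key]]
        # Appending to every list for the same row keeps the lists aligned
        for c in columns:
            bucket[c].append(row[c])
    return {key: func(*(bucket[c] for c in columns)) for key, bucket in grouped.items()}


rows = [
    {"sex": "M", "length": 0.455, "shucked_weight": 0.2245, "rings": 15},
    {"sex": "M", "length": 0.35, "shucked_weight": 0.0995, "rings": 7},
    {"sex": "F", "length": 0.53, "shucked_weight": 0.2565, "rings": 9},
    {"sex": "M", "length": 0.44, "shucked_weight": 0.2155, "rings": 10},
    {"sex": "I", "length": 0.33, "shucked_weight": 0.0895, "rings": 7},
]


def mean_rings(length: List[float], shucked_weight: List[float], rings: List[int]) -> float:
    # Stand-in for plc_linreg_func: same argument shape, trivial computation
    return sum(rings) / len(rings)


result = apply_array_function(rows, "sex", ["length", "shucked_weight", "rings"], mean_rings)
print(result)
```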

In [14]:
display(plc_linreg_fitted[["sex", "col_nm", "coef", "intercept", "created_dt"]])

| sex | col_nm | coef | intercept | created_dt |
|---|---|---|---|---|
| F | ['length', 'shucked_weight'] | [25.655445083760167, -8.681278374889335] | 0.180538 | 2022-07-07 17:39:34.610881 |
| M | ['length', 'shucked_weight'] | [23.788185330232736, -6.668176274823593] | 0.303382 | 2022-07-07 17:39:34.611556 |
| I | ['length', 'shucked_weight'] | [15.306253695675569, 0.6961453148775523] | 1.23322 | 2022-07-07 17:39:34.616763 |


GreenplumPython offers two ways to call `union`:
- Pass a target list for each of the two tables directly to `union` through `my_targets` and `other_targets` (useful to rename columns or select specific columns).
- Call `union` without target lists; in that case the user must first preprocess both tables (renaming, selecting columns, ...) so that their columns line up.
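Either form performs the same reshaping: from one wide row per sex to one `(sex, col_nm, coef)` row per coefficient. The pure-Python sketch below reproduces that reshaping on the coefficients printed above for `plc_linreg_fitted`. Here `zip` stands in for the two `UNNEST` calls expanding in step (both arrays have the same length), and a `set` stands in for `UNION`, which removes duplicates and returns rows in no particular order, which is why the result rows below are not sorted.

```python
from typing import Dict, List, Set, Tuple

LongRow = Tuple[str, str, float]  # (sex, col_nm, coef)

fitted: List[Dict] = [
    {"sex": "F", "col_nm": ["length", "shucked_weight"],
     "coef": [25.655445083760167, -8.681278374889335], "intercept": 0.180538},
    {"sex": "M", "col_nm": ["length", "shucked_weight"],
     "coef": [23.788185330232736, -6.668176274823593], "intercept": 0.303382},
    {"sex": "I", "col_nm": ["length", "shucked_weight"],
     "coef": [15.306253695675569, 0.6961453148775523], "intercept": 1.23322},
]


def to_long(rows: List[Dict]) -> Tuple[List[LongRow], Set[LongRow]]:
    # First SELECT: zip expands col_nm and coef in step, like the two UNNEST calls
    coef_rows = [(r["sex"], name, value)
                 for r in rows for name, value in zip(r["col_nm"], r["coef"])]
    # Second SELECT: one 'intercept' row per sex
    intercept_rows = [(r["sex"], "intercept", r["intercept"]) for r in rows]
    union_all = coef_rows + intercept_rows  # UNION ALL keeps every row
    union = set(union_all)                  # UNION also drops duplicates; no order
    return union_all, union


union_all, union = to_long(fitted)
for row in sorted(union):
    print(row)
```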

In [15]:
# SELECT sex, UNNEST(col_nm) AS col_nm, UNNEST(coef) AS coef
# FROM plc_linreg_fitted
# UNION
# SELECT sex, 'intercept' AS col_nm, intercept AS coef
# FROM plc_linreg_fitted;

unnest = gp.function("unnest", db)

display(
        plc_linreg_fitted.union(
                plc_linreg_fitted,
                my_targets = [
                        plc_linreg_fitted["sex"],
                        unnest(plc_linreg_fitted["col_nm"]).rename("col_nm"),
                        unnest(plc_linreg_fitted["coef"]).rename("coef"),
                ],
                other_targets = [
                        plc_linreg_fitted["sex"],
                        "'intercept' AS col_nm",
                        plc_linreg_fitted["intercept"].rename("coef"),
                ]
        )
)

| sex | col_nm | coef |
|---|---|---|
| F | shucked_weight | -8.11133 |
| I | shucked_weight | 0.486813 |
| F | length | 25.6108 |
| M | length | 23.1582 |
| M | intercept | 0.280546 |
| I | intercept | 1.10943 |
| M | shucked_weight | -6.57603 |
| I | length | 14.9034 |
| F | intercept | 0.144295 |


In [16]:
# SELECT sex, UNNEST(col_nm) AS col_nm, UNNEST(coef) AS coef
# FROM plc_linreg_fitted
# UNION
# SELECT sex, 'intercept' AS col_nm, intercept AS coef
# FROM plc_linreg_fitted;

plc_linreg_fitted_1 = plc_linreg_fitted[[
                        plc_linreg_fitted["sex"],
                        unnest(plc_linreg_fitted["col_nm"]).rename("col_nm"),
                        unnest(plc_linreg_fitted["coef"]).rename("coef")]
                ]
plc_linreg_fitted_2 = plc_linreg_fitted[[
                        plc_linreg_fitted["sex"],
                        "'intercept' AS col_nm",
                        plc_linreg_fitted["intercept"].rename("coef")]
                ]
display(
        plc_linreg_fitted_1.union(
                plc_linreg_fitted_2
        )
)

| sex | col_nm | coef |
|---|---|---|
| I | shucked_weight | -0.215205 |
| M | shucked_weight | -6.88948 |
| M | length | 23.9315 |
| I | intercept | 1.04682 |
| F | intercept | 0.239293 |
| M | intercept | 0.0312261 |
| F | shucked_weight | -8.39858 |
| I | length | 15.9479 |
| F | length | 25.6955 |


# Prediction

## Not yet supported: a function can currently take arguments from only one table at a time

In [17]:
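@gp.create_array_function
def plc_linreg_pred_func(serialized_model: bytes, length: List[float], shucked_weight: List[float]) -> List[float]:
    # Imports go inside the body: the body is what runs in the database
    import pickle

    import numpy as np

    # Deserialize the fitted scikit-learn model (one model per group assumed)
    model = pickle.loads(serialized_model)

    # Predict the target variable for every row of the group
    X = np.array([length, shucked_weight]).T
    y_pred = model.predict(X)

    return y_pred.ravel().tolist()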

In [18]:
plc_linreg_pred = plc_linreg_pred_func(
                                plc_linreg_fitted["serialized_linreg_model"],
                                abalone_test["length"],
                                abalone_test["shucked_weight"],
                                group_by=["sex"]
).to_table()

Exception: Cannot pass arguments from more than one tables
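The error arises because the model (from `plc_linreg_fitted`) and the features (from `abalone_test`) live in different tables. One possible workaround, not verified against GreenplumPython, is to join the test rows to the fitted models on `sex` first, so that every argument comes from a single table. The pure-Python sketch below shows that join-then-predict logic. Assumptions: pickled dicts of coefficients stand in for the pickled scikit-learn models, the coefficient values are those printed earlier for `plc_linreg_fitted`, and `predict_rows` is a hypothetical helper.

```python
import pickle
from typing import Dict, List

# Hypothetical stand-ins: each pickled dict plays the role of a pickled
# scikit-learn model stored in plc_linreg_fitted.serialized_linreg_model.
models: Dict[str, bytes] = {
    "F": pickle.dumps({"coef": [25.655445083760167, -8.681278374889335], "intercept": 0.180538}, protocol=2),
    "M": pickle.dumps({"coef": [23.788185330232736, -6.668176274823593], "intercept": 0.303382}, protocol=2),
    "I": pickle.dumps({"coef": [15.306253695675569, 0.6961453148775523], "intercept": 1.23322}, protocol=2),
}

FEATURES = ["length", "shucked_weight"]


def predict_rows(test_rows: List[dict], serialized_models: Dict[str, bytes]) -> List[float]:
    """Join each test row to its group's model on `sex`, then predict."""
    cache: Dict[str, dict] = {}
    predictions = []
    for row in test_rows:
        sex = row["sex"]
        if sex not in cache:
            # Deserialize each group's model only once
            cache[sex] = pickle.loads(serialized_models[sex])
        model = cache[sex]
        features = [row[name] for name in FEATURES]
        # Linear model: intercept + sum(coef_i * feature_i)
        predictions.append(model["intercept"] + sum(c * x for c, x in zip(model["coef"], features)))
    return predictions


test_rows = [
    {"sex": "I", "length": 0.33, "shucked_weight": 0.0895},
    {"sex": "F", "length": 0.53, "shucked_weight": 0.2565},
]
print(predict_rows(test_rows, models))
```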