In [14]:
import ast
import logging

import flask
import numpy as np
import pandas as pd
import requests
from flask_sqlalchemy import SQLAlchemy

# Init database

In [15]:
from sqlalchemy.dialects.postgresql import JSONB, ARRAY, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table, ForeignKey, Text
from sqlalchemy import create_engine, Column, Integer, MetaData
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import relationship, backref, sessionmaker

In [16]:
# Read the database connection settings from a local text file.
# NOTE(review): eval() executes whatever Python expression config.txt
# contains, so this is only safe while that file is fully trusted. If the
# file holds a plain dict literal, ast.literal_eval would parse it without
# executing code -- confirm the file format before switching.
with open("config.txt", "r") as f:
    DATABASE = eval(f.read())

# The evaluated mapping is unpacked into URL() as keyword arguments and the
# resulting URL is used to build the engine.
engine = create_engine(URL(**DATABASE))

In [17]:
# Declarative base class; the ORM classes in this notebook inherit from it
# and their tables are collected on Base.metadata.
Base = declarative_base()

In [18]:
class Models(Base):
    """ORM mapping for the 'models' table: one model record per dataset."""
    __tablename__ = 'models'
    id = Column(Integer, primary_key=True)
    # JSON document with the fitted parameters; written by the trainer and
    # read back when predicting.
    model = Column(JSONB)
    # JSON document with the cross-validation scores written by the trainer.
    cv_results = Column(JSONB)
    # Lifecycle marker; this notebook writes "new", "train" and "ready".
    status = Column(Text)
    # unique=True on the foreign key allows at most one model per dataset.
    datasets_id = Column(
        Integer,
        ForeignKey('datasets.id'), unique=True
    )
    # uselist=False makes the reverse attribute Datasets.models a scalar
    # instead of a list.
    datasets = relationship('Datasets',
                            backref=backref('models', uselist=False))


class Datasets(Base):
    """ORM mapping for the 'datasets' table: training data plus settings."""
    __tablename__ = 'datasets'
    id = Column(Integer, primary_key=True)
    # Raw CSV text of the training data (parsed later with pandas.read_csv).
    data = Column(Text)
    # Name of the CSV column used as the regression target.
    target = Column(Text)
    # The three settings below are stored as text and converted back to
    # Python values by the code that trains or predicts.
    n_folds = Column(Text)
    fit_intercept = Column(Text)
    l2_coef = Column(Text)

In [19]:
# Create the tables declared on Base through the engine.
Base.metadata.create_all(engine)
# Session factory bound to the engine, and one module-level session that the
# functions and Flask views in this notebook share.
Session = sessionmaker(bind=engine)
session = Session()

# Backend methods

## Background worker: polls the database and trains new models

In [20]:
from multiprocessing import Process
from time import sleep


def check_models_db():
    """Train every model whose status is "new" and store the results.

    For each pending row the status is first committed as "train", then the
    dataset attached to the row is parsed and fitted with ``train_kfold``,
    and finally the fitted parameters, the CV scores and the status "ready"
    are committed.

    Uses the module-level ``session``. Exceptions raised while parsing the
    settings or training propagate to the caller; the affected row is then
    left with status "train".
    """
    # Filter in SQL instead of loading every row and testing it in Python;
    # .all() materialises the result before the loop starts committing.
    pending = session.query(Models).filter_by(status="new").all()
    for model in pending:
        model_id = model.id
        logging.debug("'check_model_db' find new model with id: %s",
                      model_id)
        session.query(Models)\
            .filter_by(id=model_id)\
            .update({"status": "train"})
        session.commit()
        logging.debug("script set 'train' status for model with id: %s",
                      model_id)

        dataset = model.datasets
        X_train, Y_train = get_train_data(dataset.data, dataset.target)
        # SECURITY: these strings originate from the /train request form.
        # They used to be passed to eval(), which would execute arbitrary
        # expressions sent by a client; literal_eval only accepts Python
        # literals (numbers, booleans, lists, ...).
        fit_inter = ast.literal_eval(dataset.fit_intercept)
        n_folds = ast.literal_eval(dataset.n_folds)
        alpha_arr = ast.literal_eval(dataset.l2_coef)

        res = train_kfold(X_train,
                          Y_train,
                          k_folds=n_folds,
                          fit_intercept=fit_inter,
                          alpha_arr=alpha_arr)
        session.query(Models)\
            .filter_by(id=model_id)\
            .update({"model": res["model"],
                     "cv_results": res["cv_results"],
                     "status": "ready"})
        logging.debug("script set 'ready' status for model with id: %s",
                      model_id)
        session.commit()


def train_script():
    """Poll the database forever, training new models every 10 seconds.

    Never returns. A failure in one polling cycle is logged and the shared
    session is rolled back, so a single bad dataset no longer stops every
    later model from being trained.
    """
    while True:
        sleep(10)
        try:
            check_models_db()
        except Exception:
            # Keep the worker alive: log the traceback and reset the session
            # so the next cycle starts from a clean transaction.
            logging.exception("'train_script' training cycle failed")
            session.rollback()
        m_str = "'train_script' check new models once in 10 sec."
        logging.debug(m_str)

In [21]:
from io import StringIO


def data_checker(data, target):
    """Validate CSV training data supplied as text.

    Args:
        data: CSV text, comma separated, with a header row.
        target: name of the column expected to be present in the data.

    Returns:
        None when the data is usable, otherwise a short error message:
        the text cannot be parsed, the target column is missing, or the
        parsed table contains NaN values.
    """
    logging.debug("'data_checker' working\n")
    try:
        # Building the buffer inside the try block also turns a non-text
        # payload into the regular parse error instead of an exception.
        df = pd.read_csv(StringIO(data), sep=",")
    except Exception:
        return "'read_csv' end with error"
    if target not in df.columns:
        return "'target' column not in data"
    if df.isnull().values.any():
        return "'data' contain NaN"
    return None

In [22]:
from io import StringIO
from sklearn.linear_model import Ridge
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error, make_scorer
from sklearn import datasets
import sklearn

from sklearn.linear_model import LinearRegression


def get_free_id():
    """Return the smallest positive integer not used as a ``Models.id``.

    Reads all existing ids through the module-level ``session``.
    NOTE(review): nothing here reserves the returned id, so two callers
    running at the same time can receive the same value.
    """
    # A set gives constant-time membership tests; the previous list made the
    # search quadratic in the number of stored models.
    used_ids = {row[0] for row in session.query(Models.id).all()}
    candidate = 1
    while candidate in used_ids:
        candidate += 1
    return candidate


def get_train_data(data, target):
    """Split CSV text into a feature frame and a one-column target frame.

    Args:
        data: CSV text, comma separated, with a header row.
        target: name of the column to use as the target.

    Returns:
        (X_train, y_train): the remaining columns in sorted name order, and
        a single-column DataFrame holding the target.
    """
    frame = pd.read_csv(StringIO(data), sep=",")
    # Sort the columns by name so the feature order is deterministic.
    ordered = frame.reindex(sorted(frame.columns), axis=1)
    features = ordered.drop(target, axis=1)
    labels = ordered[target].to_frame()
    return features, labels


def predict_from_model(data, model_db):
    """Predict with the linear model stored on a ``Models`` row.

    Args:
        data: CSV text (comma separated, header row) with feature columns.
        model_db: row exposing ``id``, ``model`` (a dict with "coef" and
            "intercept" entries) and ``datasets.fit_intercept``.

    Returns:
        ``{"result": str(predictions)}`` on success, or
        ``{"error": message}`` when the data cannot be parsed, its columns
        do not match the model's features, or it contains NaN values.
    """
    logging.debug("'predict_from_model' with id: %s", model_db.id)
    try:
        df = pd.read_csv(StringIO(data), sep=",")
    except Exception:
        # Same message data_checker uses for unparsable CSV.
        return {"error": "'read_csv' end with error"}
    X_predict = df.reindex(sorted(df.columns), axis=1)
    # SECURITY: this string was supplied by a client when the model was
    # created; literal_eval parses it without executing code (it used to be
    # passed to eval()).
    fit_inter = ast.literal_eval(model_db.datasets.fit_intercept)

    logging.debug("'predict_from_model' with id: %s", model_db.model)
    coef_by_name = model_db.model["coef"]
    # The columns must match the trained features exactly. Comparing only
    # the number of matched columns accepted data that lacked a trained
    # feature and silently predicted with a subset of the coefficients.
    if set(X_predict.columns) != set(coef_by_name):
        m_str = "data for prediction is not valid for this model"
        return {"error": m_str}
    if df.isnull().values.any():
        return {"error": "'data' contain NaN"}

    mdl = Ridge()
    mdl.coef_ = np.array([coef_by_name[name] for name in X_predict.columns])
    # predict() always reads intercept_, so it has to exist even for a model
    # trained without an intercept.
    mdl.intercept_ = model_db.model["intercept"] if fit_inter else 0.0
    res = mdl.predict(X_predict)
    logging.debug("end 'predict_from_model' with id: %s", model_db.id)
    return {"result": str(res)}


def train_kfold(X_train_df,
                y_train_df,
                k_folds=2,
                fit_intercept=True,
                alpha_arr=[1, 0.2, 0.1, 0.01, 0.001]):
    """Grid-search a Ridge regression over ``alpha_arr`` with k-fold CV.

    Args:
        X_train_df: feature DataFrame; its column names label the
            coefficients in the result.
        y_train_df: target values (the caller in this notebook passes a
            one-column DataFrame).
        k_folds: value passed to GridSearchCV as ``cv``.
        fit_intercept: whether the Ridge model fits an intercept.
        alpha_arr: L2 coefficients to try; a single number is accepted too.

    Returns:
        dict with two entries:
        "model" -- {"intercept": float, "coef": {column name: float}} of the
        best estimator;
        "cv_results" -- {alpha: {"mean_mse": float}} for every alpha tried.
    """
    logging.debug("'train_kfold' function is progress...")
    # Work on a private list: the default argument is never shared with
    # scikit-learn, and a bare number becomes a one-element grid.
    if isinstance(alpha_arr, (list, tuple, np.ndarray)):
        alphas = list(alpha_arr)
    else:
        alphas = [alpha_arr]
    X_train, y_train = np.array(X_train_df), np.array(y_train_df)
    parameters = [{'fit_intercept': [fit_intercept],
                   'alpha': alphas}]
    clf = GridSearchCV(Ridge(),
                       parameters,
                       cv=k_folds,
                       scoring=make_scorer(
                           mean_squared_error,
                           greater_is_better=False))
    clf.fit(X_train, y_train)
    best_model = clf.best_estimator_

    # intercept_ is the scalar 0.0 when fit_intercept is False and an array
    # otherwise, so indexing it directly fails without an intercept;
    # ravel() handles both shapes.
    intercept = float(np.ravel(best_model.intercept_)[0])
    # coef_ has one row per target for 2-D y and is flat for 1-D y.
    coefs = np.atleast_2d(best_model.coef_)[0]

    result_json = {
        "model": {
            "intercept": intercept,
            "coef": {name: float(coef)
                     for name, coef in zip(X_train_df.columns, coefs)},
        },
        "cv_results": {},
    }
    # Pair each score with the alpha recorded for that grid point instead
    # of relying on the position in the input list. The scorer returns
    # negated MSE, hence the sign flip.
    scored = zip(clf.cv_results_['params'],
                 clf.cv_results_['mean_test_score'])
    for params, score in scored:
        result_json["cv_results"][params['alpha']] = {
            "mean_mse": float(-score)
        }
    logging.debug("'train_kfold' function end")
    return result_json

# Flask Server

##### Set the root logger level to DEBUG

In [23]:
import logging

# Lower the root logger's threshold to DEBUG so the logging.debug(...) calls
# used throughout this notebook are emitted.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Emit one record to confirm the configuration took effect.
logging.debug("test")

DEBUG:root:test


In [None]:
from flask import Flask, jsonify, request, render_template
import threading


# Flask application object; the @app.route decorators in this cell register
# their views on it.
app = Flask(__name__)
# Let the application's own logger emit DEBUG records as well.
app.logger.setLevel(logging.DEBUG)


@app.route('/model/')
@app.route('/model/<id>', methods=['GET'])
def model_info(id=None):
    """Return the stored parameters and CV results of one model as JSON.

    Args:
        id: model id taken from the URL; None for the bare '/model/' route.

    Returns:
        JSON ``{"response": {"model": ..., "cv_results": ...}}`` when the
        model exists, otherwise ``{"response": {"error": ...}}``.
    """
    logging.debug("'GET' branch in model/<id>")
    # Reject a missing or non-numeric id before it reaches the database;
    # NOTE(review): a failed query on the shared session would presumably
    # need a rollback before the session is usable again -- confirm.
    try:
        model_id = int(id)
    except (TypeError, ValueError):
        return jsonify(response={"error": "Not valid model_id"})
    model_db = session.query(Models).get(model_id)
    if model_db is not None:
        return jsonify(response={"model": model_db.model,
                                 "cv_results": model_db.cv_results})
    return jsonify(response={"error": "Not valid model_id"})


@app.route('/model/<id>/predict', methods=['GET', 'POST'])
def model(id=None):
    """Predict with a trained model (POST) or report endpoint status.

    A POST must carry the CSV text in the ``data`` form field.

    Returns:
        JSON ``{"response": ...}`` holding either the status dict (non-POST
        requests), the output of ``predict_from_model``, or an ``error``
        entry when the id is invalid/unknown or the model is not trained
        yet.
    """
    if request.method != 'POST':
        logging.debug("'GET' branch in /model/<id>/predict")
        m_dict = {"'/model/<id>/predict' status": "work"}
        return jsonify(response=m_dict)

    logging.debug("'POST' branch in /model/<id>/predict")
    # Reject a non-numeric id before it reaches the database query.
    try:
        model_id = int(id)
    except (TypeError, ValueError):
        return jsonify(response={"error": "Not valid model_id"})
    model_db = session.query(Models).get(model_id)
    # An unknown id used to fall through to a return that referenced an
    # unassigned variable; answer with the intended error instead.
    if model_db is None:
        return jsonify(response={"error": "Not valid model_id"})

    m_str = "model with id : {} in DB".format(model_db.id)
    logging.debug(m_str)
    if model_db.status == "train":
        m_dict = {"error":
                  "model already in training stage"}
        return jsonify(response=m_dict)
    if model_db.status == "new":
        m_dict = {"error":
                  "model in line for training"}
        return jsonify(response=m_dict)

    res = predict_from_model(
        request.form["data"],
        model_db
    )
    return jsonify(response=res)


@app.route('/hello', methods=['GET'])
def hello_world():
    """Smoke-test endpoint that answers with a fixed JSON payload."""
    payload = {"blah": ["stuff"]}
    return jsonify(response=payload)


@app.route('/train', methods=['GET', 'POST'])
def train():
    """Queue a new model for training (POST) or report endpoint status.

    A POST must carry the form fields ``data`` (CSV text), ``target``,
    ``n_folds``, ``fit_intercept`` and ``l2_coef``. The dataset and a model
    row with status "new" are stored; the background trainer picks the row
    up later.

    Returns:
        JSON ``{"response": {"model_id": id}}`` on success,
        ``{"response": {"error": message}}`` when validation fails, or the
        status dict for non-POST requests.
    """
    if request.method != 'POST':
        # Also covers HEAD, which used to hit an unassigned model_id.
        logging.debug("'GET' branch in app/train")
        return jsonify(response={"status": "work"})

    logging.debug("'POST' branch in app/train")
    form = request.form
    error = data_checker(form["data"], form["target"])
    if error is not None:
        logging.error("'data_checker' end with error")
        return jsonify(response={'error': error})

    # SECURITY: the trainer later converts these strings back to Python
    # values. Accept only plain literals here so that client-supplied
    # expressions are rejected up front instead of reaching the worker.
    for field in ("n_folds", "fit_intercept", "l2_coef"):
        try:
            ast.literal_eval(form[field])
        except (ValueError, SyntaxError, TypeError):
            m_str = "'{}' is not a valid Python literal".format(field)
            logging.error(m_str)
            return jsonify(response={'error': m_str})

    model_id = get_free_id()
    d_obj = Datasets(id=model_id,
                     data=form["data"],
                     target=form["target"],
                     n_folds=str(form["n_folds"]),
                     fit_intercept=str(form["fit_intercept"]),
                     l2_coef=str(form["l2_coef"]))
    new_model = Models(id=model_id,
                       model={},
                       cv_results={},
                       status="new",
                       datasets_id=model_id,
                       datasets=d_obj)
    session.add(new_model)
    try:
        session.commit()
    except Exception:
        # The session is shared by all requests: undo the failed
        # transaction before re-raising so later requests can still use it.
        session.rollback()
        raise
    logging.debug("'POST' commit after checker")
    return jsonify(response={"model_id": model_id})
        
if __name__ == '__main__':
    # Run the polling trainer in a separate process next to the web server.
    # daemon=True plus the finally block stop the worker when the server
    # stops; otherwise it keeps polling after the cell ends and re-running
    # the cell starts a second worker alongside the first.
    # NOTE(review): terminating during a training run leaves that model's
    # status at "train".
    p = Process(target=train_script, daemon=True)
    p.start()
    try:
        app.run()
    finally:
        p.terminate()
        p.join()

INFO:werkzeug: * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
DEBUG:root:'train_script' check new models once in 10 sec.
INFO:werkzeug:127.0.0.1 - - [01/May/2020 15:18:15] "[37mGET /hello HTTP/1.1[0m" 200 -
DEBUG:root:'GET' branch in app/train
INFO:werkzeug:127.0.0.1 - - [01/May/2020 15:18:15] "[37mGET /train HTTP/1.1[0m" 200 -
DEBUG:root:'POST' branch in app/train
DEBUG:root:'data_cheker' working

DEBUG:root:'POST' commit after checker
INFO:werkzeug:127.0.0.1 - - [01/May/2020 15:18:16] "[37mPOST /train HTTP/1.1[0m" 200 -
DEBUG:root:'check_model_db' find new model with id: 11
DEBUG:root:script set 'train' status for model with id: 11
DEBUG:root:'train_kfold' function is progress...
DEBUG:root:'train_kfold' function end
DEBUG:root:script set 'ready' status for model with id: 11
DEBUG:root:'train_script' check new models once in 10 sec.
DEBUG:root:'GET' branch in model/<id>
INFO:werkzeug:127.0.0.1 - - [01/May/2020 15:18:20] "[37mGET /model/11 HTTP/1.1[0m" 200 -
DEBUG: