In [0]:
pip install mlflow

Python interpreter will be restarted.
Collecting mlflow
  Using cached mlflow-2.4.1-py3-none-any.whl (18.1 MB)
Collecting gunicorn<21
  Using cached gunicorn-20.1.0-py3-none-any.whl (79 kB)
Collecting docker<7,>=4.0.0
  Using cached docker-6.1.3-py3-none-any.whl (148 kB)
Collecting pyyaml<7,>=5.1
  Using cached PyYAML-6.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (661 kB)
Collecting gitpython<4,>=2.1.0
  Using cached GitPython-3.1.31-py3-none-any.whl (184 kB)
Collecting cloudpickle<3
  Using cached cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Collecting alembic!=1.10.0,<2
  Using cached alembic-1.11.1-py3-none-any.whl (224 kB)
Collecting Flask<3
  Using cached Flask-2.3.2-py3-none-any.whl (96 kB)
Collecting markdown<4,>=3.3
  Using cached Markdown-3.4.3-py3-none-any.whl (93 kB)
Collecting querystring-parser<2
  Using cached querystring_parser-1.2.4-py2.py3-none-any.whl (7.9 kB)
Collecting importlib-metadata!=4.7.0,<7,>=3.7.0
  Using c

In [0]:
from pyspark.sql.functions import to_date,col,avg
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, DoubleType, FloatType, TimestampType
import pyspark.sql.functions as F
from pyspark.sql import Window
from pprint import pprint as pp
import pandas as pd
import json
from urllib.request import  urlopen
import requests
import statsmodels.api as sm



# Read TrainTestData

In [0]:
# Load the training and test splits from their parquet folders in blob storage.
TrainDf = spark.read.parquet(
    'wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/0405Split/TrainTestData/TrainDf50/'
)

TestDf = spark.read.parquet(
    'wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/0405Split/TrainTestData/TestDf50/'
)

In [0]:
# For the training split and then the test split, show the row count, the
# number of distinct herds, and the number of distinct (herd, animal) pairs.
for split_df in (TrainDf, TestDf):
    split_summary = split_df.agg(
        F.count('*'),
        F.countDistinct('HerdIdentifier').alias('NumberOfHerds'),
        F.countDistinct('HerdIdentifier', 'AnimalIdentifier').alias('NumberOfCows'),
    )
    split_summary.display()

count(1),NumberOfHerds,NumberOfCows
17902,83,12538


count(1),NumberOfHerds,NumberOfCows
4521,80,4133


In [0]:
def _column_values(frame, column):
    """Collect a single column of ``frame`` to the driver as a flat Python list."""
    # Each selected Row holds exactly one value; flatMap unwraps the Row into it.
    return frame.select(column).rdd.flatMap(lambda row: row).collect()


# Features and target for the training split.
x_train = _column_values(TrainDf, "features")
y_train = _column_values(TrainDf, "Decay305Vetor")

# Features and target for the test split.
x_test = _column_values(TestDf, "features")
y_test = _column_values(TestDf, "Decay305Vetor")

# Display names for the feature columns; used as bar labels in the coefficient plot.
featureList = [
    'MultiparousCow',
    'Autumn',
    'Spring',
    'Summer',
    'Magnitude',
    'TimeToPeakYield',
    'Offset',
    'Decay',
    'TestDayMilkYield',
    'AgeInMonths',
    'HM305',
    'MeanMagnitude',
    'MeanTimeToPeakYield',
    'MeanOffset',
    'MeanDecay',
]

# Check the reference levels of the one-hot encodings

In [0]:
# Distinct (season, encoded vector) pairs in the training split, ordered by season name.
sorted(
    {(row[0], row[1]) for row in TrainDf.select("CalvingSeason", "CalvingSeasonEncode").collect()},
    key=lambda pair: pair[0],
)

Out[4]: [('Autumn', SparseVector(3, {0: 1.0})),
 ('Spring', SparseVector(3, {1: 1.0})),
 ('Summer', SparseVector(3, {2: 1.0})),
 ('Winter', SparseVector(3, {}))]

In [0]:
# Distinct (season, encoded vector) pairs in the test split, ordered by season name.
sorted(
    {(row[0], row[1]) for row in TestDf.select("CalvingSeason", "CalvingSeasonEncode").collect()},
    key=lambda pair: pair[0],
)

Out[5]: [('Autumn', SparseVector(3, {0: 1.0})),
 ('Spring', SparseVector(3, {1: 1.0})),
 ('Summer', SparseVector(3, {2: 1.0})),
 ('Winter', SparseVector(3, {}))]

In [0]:
# Distinct (parity group, encoded vector) pairs in the training split, ordered by group name.
sorted(
    {(row[0], row[1]) for row in TrainDf.select("ParityGroup","ParityGroupEncode").collect()},
    key=lambda pair: pair[0],
)

Out[6]: [('MultiparousCow', SparseVector(1, {0: 1.0})),
 ('PrimiparousCow', SparseVector(1, {}))]

In [0]:
# Distinct (parity group, encoded vector) pairs in the test split, ordered by group name.
sorted(
    {(row[0], row[1]) for row in TestDf.select("ParityGroup","ParityGroupEncode").collect()},
    key=lambda pair: pair[0],
)

Out[7]: [('MultiparousCow', SparseVector(1, {0: 1.0})),
 ('PrimiparousCow', SparseVector(1, {}))]

# Model building

In [0]:
import mlflow
import statsmodels.api as sm
import matplotlib.pyplot as plt
from sklearn.model_selection import RandomizedSearchCV,GridSearchCV
import numpy as np
from sklearn.linear_model import LinearRegression,Ridge,Lasso
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, r2_score,mean_absolute_percentage_error,mean_absolute_error

## Linear Regression

In [0]:
# Ordinary least squares on the training data. statsmodels does not add an
# intercept on its own, so a constant column is added to the features first.
X = sm.add_constant(x_train)
mod = sm.OLS(endog=y_train, exog=X)
res = mod.fit()

# Print the full regression table (coefficients, standard errors, fit statistics).
ols_summary = res.summary()
print(ols_summary)

                            OLS Regression Results                            
Dep. Variable:                      y   R-squared:                       0.272
Model:                            OLS   Adj. R-squared:                  0.271
Method:                 Least Squares   F-statistic:                     445.7
Date:                Wed, 05 Jul 2023   Prob (F-statistic):               0.00
Time:                        15:19:49   Log-Likelihood:             1.0312e+05
No. Observations:               17902   AIC:                        -2.062e+05
Df Residuals:                   17886   BIC:                        -2.061e+05
Df Model:                          15                                         
Covariance Type:            nonrobust                                         
                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
const          0.0022   4.03e-05     55.181      0.0

### train

In [0]:
with mlflow.start_run(run_name = 'Linear Regression') as run:

    # Fit plain linear regression on the training split and predict that same
    # split, so the metrics logged below are in-sample.
    lm = LinearRegression()
    lm.fit(x_train,y_train)
    # Takes the first row of coef_ as the per-feature coefficients.
    # NOTE(review): assumes y_train is 2-D with a single target column - confirm.
    coeffs = lm.coef_.tolist()[0]
    y_pred = lm.predict(x_train)

    # log parameters
    mlflow.log_param('data','train')
    mlflow.log_param('type','whole')

    # Bar chart of the fitted coefficients, one bar per entry of featureList,
    # with a dashed red line marking zero.
    plt.figure(figsize=(20,7))
    plt.bar(featureList,coeffs)
    plt.axhline(y=0, color='r', linestyle='--')
    plt.ylabel('β', fontsize=20,rotation=0)
    plt.xticks(rotation=25,fontsize= 13)
    plt.yticks(fontsize= 13)
    plt.savefig("β.png",bbox_inches='tight')
    plt.close()

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_train,y_pred)))
    mlflow.log_metric('r2',r2_score(y_train,y_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_train,y_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_train,y_pred))
    mlflow.log_artifact("β.png")

### test

In [0]:
with mlflow.start_run(run_name = 'Linear Regression') as run:

    # Fit on the training split ...
    lm = LinearRegression()
    lm.fit(x_train,y_train)

    # ... and evaluate on the held-out test split.
    y_test_pred = lm.predict(x_test)

    # log parameters
    mlflow.log_param('data','test')
    mlflow.log_param('type','whole')

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_test,y_test_pred)))
    mlflow.log_metric('r2',r2_score(y_test,y_test_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_test,y_test_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_test,y_test_pred))
    

## Ridge Regression

### train

#### CV

In [0]:

with mlflow.start_run(run_name = 'RidgeRegression') as run:

    # Estimator whose regularisation strength is tuned below.
    RR = Ridge()

    # Mark this run as the cross-validation stage.
    mlflow.log_param('type','CV')

    # Candidate alphas: ten values from 34.5 upward in steps of 0.1
    # (an integer grid over 30-39 was tried previously).
    alpha_values = {'alpha': [step * 0.1 for step in range(345, 355)]}
    mlflow.set_tags(alpha_values)

    # 10-fold grid search scored on both R2 and negative RMSE; the best
    # configuration by R2 is the one exposed through best_params_ / best_score_.
    grid_search = GridSearchCV(
        estimator=RR,
        param_grid=alpha_values,
        scoring=["r2", 'neg_root_mean_squared_error'],
        cv=10,
        refit='r2',
        return_train_score=True,
        n_jobs=-1,
    )
    grid_search.fit(x_train, y_train)

    # Record the winning configuration.
    for param_name, param_value in (
        ('best parameters', grid_search.best_params_),
        ('best estimator', grid_search.best_estimator_),
        ('best score', grid_search.best_score_),
    ):
        mlflow.log_param(param_name, param_value)

    # One line plot per mean validation score, in grid order.
    for score_key, png_path in (
        ('mean_test_neg_root_mean_squared_error', "test_rmse.png"),
        ('mean_test_r2', "test_r2.png"),
    ):
        plt.plot(grid_search.cv_results_[score_key].tolist())
        plt.savefig(png_path)
        plt.close()

    # log artifact
    mlflow.log_artifact("test_r2.png")
    mlflow.log_artifact("test_rmse.png")
    

#### train_whole

In [0]:
with mlflow.start_run(run_name = 'RidgeRegression') as run:

    # Refit ridge regression on the whole training split using the alpha
    # selected by the grid search, then predict that same split (in-sample).
    RR = Ridge(**grid_search.best_params_)
    RR.fit(x_train,y_train)
    y_pred = RR.predict(x_train)

    # log parameters
    mlflow.log_param('data','train')
    mlflow.log_param('type','whole')

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_train,y_pred)))
    mlflow.log_metric('r2',r2_score(y_train,y_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_train,y_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_train,y_pred))

### test

In [0]:
# Run name matches the other Ridge runs ('RidgeRegression') so all three
# stages appear under the same name in MLflow.
with mlflow.start_run(run_name = 'RidgeRegression') as run:

    # Fit on the training split with the alpha chosen by the grid search.
    # (The estimator is built once; the original constructed it twice.)
    RR = Ridge(**grid_search.best_params_)
    RR.fit(x_train,y_train)

    # Evaluate on the held-out test split.
    y_test_pred = RR.predict(x_test)

    # log parameters
    mlflow.log_param('type','test')

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_test,y_test_pred)))
    mlflow.log_metric('r2',r2_score(y_test,y_test_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_test,y_test_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_test,y_test_pred))
    

## Lasso Regression

### train

#### CV

In [0]:

with mlflow.start_run(run_name = 'LassoRegression') as run:

    # Estimator whose regularisation strength is tuned below.
    LS = Lasso()

    # Mark this run as the cross-validation stage.
    mlflow.log_param('type','CV')

    # Candidate alphas: 1e-8 times 1, 11, 21, ..., 91.
    alpha_values = {'alpha': [step * 1e-8 for step in range(1, 100, 10)]}
    mlflow.set_tags(alpha_values)

    # 10-fold grid search scored on both R2 and negative RMSE; the best
    # configuration by R2 is the one exposed through best_params_ / best_score_.
    grid_search_ls = GridSearchCV(
        estimator=LS,
        param_grid=alpha_values,
        scoring=["r2", 'neg_root_mean_squared_error'],
        cv=10,
        refit='r2',
        return_train_score=True,
        n_jobs=-1,
    )
    grid_search_ls.fit(x_train, y_train)

    # Record the winning configuration.
    for param_name, param_value in (
        ('best parameters', grid_search_ls.best_params_),
        ('best estimator', grid_search_ls.best_estimator_),
        ('best score', grid_search_ls.best_score_),
    ):
        mlflow.log_param(param_name, param_value)

    # One line plot per mean validation score, in grid order.
    for score_key, png_path in (
        ('mean_test_neg_root_mean_squared_error', "test_rmse.png"),
        ('mean_test_r2', "test_r2.png"),
    ):
        plt.plot(grid_search_ls.cv_results_[score_key].tolist())
        plt.savefig(png_path)
        plt.close()

    # log artifact
    mlflow.log_artifact("test_r2.png")
    mlflow.log_artifact("test_rmse.png")
    

#### train_whole

In [0]:
with mlflow.start_run(run_name = 'LassoRegression') as run:

    # Tag the run with the alpha selected by the Lasso grid search.
    mlflow.set_tags(grid_search_ls.best_params_)

    # Refit on the whole training split with that alpha and predict the same
    # split (in-sample metrics).
    LS = Lasso(**grid_search_ls.best_params_)
    LS.fit(x_train,y_train)
    y_pred = LS.predict(x_train)

    # log parameters
    mlflow.log_param('data','train')
    mlflow.log_param('type','whole')

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_train,y_pred)))
    mlflow.log_metric('r2',r2_score(y_train,y_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_train,y_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_train,y_pred))

### test

In [0]:
with mlflow.start_run(run_name = 'LassoRegression') as run:

    # Tag the run with the alpha selected by the Lasso grid search.
    mlflow.set_tags(grid_search_ls.best_params_)

    # Fit on the training split, then evaluate on the held-out test split.
    LS = Lasso(**grid_search_ls.best_params_)
    LS.fit(x_train,y_train)

    y_test_pred = LS.predict(x_test)

    # log parameters
    mlflow.log_param('type','test')

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_test,y_test_pred)))
    mlflow.log_metric('r2',r2_score(y_test,y_test_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_test,y_test_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_test,y_test_pred))
    

## Random Forest

### train

#### CV

In [0]:
# Grid search for the random forest.
with mlflow.start_run(run_name = 'RandomForest') as run:

    # A fixed random_state makes the forest, and therefore the search result,
    # reproducible across re-runs of this cell.
    RF = RandomForestRegressor(random_state=42)

    # log parameters
    mlflow.log_param('type','CV')

    # Only min_samples_leaf is searched here; the other hyper-parameters are
    # held at single values. (A wider grid over n_estimators, max_depth,
    # max_features, min_samples_split and min_samples_leaf was previously
    # sketched in commented-out code.)
    params = [{'n_estimators':[96],        # number of trees
               'max_depth':[8],            # tree depth
               'max_features':[4],
               'min_samples_split': [3],   # minimum samples required to split an internal node
               'min_samples_leaf': list(range(1, 20)),  # minimum samples required at a leaf node
              }
             ]

    mlflow.set_tags(params[0])

    # NOTE: this rebinds `grid_search`, the same name the Ridge cells read
    # their best alpha from; re-running a Ridge cell after this one would
    # pick up the forest's parameters instead.
    grid_search = GridSearchCV (RF,
                           param_grid= params,
                           scoring=["r2",'neg_root_mean_squared_error'],
                           cv=10,
                           refit='r2',
                           return_train_score=True,
                           n_jobs=-1)

    # The target is flattened to 1-D before fitting the forest.
    grid_search.fit(x_train,np.ravel(y_train))

    # log parameters
    mlflow.log_param('best parameters',grid_search.best_params_)
    mlflow.log_param('best estimator',grid_search.best_estimator_)
    mlflow.log_param('best score',grid_search.best_score_)

    # Mean validation scores in grid order, one plot per metric.
    plt.plot(grid_search.cv_results_['mean_test_neg_root_mean_squared_error'].tolist())
    plt.savefig("test_rmse.png")
    plt.close()

    plt.plot(grid_search.cv_results_['mean_test_r2'].tolist())
    plt.savefig("test_r2.png")
    plt.close()

    # log artifact
    mlflow.log_artifact("test_r2.png")
    mlflow.log_artifact("test_rmse.png")


#### train_whole

In [0]:
with mlflow.start_run(run_name = 'RandomForest') as run:

    # Tag the run with the hyper-parameters selected by the grid search.
    mlflow.set_tags(grid_search.best_params_)

    # Refit on the whole training split. A fixed random_state makes this
    # refit reproducible; without it every run of the cell grows a different forest.
    RF = RandomForestRegressor(**grid_search.best_params_, random_state=42)
    RF.fit(x_train,np.ravel(y_train))
    y_pred = RF.predict(x_train)

    # log parameters
    mlflow.log_param('data','train')
    mlflow.log_param('type','whole')

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_train,y_pred)))
    mlflow.log_metric('r2',r2_score(y_train,y_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_train,y_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_train,y_pred))
    

### test

In [0]:
with mlflow.start_run(run_name = 'RandomForest') as run:

    # Fit on the training split with the hyper-parameters selected by the
    # grid search. A fixed random_state makes this refit reproducible;
    # without it every run of the cell grows a different forest.
    RF = RandomForestRegressor(**grid_search.best_params_, random_state=42)
    RF.fit(x_train,np.ravel(y_train))

    # Evaluate on the held-out test split.
    y_test_pred = RF.predict(x_test)

    # log parameters
    mlflow.log_param('type','test')

    # log metrics
    # RMSE is taken as sqrt(MSE) rather than via squared=False, which
    # scikit-learn deprecated in 1.4 and removed in 1.6. The two agree when
    # there is a single target column.
    mlflow.log_metric('rmse',np.sqrt(mean_squared_error(y_test,y_test_pred)))
    mlflow.log_metric('r2',r2_score(y_test,y_test_pred))
    mlflow.log_metric('mape',mean_absolute_percentage_error(y_test,y_test_pred))
    mlflow.log_metric('mae',mean_absolute_error(y_test,y_test_pred))
    