In [None]:
!python -m pip install --user --upgrade pip #to install or upgrade pip

#use 'conda list' to obtian the versions of libraries
!pip3 install pandas==0.24.2 matplotlib==3.2.2 scipy==1.4.1 statsmodels==0.12.0 scikit-learn==0.23.1 lightgbm --user

In [None]:
!pip3 install kfp --upgrade --user # install kubeflow pipelines

In [None]:
#Check if the install was successful

!which dsl-compile

In [None]:
# Import Kubeflow SDK
import kfp  
import kfp.dsl as dsl  #dsl is for assembling the entire pipeline
import kfp.components as comp  #components builds individual components

# Directory that every step reads its inputs from and writes its outputs to
# when the step functions are called directly in this notebook.
data_path = 'components'

## DATA INGESTION

In [None]:
def data_injestion(data_path):
    """Download the raw train/test spreadsheets and pickle them for the next step.

    Runs as a self-contained Kubeflow component (only this function body is
    shipped to the container), so all installs and imports happen inside it.

    Args:
        data_path: Directory where the pickle ``inj_data`` is written; created
            if it does not exist.

    Side effects:
        pip-installs pinned pandas/xlrd, downloads the two .xlsx files and writes
        ``{data_path}/inj_data`` containing the tuple ``(train, test)``.
    """
    import os
    import pickle  # serialises the raw frames for the transformation step
    import subprocess
    import sys

    # pandas 0.24.2 reads .xlsx through xlrd; xlrd 2.x removed .xlsx support,
    # so pin the last release that still handles it.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'xlrd==1.2.0'])

    import pandas as pd

    base_url = 'https://github.com/olawalecodes/stage-f-11-food-delivery/blob/master/data'
    # Read the spreadsheets directly; previously the read_excel call was buried
    # inside a read_csv string literal, so pandas looked for a file with that name.
    train = pd.read_excel(f'{base_url}/Data_Train.xlsx?raw=true')
    test = pd.read_excel(f'{base_url}/Data_Test.xlsx?raw=true')

    # When run locally, data_path is a relative directory that may not exist yet.
    os.makedirs(data_path, exist_ok=True)

    # Save the ingested data as a pickle file for the data transformation component.
    with open(f'{data_path}/inj_data', 'wb') as f:
        pickle.dump((train, test), f)

In [6]:
# Run the ingestion step locally; it returns None and writes its output under data_path.
idata = data_injestion(data_path=data_path)

## DATA PREPROCESSING

In [7]:
def data_transformation(data_path):
    """Clean the raw restaurant data, engineer features and split it for training.

    Reads ``{data_path}/inj_data`` (the ``(train, test)`` DataFrames pickled by
    the ingestion step) and writes two pickles:

    * ``{data_path}/preprocessed_data``: ``(X_train, X_test, y_train, y_test)``,
      a stratified 80/20 split (``random_state=314``) of the labelled rows, with
      sparse CSR feature matrices and integer-encoded targets;
    * ``{data_path}/full_data``: ``(train_ohe, train_target, test_ohe,
      class_map_rev)``. These are all labelled rows, the unlabelled test rows,
      and the mapping from encoded class back to minutes. The training step
      needs them to fit full-data models and decode its predictions.

    Runs as a self-contained Kubeflow component, so imports live inside.
    """
    import pickle
    import subprocess
    import sys

    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.23.1'])

    import numpy as np
    import pandas as pd
    import scipy.sparse
    from sklearn.model_selection import train_test_split

    missing = -999  # sentinel used for every missing / placeholder value

    def _amount_to_int(values):
        """Keep only the digits of a currency string (e.g. '₹350' -> 350); missing -> sentinel."""
        digits = values.str.replace('[^0-9]', '', regex=True)
        return pd.to_numeric(digits, errors='coerce').fillna(missing).astype(int)

    def _one_hot(values):
        """Sparse equivalent of ``pd.get_dummies`` for one column.

        One output column per distinct value, in sorted order; missing values
        give an all-zero row, matching ``get_dummies(dummy_na=False)``.
        """
        codes, uniques = pd.factorize(values, sort=True)
        rows = np.flatnonzero(codes >= 0)
        return scipy.sparse.coo_matrix(
            (np.ones(len(rows), dtype=np.uint8), (rows, codes[rows])),
            shape=(len(values), len(uniques)))

    # Load and unpack the ingested data.
    with open(f'{data_path}/inj_data', 'rb') as f:
        train, test = pickle.load(f)

    # pd.concat instead of DataFrame.append (removed in pandas 2.0); same result.
    combined_data = pd.concat([train, test], ignore_index=True, sort=False)
    combined_data = combined_data.drop('Delivery_Time', axis=1)

    combined_data['Average_Cost'] = _amount_to_int(combined_data['Average_Cost'])
    combined_data['Minimum_Order'] = _amount_to_int(combined_data['Minimum_Order'])

    rating_placeholders = ['NEW', '-', 'Opening Soon', 'Temporarily Closed']
    combined_data['Rating'] = combined_data['Rating'].replace(rating_placeholders, missing).astype('float')
    combined_data['Votes'] = combined_data['Votes'].replace('-', missing).astype('float')
    combined_data['Reviews'] = combined_data['Reviews'].replace('-', missing).astype('float')

    # The city is the last comma-separated part of Location; collapse aliases and
    # neighbourhood names to a single label per city.
    actual_city = {'Noida' : 'Noida',
                   'Gurgaon' : 'Gurgoan',
                   'Gurgoan' : 'Gurgoan',
                   'Mumbai CST Area' : 'Mumbai',
                   'Mumbai Central' : 'Mumbai',
                   'Mumbai' : 'Mumbai',
                   'Pune' : 'Pune',
                   'Maharashtra' : 'Pune',
                   'Pune University' : 'Pune',
                   'Timarpur' : 'Delhi',
                   'Delhi' : 'Delhi',
                   'Delhi Cantt.' : 'Delhi',
                   'Delhi University-GTB Nagar' : 'Delhi',
                   'India Gate' : 'Delhi',
                   'Whitefield' : 'Banglore',
                   'Marathalli' : 'Banglore',
                   'Majestic' : 'Banglore',
                   'Bangalore' : 'Banglore',
                   'Electronic City' : 'Banglore',
                   'Hyderabad' : 'Hyderabad',
                   'Begumpet' : 'Hyderabad',
                   'Kolkata' : 'Kolkata'
                   }
    combined_data['city'] = (combined_data['Location']
                             .apply(lambda loc: loc.split(',')[-1].strip())
                             .map(actual_city))
    combined_data = combined_data.drop('Location', axis=1)

    # One 0/1 indicator column per individual cuisine. The old code built an
    # all-zero frame and never filled it, so these features carried no signal.
    cuisines_DF = (combined_data['Cuisines']
                   .str.replace(r'\s*,\s*', ',', regex=True)
                   .str.strip()
                   .str.get_dummies(sep=','))
    cuisines_DF = cuisines_DF.rename(columns={'Poké': 'Poke'})
    combined_data = pd.merge(combined_data, cuisines_DF, left_index=True, right_index=True)

    combined_data['Minimum_Order_Zero'] = np.where(combined_data['Minimum_Order'] == 0, 1, 0)

    # Ratio features; a zero denominator yields inf/NaN, which the random forest
    # rejects, so map those to the missing sentinel.
    reviews_by_votes = combined_data['Reviews'] / combined_data['Votes']
    combined_data['Reviews_by_Votes'] = reviews_by_votes.replace([np.inf, -np.inf], np.nan).fillna(missing)
    order_to_cost = combined_data['Minimum_Order'] / combined_data['Average_Cost']
    combined_data['Minimum_Order_to_Cost'] = order_to_cost.replace([np.inf, -np.inf], np.nan).fillna(missing)

    # Number of distinct restaurants in each city.
    combined_data["num_of_restaurants_city"] = combined_data["city"].map(combined_data.groupby("city").Restaurant.nunique())
    # Number of rows (branches) sharing each restaurant ID.
    combined_data["Restaurant_branch_count"] = combined_data["Restaurant"].map(combined_data["Restaurant"].value_counts())

    # '30 minutes' -> 30, then encode the sorted distinct times as 0..k-1.
    # Shared class indices keep probability columns aligned when ensembling models.
    target = "Delivery_Time"
    minutes = train[target].apply(lambda x: x.split()[0]).astype(int)
    class_map_rev = dict(enumerate(sorted(minutes.unique())))
    class_map = {value: code for code, value in class_map_rev.items()}
    train_target = minutes.map(class_map)

    num_cols = ['Votes', 'Reviews', 'Rating', 'Average_Cost', 'Minimum_Order',
                'Restaurant_branch_count', 'num_of_restaurants_city', 'Reviews_by_Votes', 'Minimum_Order_to_Cost']
    cat_cols = [col for col in combined_data.columns if col not in num_cols]

    # Build the one-hot matrix with scipy directly: DataFrame.sparse.to_coo()
    # needs pandas >= 0.25, but this component pins pandas 0.24.2.
    one_hot_blocks = [_one_hot(combined_data[col]) for col in cat_cols]
    num_features = scipy.sparse.coo_matrix(combined_data[num_cols].fillna(missing).values)
    features = scipy.sparse.hstack(one_hot_blocks + [num_features]).tocsr()

    # combined_data lists train rows first, then test rows.
    n_train = len(train)
    train_ohe = features[:n_train]
    test_ohe = features[n_train:]

    X_train, X_test, y_train, y_test = train_test_split(
        train_ohe, train_target, test_size=0.20, random_state=314, stratify=train_target)

    # Save the split data for the model component.
    with open(f'{data_path}/preprocessed_data', 'wb') as f:
        pickle.dump((X_train, X_test, y_train, y_test), f)
    # Save the full labelled set, the unlabelled rows and the class decoding map.
    with open(f'{data_path}/full_data', 'wb') as f:
        pickle.dump((train_ohe, train_target, test_ohe, class_map_rev), f)

Unnamed: 0,Restaurant,Location,Cuisines,Average_Cost,Minimum_Order,Rating,Votes,Reviews
0,ID_2842,"Mico Layout, Stage 2, BTM Layout,Bangalore","North Indian, Chinese, Assamese",₹350,₹50,4.2,361,225
1,ID_730,"Mico Layout, Stage 2, BTM Layout,Bangalore","Biryani, Kebab",₹100,₹50,NEW,-,-
2,ID_4620,"Sector 1, Noida",Fast Food,₹100,₹50,3.6,36,16
3,ID_5470,"Babarpur, New Delhi, Delhi","Mithai, North Indian, Chinese, Fast Food, Sout...",₹200,₹50,3.6,66,33
4,ID_3249,"Sector 1, Noida","Chinese, Fast Food",₹150,₹50,2.9,38,14
5,ID_506,"Yerawada, Pune, Maharashtra","North Indian, Chinese",₹100,₹50,3.4,16,11
6,ID_8321,"Raja Bazar, Kolkata","South Indian, Fast Food",₹200,₹50,3.3,72,10
7,ID_4559,"Sector 3, Marathalli","Kerala, South Indian, Chinese",₹150,₹50,3.8,46,29
8,ID_7982,"Sector 1, Noida",North Indian,₹200,₹50,NEW,-,-
9,ID_2869,"D-Block, Sector 63, Noida",Mithai,₹200,₹50,3.2,5,2


In [None]:
# Run the transformation step locally; it returns None and writes its pickles under data_path.
new_data = data_transformation(data_path=data_path)

## MODEL TRAINING

In [None]:
def train(data_path):
    """Fit LightGBM and random-forest classifiers and ensemble their predictions.

    Reads ``{data_path}/preprocessed_data`` (``(X_train, X_test, y_train,
    y_test)``) and, when the transformation step wrote it,
    ``{data_path}/full_data`` (``(train_ohe, train_target, test_ohe,
    class_map_rev)``). Writes:

    * ``lgbmodel`` / ``rfmodel``: the LightGBM model and the 1000-tree random
      forest fitted on the training split (the two ensembled models);
    * ``final_data``: ``(submission, pred_ensemble2, y_test)``, where
      ``pred_ensemble2`` holds the harmonic-mean ensemble predictions for
      ``X_test``. ``submission`` is a DataFrame of predicted delivery times for
      the unlabelled rows, or ``None`` when ``full_data`` is absent.

    Runs as a self-contained Kubeflow component, so imports live inside.
    """
    import os
    import pickle
    import subprocess
    import sys

    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.23.1'])
    # lightgbm is not in the base image; pin a release that has the callback API used below.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'lightgbm==3.3.5'])

    import numpy as np
    import pandas as pd
    import lightgbm as lgb
    from scipy.stats import hmean
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score

    target = "Delivery_Time"

    with open(f'{data_path}/preprocessed_data', 'rb') as f:
        X_train, X_test, y_train, y_test = pickle.load(f)

    full_data_file = f'{data_path}/full_data'
    if os.path.exists(full_data_file):
        with open(full_data_file, 'rb') as f:
            train_ohe, train_target, test_ohe, class_map_rev = pickle.load(f)
    else:
        # Output of an older transformation step: nothing to score for submission.
        train_ohe = train_target = test_ohe = class_map_rev = None

    lgb_params = {'boosting_type': 'gbdt',
                  'objective': 'multiclass',
                  'metric': 'multi_logloss',
                  'verbose': 0,
                  'bagging_fraction': 0.8,
                  'bagging_freq': 1,
                  'num_class': 7,
                  'feature_fraction': 0.8,
                  'lambda_l1': 0.01,
                  'lambda_l2': 0.01,
                  'learning_rate': 0.1,
                  'max_bin': 255,
                  'max_depth': -1,
                  'min_data_in_bin': 1,
                  'min_data_in_leaf': 1,
                  'num_leaves': 31}

    # LightGBM on the split, early-stopped on the hold-out multi-class log-loss.
    # Callbacks replace the early_stopping_rounds / verbose fit kwargs removed in lightgbm 4.
    clf_lgb = lgb.LGBMClassifier(n_estimators=10000, **lgb_params, random_state=123456789, n_jobs=-1)
    clf_lgb.fit(X_train, y_train,
                eval_set=[(X_test, y_test)],
                eval_names=['valid'],
                eval_metric='multi_logloss',
                callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)])

    # Random forest with 1000 estimators on the split.
    clf_rf2 = RandomForestClassifier(n_estimators=1000, max_features=0.1)
    clf_rf2.fit(X_train, y_train)

    # Save the two models used for ensembling.
    with open(f'{data_path}/lgbmodel', 'wb') as f:
        pickle.dump(clf_lgb, f)
    with open(f'{data_path}/rfmodel', 'wb') as f:
        pickle.dump(clf_rf2, f)

    # Both models were fitted on the same labels, so their probability columns
    # follow the same classes_; map argmax positions back through classes_.
    classes = clf_lgb.classes_
    lgb_proba = clf_lgb.predict_proba(X_test)
    rf_proba = clf_rf2.predict_proba(X_test)
    pred_lgb = classes[np.argmax(lgb_proba, axis=1)]
    pred_rf2_split = classes[np.argmax(rf_proba, axis=1)]
    # Ensemble 1: arithmetic mean of the two probability matrices.
    pred_ensemble1 = classes[np.argmax(np.mean((lgb_proba, rf_proba), axis=0), axis=1)]
    # Ensemble 2: harmonic mean; clip first because hmean needs strictly positive values.
    pred_ensemble2 = classes[np.argmax(
        hmean((np.clip(lgb_proba, 0.001, 1), np.clip(rf_proba, 0.001, 1)), axis=0), axis=1)]

    for label, pred in (('LightGBM', pred_lgb),
                        ('Random forest', pred_rf2_split),
                        ('Mean ensemble', pred_ensemble1),
                        ('Harmonic-mean ensemble', pred_ensemble2)):
        print(f'{label} hold-out accuracy: {accuracy_score(y_test, pred):.4f}')

    submission = None
    if test_ohe is not None and test_ohe.shape[0] > 0:
        # Refit on all labelled rows; give LightGBM ~20% more rounds than the
        # early-stopped optimum to account for the larger training set.
        best_rounds = clf_lgb.best_iteration_ or clf_lgb.n_estimators
        clf_lgb_fulldata = lgb.LGBMClassifier(n_estimators=int(best_rounds * 1.2), **lgb_params)
        clf_lgb_fulldata.fit(train_ohe, train_target)
        clf_rf2_fulldata = RandomForestClassifier(n_estimators=1000, max_features=0.1)
        clf_rf2_fulldata.fit(train_ohe, train_target)

        # Harmonic-mean ensemble of the full-data models on the unlabelled rows.
        test_proba = hmean((np.clip(clf_lgb_fulldata.predict_proba(test_ohe), 0.001, 1),
                            np.clip(clf_rf2_fulldata.predict_proba(test_ohe), 0.001, 1)), axis=0)
        prediction = clf_lgb_fulldata.classes_[np.argmax(test_proba, axis=1)]

        submission = pd.DataFrame({target: pd.Series(prediction).map(class_map_rev).apply(lambda x: str(x)+" minutes")})
        print(submission)

    # Save the submission and the hold-out predictions/labels for validation.
    with open(f'{data_path}/final_data', 'wb') as f:
        pickle.dump((submission, pred_ensemble2, y_test), f)

In [None]:
# Run the training step locally; it returns None and writes models/predictions under data_path.
built = train(data_path=data_path)

## MODEL VALIDATION

In [None]:
def validate(data_path):
    """Score the harmonic-mean ensemble on the hold-out split and save the accuracy.

    Reads ``{data_path}/final_data`` (``(submission, pred_ensemble2, y_test)``,
    pickled by the training step). Writes ``{data_path}/result.txt`` containing
    the fraction of hold-out rows whose predicted class equals the true class.

    Raises:
        ValueError: if predictions and labels have different shapes.
    """
    import pickle
    import subprocess
    import sys

    # final_data holds pandas objects, so pandas must be importable to unpickle it.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])

    import numpy as np

    with open(f'{data_path}/final_data', 'rb') as f:
        _submission, pred_ensemble2, y_test = pickle.load(f)

    predicted = np.asarray(pred_ensemble2)
    expected = np.asarray(y_test)
    if predicted.shape != expected.shape:
        raise ValueError(
            f'prediction/label shape mismatch: {predicted.shape} vs {expected.shape}')
    # Plain accuracy; computed with numpy so scikit-learn is not needed in this container.
    evaluation = float(np.mean(predicted == expected))

    with open(f'{data_path}/result.txt', 'w') as f:  # saved as a text file, note 'w'
        f.write(" Model Accuracy: {}".format(evaluation))

    print('Successfully!')
    

In [None]:
# Run the validation step locally; it returns None and writes result.txt under data_path.
result = validate(data_path=data_path)

## CREATE COMPONENTS

In [None]:
# Create components: wrap each step function as a lightweight container op,
# all running on the same base image.
_step_image = "tensorflow/tensorflow:latest-gpu-py3"
inj_op, transformation_op, train_op, validate_op = (
    comp.func_to_container_op(step_func, base_image=_step_image)
    for step_func in (data_injestion, data_transformation, train, validate)
)


## BUILD PIPELINES

In [None]:
# Client used to talk to the Kubeflow Pipelines API server; host is passed
# explicitly as None, i.e. the same as calling kfp.Client() with no arguments.
client = kfp.Client(host=None)

In [None]:
# Define the pipeline; data_path is the pipeline parameter fed to every step.
@dsl.pipeline(
    name='Food Delivery Pipeline',
    description='An ML pipeline that estimates the time of delivery of food.',
)
def food_delivery_pipeline(data_path: str):
    """Chain ingestion -> transformation -> training -> validation on one volume.

    Every step mounts the shared volume at ``data_path``. Each step receives
    the volume exposed by the previous step (``.pvolume``), which chains the
    steps in order. A final bash step prints ``result.txt``.
    """
    # Volume shared by all steps.
    shared_volume = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    ingest_step = inj_op(data_path).add_pvolumes({data_path: shared_volume.volume})
    transform_step = transformation_op(data_path).add_pvolumes({data_path: ingest_step.pvolume})
    train_step = train_op(data_path).add_pvolumes({data_path: transform_step.pvolume})
    validate_step = validate_op(data_path).add_pvolumes({data_path: train_step.pvolume})

    # Print the validation result written by the last step.
    dsl.ContainerOp(
        name="print_validation_result",
        image='library/bash:4.4.23',
        pvolumes={data_path: validate_step.pvolume},
        arguments=['cat', f'{data_path}/result.txt'],
    )

In [None]:
# Inside the pipeline, every step mounts the shared volume at DATA_PATH.
# Container mount points must be absolute paths, so the notebook's relative
# local directory (data_path = "components") cannot be reused here.
DATA_PATH = "/mnt"

pipeline_func = food_delivery_pipeline

In [None]:
experiment_name = 'food_delivery_kubeflow'
run_name = f'{pipeline_func.__name__} run'
arguments = {'data_path': DATA_PATH}

# Compile the pipeline into a compressed package named after the experiment.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

# Submit a run directly from the pipeline function.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    experiment_name=experiment_name,
    run_name=run_name,
)