# This notebook walks through the steps, in sequence, for developing ML bots

Development involves three major components.

1. Data Version Control

* Create a GitHub repo
* Run dvc init
* Identify the folder that holds the large files
* Connect that folder to an S3 bucket
* Track the folder using dvc add
* Commit the repo
* Pull the repo
* Run dvc pull
* Train the model in the cloud

2. Configure Airflow
* Start an Airflow instance (open question: do we need a unique DAG folder for every project?)
* Modularise the code for use in DAGs
* Sync Airflow with Git. This requires Helm and Airflow to be set up in a Kubernetes cluster
* Share values or files between DAGs using XComs

3. Utilizing MLflow

* MLflow runs in parallel while we experiment, which helps us select the best-performing model
* Once the model is selected, we can use it in the serving DAG
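
The Data Version Control steps in the outline above can be sketched as a short list of commands. This is a minimal sketch, not the exact sequence used for this project: the remote name `storage` and the bucket URL are hypothetical, and the `dvc push` step is an assumption (the outline does not list it, but a later `dvc pull` needs the data to be in the remote). The helper defaults to a dry run that only prints the commands.

```python
import shlex
import subprocess


def dvc_setup_commands(data_dir, remote_url, remote_name="storage"):
    """Commands for the machine that holds the large files (first-time setup)."""
    return [
        ["dvc", "init"],
        # Point DVC at the S3 bucket and make it the default remote
        ["dvc", "remote", "add", "-d", remote_name, remote_url],
        # Track the large folder; this writes <data_dir>.dvc and updates .gitignore
        ["dvc", "add", data_dir],
        ["git", "add", f"{data_dir}.dvc", ".gitignore", ".dvc/config"],
        ["git", "commit", "-m", f"Track {data_dir} with DVC"],
        # Assumption: upload the data so that other machines can pull it
        ["dvc", "push"],
    ]


def dvc_fetch_commands():
    """Commands for the machine that trains the model."""
    return [["git", "pull"], ["dvc", "pull"]]


def run_all(commands, dry_run=True):
    """Print each command and, unless dry_run is set, execute it."""
    for cmd in commands:
        print("$", shlex.join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # Hypothetical bucket path; replace with the real one before a non-dry run
    run_all(dvc_setup_commands("data", "s3://example-bucket/dvc-store"))
    run_all(dvc_fetch_commands())
```

Keeping the commands as lists rather than shell strings avoids quoting problems if a folder name contains spaces.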

## Data Version Control

In [1]:
!mkdir set_up
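# Note: a `!cd` command runs in its own subshell and does not persist, so the directory is changed with os.chdir in the next cell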
!git clone git@gitlab.gupshup.io:alok.gupta/dvc_mlflow_airflow.git set_up/

mkdir: cannot create directory ‘set_up’: File exists
fatal: destination path 'set_up' already exists and is not an empty directory.


In [2]:
import os
os.chdir('set_up/')
os.getcwd()

'/home/alokg/Desktop/trial_run/set_up'

In [3]:
!dvc pull

Everything is up to date.

# DAG for Airflow

The first task is to start Airflow using the Helm chart

kubectl create namespace airflow

Reference for the Helm installation: https://www.astronomer.io/events/recaps/official-airflow-helm-chart/
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm search repo airflow
helm install airflow apache-airflow/airflow --namespace airflow --debug --timeout 10m0s

## data_cleaning.py

In [31]:
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.model_selection import train_test_split


def clean_data():
    # The raw wine-quality file is semicolon-delimited
    data = pd.read_csv('data/winequality-white.csv', delimiter=';')

    # Drop rows with missing values and report how many were removed
    shape_before = data.shape
    data.dropna(inplace=True)
    shape_after = data.shape
    if shape_before != shape_after:
        print(shape_before[0] - shape_after[0], "Rows dropped from the data")

    # Drop outliers: keep only rows where every column has |z-score| < 3
    shape_before = data.shape
    data = data[(np.abs(stats.zscore(data)) < 3).all(axis=1)]
    shape_after = data.shape
    if shape_before != shape_after:
        print(shape_before[0] - shape_after[0], "Rows dropped from the data")

    # Split into train and test sets (scikit-learn's default is a 75/25 split)
    train, test = train_test_split(data)

    # Save the processed data to the data folder
    train.to_csv('data/wine_train.csv', index=False)
    test.to_csv('data/wine_test.csv', index=False)
    return "done"

411 Rows dropped from the data
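
The outlier step in `clean_data` can be hard to read at a glance, so here is a minimal standard-library sketch of what `(np.abs(stats.zscore(data)) < 3).all(axis=1)` does: a row is kept only if every one of its values lies within 3 standard deviations of its column mean. The function name `zscore_filter` and the sample rows are made up for illustration, and the sketch assumes every column has non-zero variance. `pstdev` is used because `scipy.stats.zscore` defaults to the population standard deviation (`ddof=0`).

```python
from statistics import mean, pstdev


def zscore_filter(rows, threshold=3.0):
    """Keep rows whose values all have an absolute z-score below threshold."""
    columns = list(zip(*rows))
    means = [mean(col) for col in columns]
    stds = [pstdev(col) for col in columns]  # population std, like ddof=0
    kept = []
    for row in rows:
        z_scores = [(v - m) / s for v, m, s in zip(row, means, stds)]
        if all(abs(z) < threshold for z in z_scores):
            kept.append(row)
    return kept


# 19 ordinary rows plus one row with an extreme value in the first column
rows = [(7.0 if i % 2 == 0 else 7.2, 3.0 if i % 2 == 0 else 3.2) for i in range(19)]
outlier = (50.0, 3.2)
rows.append(outlier)

kept = zscore_filter(rows)
print(len(rows) - len(kept), "Rows dropped from the data")
```

Note that with very small samples no value can reach a z-score of 3, so this kind of filter only starts removing rows once there are enough observations.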


## model_creation.py

In [48]:
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error


def create_model():
    train_data = pd.read_csv('data/wine_train.csv')
    test_data = pd.read_csv('data/wine_test.csv')

    # Separate the features from the 'quality' target
    X_train, y_train = train_data.drop(columns=['quality']), train_data['quality']
    X_test, y_test = test_data.drop(columns=['quality']), test_data['quality']

    mlflow.set_tracking_uri("http://34.199.231.204:7000")
    mlflow.set_experiment("wine_testing")
    with mlflow.start_run():
        n_estimators = 100
        max_depth = 6
        max_features = 3

        # Create and train model
        rf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, max_features=max_features)
        rf.fit(X_train, y_train)

        # Make predictions
        predictions = rf.predict(X_test)

        # Log parameters
        mlflow.log_param("num_trees", n_estimators)
        mlflow.log_param("maxdepth", max_depth)
        mlflow.log_param("max_feat", max_features)

        # Log model
        mlflow.sklearn.log_model(rf, "random-forest-model")

        # Create metrics
        mse = mean_squared_error(y_test, predictions)

        # Log metrics
        mlflow.log_metric("mse", mse)
    return 'model trained'
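
One thing to keep in mind when comparing runs: `create_model` logs the mean squared error of the predicted quality labels, while the inference step logs `model.score`, which for a scikit-learn classifier is the mean accuracy. The two numbers measure different things and are not directly comparable. The sketch below uses made-up labels and plain-Python helper functions (not the scikit-learn implementations) to show how they differ on the same predictions.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label exactly."""
    matches = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return matches / len(y_true)


def mean_squared_error(y_true, y_pred):
    """Average squared distance between the predicted and true labels."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Illustrative quality labels, not taken from the wine data
y_true = [5, 6, 6, 7, 5, 8]
y_pred = [5, 6, 5, 7, 6, 5]

print("accuracy:", accuracy(y_true, y_pred))
print("mse:", mean_squared_error(y_true, y_pred))
```

Accuracy treats every miss the same, whereas MSE penalises a prediction of 5 for a true 8 far more than a prediction of 5 for a true 6.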

## inference.py

In [54]:
import mlflow
import mlflow.sklearn
import pandas as pd


def infer_model():
    # Load the test data inside the function so that importing this module
    # (for example when Airflow parses the DAG) does not require the file to exist
    test_data = pd.read_csv('data/wine_test.csv')
    X_test, y_test = test_data.drop(columns=['quality']), test_data['quality']

    mlflow.set_tracking_uri("http://34.199.231.204:7000")
    mlflow.set_experiment("wine_testing_infer")
    with mlflow.start_run():
        #lr = LogisticRegression()
        #lr.fit(X, y)
        # Load the model logged by an earlier training run
        model = mlflow.sklearn.load_model("runs:/7f08afa0ddb541069d43db4342ad7734/random-forest-model")
        # For a classifier, score() returns the mean accuracy
        score = model.score(X_test, y_test)
        print("Score: %s" % score)
        mlflow.log_metric("score", score)
        #mlflow.sklearn.log_model(lr, "models_dir",registered_model_name="logistics1")
        #print("Model saved in run %s" % mlflow.active_run().info.run_uuid)
    return 'Model Inferred'

2022/09/15 16:35:38 INFO mlflow.tracking.fluent: Experiment with name 'wine_testing_infer' does not exist. Creating a new experiment.


Score: 0.56951871657754
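
The run ID in `infer_model` is hard-coded. One way to pick the model automatically is to choose the run with the lowest logged `mse` and build the `runs:/<run_id>/random-forest-model` URI from it. The sketch below shows only that selection logic on plain dictionaries; the run IDs and metric values are invented, and the assumption is that similar records could be obtained from the MLflow tracking server before calling `mlflow.sklearn.load_model`.

```python
def best_run_model_uri(runs, metric="mse", artifact_path="random-forest-model"):
    """Return the model URI of the run with the lowest value of metric."""
    # Ignore runs that never logged the metric (for example failed runs)
    scored = [run for run in runs if metric in run["metrics"]]
    if not scored:
        raise ValueError(f"no run has logged the metric {metric!r}")
    best = min(scored, key=lambda run: run["metrics"][metric])
    return f"runs:/{best['run_id']}/{artifact_path}"


# Hypothetical run records; real ones would come from the tracking server
runs = [
    {"run_id": "aaa111", "metrics": {"mse": 0.71}},
    {"run_id": "bbb222", "metrics": {"mse": 0.58}},
    {"run_id": "ccc333", "metrics": {}},
]

print(best_run_model_uri(runs))
```

For a metric where higher is better, such as accuracy, `min` would need to be swapped for `max`.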


## Making the data loading DAG

In [55]:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

In [None]:
import datetime as dt
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from data_cleaning import clean_data
from model_creation import create_model
from inference import infer_model
# Say this is Task1
def greet():
    '''
    A python function to write a text file
    '''
    print('Writing in file')
    with open('/Development/airflow_tutorial/greet.txt', 'a+', encoding='utf8') as f:
        now = dt.datetime.now()
        t = now.strftime("%Y-%m-%d %H:%M")
        f.write(str(t) + '\n')
    return 'Greeted'
# Say this is Task2
def respond():
    '''
    A python function to return a simple greeting
    '''
    return 'Greet Responded Again'
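# Declaring DAG default settings
# Note: 'concurrency' is a DAG-level argument; placed in default_args it is
# most likely ignored by the operators
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 9, 24, 10, 0, 0),
    'concurrency': 1,
    'retries': 0
}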
# Building the DAG, 'my_simple_dag' is the dag id, which will be
# visible in the airflow ui
with DAG('my_simple_dag',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         # schedule_interval=None,
         ) as dag:
    opr_hello = BashOperator(task_id='say_Hi',
                             bash_command='echo "Hi!!"')
    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet)
    opr_clean = PythonOperator(task_id='clean_data',
                               python_callable=clean_data)
    opr_model = PythonOperator(task_id='create_model',
                               python_callable=create_model)
    opr_infer = PythonOperator(task_id='infer_model',
                               python_callable=infer_model)
    opr_sleep = BashOperator(task_id='sleep_me',
                             bash_command='sleep 5')
    opr_respond = PythonOperator(task_id='respond',
                                 python_callable=respond)
    # Setting the task flow dependencies
    opr_hello >> opr_greet >> opr_clean >> opr_model >> opr_infer >> opr_sleep >> opr_respond
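
The tasks in this DAG communicate only through files on disk. The outline also mentions XComs, and the following is a minimal sketch of how two `PythonOperator` callables could pass a small value through one. It assumes Airflow 2, where a callable that declares a `ti` parameter receives the task instance. The task names and the `FakeTaskInstance` class are hypothetical; the fake class is only a local stand-in so the logic can be tried without a running Airflow instance.

```python
def publish_rows_dropped(ti):
    """Upstream task: store a small value as an XCom."""
    rows_dropped = 411  # illustrative value, taken from the clean_data output
    ti.xcom_push(key="rows_dropped", value=rows_dropped)


def report_rows_dropped(ti):
    """Downstream task: read the value pushed by the upstream task."""
    rows_dropped = ti.xcom_pull(task_ids="publish_rows_dropped", key="rows_dropped")
    return f"{rows_dropped} rows were dropped upstream"


class FakeTaskInstance:
    """Local stand-in for Airflow's task instance (not part of Airflow)."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self._store.get(key)


if __name__ == "__main__":
    ti = FakeTaskInstance()
    publish_rows_dropped(ti)
    print(report_rows_dropped(ti))
```

In a real DAG each function would be passed as `python_callable` to a `PythonOperator` whose `task_id` matches the name used in `xcom_pull`. XComs are meant for small values; larger files are better kept in shared storage, with only the path passed along.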

In [59]:
## We have created a DAG file; the next step is to start the Airflow instance and check that the UI is up and running
1. The first trial is simply to get it running, so I would start by running airflow db init

In [58]:
clean_data()

411 Rows dropped from the data


'done'

In [None]:
from dags.model_creation import create_model

In [5]:
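import os
import boto3

# Read the credentials from the environment instead of hard-coding them in the notebook
access_key = os.environ['AWS_ACCESS_KEY_ID']
secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_access_key)
# Create the client from the session so that it actually uses these credentials
s3_client = session.client('s3')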


In [6]:
s3_client.download_file("mlflowartifactfiles","clf_nofeast/deploy.txt","local_file.txt")

In [2]:
import pandas as pd

In [22]:
# import pandas as pd
# from cloudpathlib import CloudPath

df = pd.read_csv("s3://mlflowartifactfiles/tag_data/winequality-white_new_2022-09-21-03-35-43.csv")


In [24]:
data = pd.read_csv('/home/rahulm/airflow/dags/test1/data/winequality-white.csv',delimiter=';')

In [25]:
data.head()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6
1,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6
2,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6


In [27]:
data.to_csv("/home/rahulm/airflow/dags/test2_api/data/winequality-white_new.csv",index=False)

In [12]:
df.columns

Index(['fixed acidity;"volatile acidity";"citric acid";"residual sugar";"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";"pH";"sulphates";"alcohol";"quality"'], dtype='object')
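
The single long column name above is what a semicolon-delimited file looks like when it is parsed with the default comma delimiter. The sketch below reproduces the effect with the standard-library `csv` module on the header and first data row of the wine file; the helper name `count_columns` is made up for illustration.

```python
import csv
import io

# Header and first data row in the original semicolon-delimited layout
sample = (
    'fixed acidity;"volatile acidity";"citric acid";"residual sugar";'
    '"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";'
    '"pH";"sulphates";"alcohol";"quality"\n'
    "7.0;0.27;0.36;20.7;0.045;45.0;170.0;1.001;3.0;0.45;8.8;6\n"
)


def count_columns(text, delimiter):
    """Number of fields in the first row when parsed with the given delimiter."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    return len(next(reader))


print("comma:", count_columns(sample, ","))      # the whole header is one field
print("semicolon:", count_columns(sample, ";"))  # one field per column
```

Passing `delimiter=';'` to `pd.read_csv`, as the other cells in this notebook do, gives the same fix in pandas.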