In [1]:
from multiprocessing import Process
from threading import Thread

In [2]:
Thread?

[1;31mInit signature:[0m
[0mThread[0m[1;33m([0m[1;33m
[0m    [0mgroup[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m    [0mtarget[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m    [0mname[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m    [0margs[0m[1;33m=[0m[1;33m([0m[1;33m)[0m[1;33m,[0m[1;33m
[0m    [0mkwargs[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m    [1;33m*[0m[1;33m,[0m[1;33m
[0m    [0mdaemon[0m[1;33m=[0m[1;32mNone[0m[1;33m,[0m[1;33m
[0m[1;33m)[0m[1;33m[0m[0m
[1;31mDocstring:[0m     
A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways
to specify the activity: by passing a callable object to the constructor, or
by overriding the run() method in a subclass.
[1;31mInit docstring:[0m
This constructor should always be called with keyword arguments. Arguments are:

*group* should be None; reserved for future extension when a ThreadG

In [3]:
import time

In [4]:
class demo_thread(Thread):
    """Minimal Thread subclass used to demonstrate overriding ``run()``.

    ``start()`` runs ``run()`` on the new thread, which waits five seconds and
    then prints a message.
    """

    def __init__(self, *args, **kwargs):
        # Any arguments passed in are accepted but not forwarded; the thread is
        # always a non-daemon thread named "demo_thread".
        super().__init__(name="demo_thread", daemon=False)

    def print_msg(self):
        """Sleep for five seconds, then print a message."""
        delay_seconds = 5
        time.sleep(delay_seconds)
        print("Called by Thread")

    def run(self):
        """Thread entry point: delegate to ``print_msg``."""
        self.print_msg()
        

In [5]:
# Demo: the main thread keeps executing after start() and only blocks at join().
print("Creating object of demo thread")
d = demo_thread()
print("Starting demo thread")
d.start()  # runs demo_thread.run() on the new thread and returns immediately
print("Thread created successfully") 
d.join()  # blocks the calling thread until the demo thread has finished
print("Thread completed successfully")

Creating object of demo thread
Starting demo thread
Thread created successfully
Called by Thread
Thread completed successfully


# Running the training pipeline on a separate thread so that it doesn't block the rest of the code

In [6]:
import os

In [7]:
os.chdir("..")

In [8]:
# Initializing a named tuple with every field set to None

In [9]:
from collections import namedtuple

In [10]:
Experiment = namedtuple("Experiment",[
    "experiment_id","initialization_timestamp", "artifact_timestamp",
    "running_status", "start_time","stop_time","execution_time", "message", 
    "experiment_file_path", "custom_threshold","model_f1","model_recall",
    "model_precision","is_model_accepted"
])

In [11]:
exp = Experiment(*([None]*14)) ## passing as *args

In [12]:
print(exp.custom_threshold)

None


In [13]:
import uuid


uuid

<module 'uuid' from 'c:\\Users\\rachi\\Desktop\\inueuron\\PERSONAL PROJECTS\\Phishing-Domain-Detection-using-Machine-Learning\\venv\\lib\\uuid.py'>

# Generating ids for experiments

In [14]:
str(uuid.uuid4()) ## Generates random ids

'b8ad457a-e775-4f16-9180-9f055880f996'

In [15]:
Experiment = namedtuple("Experiment",[
    "experiment_id","initialization_timestamp", "artifact_timestamp",
    "running_status", "start_time","stop_time","execution_time", "message", 
    "experiment_file_path", "custom_threshold","model_f1","model_recall",
    "model_precision","is_model_accepted"
])

# Creating dataframes from named tuple

In [16]:
temp = namedtuple("namedtuple",["input1", "input2"])

In [17]:
temp_obj = temp(input1=1, input2=2)

In [18]:
from dataclasses import dataclass

In [19]:
temp_obj

namedtuple(input1=1, input2=2)

In [20]:
dict_obj = temp_obj._asdict()
# _asdict() converts the named tuple into a dictionary keyed by field name

In [21]:
dict_obj = {key:[value] for key,value in dict_obj.items()}
# Keys become the column names; each value is wrapped in a one-element list so that pandas builds a single-row dataframe


In [22]:
dict_obj

{'input1': [1], 'input2': [2]}

In [23]:
import pandas as pd

In [24]:
pd.DataFrame(dict_obj)

Unnamed: 0,input1,input2
0,1,2


# Returning the last N rows of a dataframe in reverse order (last row first)

In [25]:
temp_df = pd.DataFrame(
    {
        "col1":[1,2,3,4,5],
        "col2":[1,2,3,4,5]
    }
    
)

In [26]:
rows_to_return = 3

In [27]:
temp_df.iloc[-1:(-1*rows_to_return)-1:-1]

Unnamed: 0,col1,col2
4,5,5
3,4,4
2,3,3


In [28]:
temp_df

Unnamed: 0,col1,col2
0,1,1
1,2,2
2,3,3
3,4,4
4,5,5


# Pipeline with thread class

In [29]:
from phishing_domain_detection.component.model_trainer import ModelTrainer
from phishing_domain_detection.config.configuration import Configuration
from phishing_domain_detection.logger import logging
from phishing_domain_detection.exception import Phishing_Exception

from phishing_domain_detection.entity.artifact_entity import DataIngestionArtifact, DataTransformationArtifact, DataValidationArtifact, ModelTrainerArtifact, ModelEvaluationArtifact, ModelPusherArtifact


from phishing_domain_detection.component.data_ingestion import DataIngestion
from phishing_domain_detection.component.data_validation import DataValidation
from phishing_domain_detection.component.data_transformation import DataTransformation
from phishing_domain_detection.component.model_trainer import ModelTrainer
from phishing_domain_detection.component.model_evaluation import ModelEvaluation
from phishing_domain_detection.component.model_pusher import ModelPusher
import os,sys

In [30]:
from collections import namedtuple
from datetime import datetime
import uuid
from phishing_domain_detection.constants import EXPERIMENT_DIR_NAME, EXPERIMENT_FILE_NAME, CURRENT_TIME_STAMP


In [31]:
from threading import Thread
from typing import List

In [32]:
from multiprocessing import Process

In [33]:
import pandas as pd

In [34]:
# Record describing one pipeline run; Pipeline.save_experiment writes it out
# as a single CSV row, so the field order here is the column order.
Experiment = namedtuple(
    "Experiment",
    [
        "experiment_id",
        "initialization_timestamp",
        "artifact_timestamp",
        "running_status",
        "start_time",
        "stop_time",
        "execution_time",
        "message",
        "experiment_file_path",
        "custom_threshold",
        "model_f1",
        "model_recall",
        "model_precision",
        "is_model_accepted",
    ],
)

In [35]:
class Pipeline(Thread):
    """Run the whole training pipeline on a background thread.

    The stages run in this order: data ingestion, data validation, data
    transformation, model training, model evaluation and, only when the
    evaluated model is accepted, model pushing. The state of the most recent
    run is kept in the class-level ``experiment`` record and is appended as a
    row to the CSV file at ``experiment_file_path``.
    """

    # Class-level record of the most recent run, shared by every instance.
    # All fields are None until a run has been triggered.
    experiment = Experiment(*([None] * len(Experiment._fields)))
    # Path of the CSV experiment log; assigned when a Pipeline is constructed.
    experiment_file_path = None

    def __init__(self, config = None) -> None:
        """Create the artifact directory and register the experiment file path.

        Args:
            config: Configuration object providing the per-stage configs.
                When omitted, a new ``Configuration()`` is created.

        Raises:
            Phishing_Exception: if any step of the set-up fails.
        """
        try:
            # Build the default here rather than in the signature: a default
            # of ``Configuration()`` would be evaluated once, when the class is
            # defined, and then shared by every Pipeline instance.
            if config is None:
                config = Configuration()
            self.config = config

            artifact_dir = self.config.training_pipeline_config.artifact_dir
            os.makedirs(artifact_dir, exist_ok=True)
            ## getting path of experiment file
            Pipeline.experiment_file_path = os.path.join(artifact_dir, EXPERIMENT_DIR_NAME, EXPERIMENT_FILE_NAME)

            super().__init__(daemon=False, name='Pipeline') # calling constructor of parent class

        except Exception as e:
            raise Phishing_Exception(e,sys) from e

    def start_data_ingestion(self) -> DataIngestionArtifact:
        """Run the data-ingestion stage and return its artifact."""
        try:
            data_ingestion = DataIngestion(self.config.get_data_ingestion_config())
            return data_ingestion.initiate_data_ingestion()

        except Exception as e:
            raise Phishing_Exception(e,sys) from e

    def start_data_validation(self, data_ingestion_artifact: DataIngestionArtifact) -> DataValidationArtifact:
        """Run the data-validation stage on the ingested data and return its artifact."""
        try:
            data_validation = DataValidation(self.config.get_data_validation_config(),data_ingestion_artifact, self.config.get_training_pipeline_config())

            return data_validation.initiate_data_validation()
        except Exception as e:
            raise Phishing_Exception(e,sys) from e

    def start_data_transformation(self,data_ingestion_artifact: DataIngestionArtifact, data_validation_artifact: DataValidationArtifact) -> DataTransformationArtifact:
        """Run the data-transformation stage and return its artifact.

        The returned artifact is what ``start_model_trainer`` consumes.
        """
        try:
            data_transformation = DataTransformation(
                data_transformation_config=self.config.get_data_transformation_config(),
                data_ingestion_artifact=data_ingestion_artifact,
                data_validation_artifact=data_validation_artifact
            )

            return data_transformation.initialize_data_transformation()
        except Exception as e:
            raise Phishing_Exception(e,sys) from e

    def start_model_trainer(self, data_transformation_artifact: DataTransformationArtifact) -> ModelTrainerArtifact:
        """Run the model-training stage on the transformed data and return its artifact."""
        try:
            model_trainer = ModelTrainer(data_transformation_artifact=data_transformation_artifact, model_trainer_config=self.config.get_model_trainer_config())

            return model_trainer.initialize_model_trainer()
        except Exception as e:
            raise Phishing_Exception(e,sys) from e

    def start_model_evaluation(self, data_ingestion_artifact: DataIngestionArtifact,
                               data_validation_artifact: DataValidationArtifact,
                               model_trainer_artifact : ModelTrainerArtifact) -> ModelEvaluationArtifact:
        """Run the model-evaluation stage and return its artifact.

        ``run_pipeline`` reads ``is_model_accepted`` from the returned artifact
        to decide whether the model is pushed.
        """
        try:
            model_eval = ModelEvaluation(
                model_evaluation_config=self.config.get_model_evaluation_config(),
                data_ingestion_artifact=data_ingestion_artifact,
                data_validation_artifact=data_validation_artifact,
                model_trainer_artifact=model_trainer_artifact
            )

            return model_eval.initiate_model_evaluation()
        except Exception as e:
            raise Phishing_Exception(e,sys) from e

    def start_model_pusher(self, model_evaluation_artifact: ModelEvaluationArtifact):
        """Run the model-pusher stage and return whatever it produces.

        NOTE(review): the result is presumably a ``ModelPusherArtifact`` -
        confirm against ``ModelPusher.initiate_model_pusher``.
        """
        try:
            model_pusher = ModelPusher(model_pusher_config=self.config.get_model_pusher_config(),
                        model_evaluation_artifact=model_evaluation_artifact)
            return model_pusher.initiate_model_pusher()
        except Exception as e:
            raise Phishing_Exception(e,sys) from e


    def run_pipeline(self):
        """Execute every pipeline stage once and record the run.

        If the class-level experiment is already marked as running, nothing is
        executed and the current experiment record is returned. Otherwise a
        "started" record is saved, the stages run in order, and a "finished"
        record is saved. If a stage fails, the running flag is cleared and the
        failure is recorded before the error is re-raised, so that a failed
        run does not block every later run.

        Raises:
            Phishing_Exception: wrapping any error raised by a stage.
        """
        # True only once THIS call has marked the shared experiment as running,
        # so the failure handler never clears a flag owned by another run.
        run_started = False
        try:

            ## If pipeline is already running, then don't start another pipeline
            if Pipeline.experiment.running_status:
                logging.info("Pipeline already running, skipping this pipeline run.")
                return Pipeline.experiment

            logging.info("Pipeline starting")

            ## generating unique id for this pipeline run
            experiment_id = str(uuid.uuid4())
            start_time = datetime.now()

            ## Updating the experiment (which is a class) attribute
            Pipeline.experiment = Experiment(
                experiment_id=experiment_id,
                artifact_timestamp=CURRENT_TIME_STAMP,
                initialization_timestamp=CURRENT_TIME_STAMP,
                running_status=True,
                start_time=start_time,
                stop_time=None,
                custom_threshold=None,
                experiment_file_path=Pipeline.experiment_file_path,
                execution_time=None,
                message="Pipeline has been started successfully",
                is_model_accepted=None,
                model_f1=None,
                model_precision=None,
                model_recall=None
            )
            run_started = True

            logging.info(f"Pipeline experiment: {Pipeline.experiment}")

            self.save_experiment()

            ## Stages, each one consuming the artifacts of the previous ones
            data_ingestion_artifact = self.start_data_ingestion()
            data_validation_artifact = self.start_data_validation(data_ingestion_artifact)
            data_transformation_artifact = self.start_data_transformation(data_ingestion_artifact=data_ingestion_artifact, data_validation_artifact= data_validation_artifact)
            model_trainer_artifact = self.start_model_trainer(data_transformation_artifact=data_transformation_artifact)
            model_evaluation_artifact = self.start_model_evaluation(
                data_ingestion_artifact=data_ingestion_artifact,
                data_validation_artifact=data_validation_artifact,
                model_trainer_artifact=model_trainer_artifact
            )

            if model_evaluation_artifact.is_model_accepted:
                model_pusher_artifact = self.start_model_pusher(model_evaluation_artifact=model_evaluation_artifact)
                logging.info(f"Model Pusher artifact: {model_pusher_artifact}")

            else:
                logging.info("Trained model rejected")

            logging.info("Pipeline Run Completed")

            stop_time = datetime.now()

            Pipeline.experiment = Experiment(
                experiment_id=experiment_id,
                initialization_timestamp= CURRENT_TIME_STAMP,
                artifact_timestamp=CURRENT_TIME_STAMP,
                running_status=False,
                start_time=start_time,
                stop_time=stop_time,
                custom_threshold = model_trainer_artifact.custom_threshold,
                execution_time= stop_time - start_time,
                experiment_file_path= Pipeline.experiment.experiment_file_path,
                is_model_accepted=model_evaluation_artifact.is_model_accepted,
                message="Pipeline has finished executing.",
                model_f1=model_trainer_artifact.model_f1,
                model_precision=model_trainer_artifact.model_precision,
                model_recall=model_trainer_artifact.model_recall

            )

            logging.info(f"Pipeline experiment: {Pipeline.experiment}")
            self.save_experiment()


        except Exception as e:
            # Without this reset a single failed run would leave
            # running_status=True and every later run would be skipped.
            if run_started and Pipeline.experiment.running_status:
                failed_at = datetime.now()
                Pipeline.experiment = Pipeline.experiment._replace(
                    running_status=False,
                    stop_time=failed_at,
                    execution_time=failed_at - Pipeline.experiment.start_time,
                    message=f"Pipeline failed: {e}"
                )
                try:
                    self.save_experiment()
                except Exception as save_error:
                    # Best effort only: never mask the original failure.
                    logging.info(f"Could not save failed experiment: {save_error}")
            raise Phishing_Exception(e,sys) from e

    def run(self):
        """Thread entry point, invoked by ``start()``; runs the pipeline once."""
        try:
            self.run_pipeline()
        except Exception as e:
            raise Phishing_Exception(e,sys) from e


    def save_experiment(self):
        """Append the current class-level experiment to the experiment CSV.

        Does nothing except report a hint when no run has been triggered yet.
        The header row is written only when the file does not exist yet.

        Raises:
            Phishing_Exception: if building or writing the row fails.
        """
        try:
            experiment = Pipeline.experiment

            if experiment.experiment_id is None:
                print("Trigger the pipeline atleast once")
                logging.info("Trigger the pipeline first")
                # Nothing to save; carrying on would fail on the None file path.
                return

            file_path = experiment.experiment_file_path

            # keys serve as column names; each value is wrapped in a list so
            # that the dictionary describes a single-row dataframe
            experiment_dict = {key:[value] for key,value in experiment._asdict().items()}

            experiment_dict.update(
                {
                    "created_time_stamp":[datetime.now()],
                    "experiment_file_path": [os.path.basename(file_path)]
                }
            )

            experiment_report = pd.DataFrame(experiment_dict)

            os.makedirs(os.path.dirname(file_path), exist_ok=True)

            # Must be decided before writing, since writing creates the file.
            is_first_experiment = not os.path.exists(file_path)
            experiment_report.to_csv(file_path, mode="a", index=False, header=is_first_experiment)


        except Exception as e:
            raise Phishing_Exception(e,sys) from e


    @classmethod
    def get_experiments_status(cls, rows_to_return = 5) ->pd.DataFrame:
        """Return the most recent rows of the experiment CSV, newest first.

        Args:
            rows_to_return: number of trailing rows of the file to return.

        Returns:
            The selected rows without the ``experiment_file_path`` and
            ``initialization_timestamp`` columns, or an empty dataframe when
            no experiment file is known or it does not exist yet.

        Raises:
            Phishing_Exception: if reading the file fails.
        """
        try:
            # Use the class-level path: the path stored on the experiment
            # record stays None until a run has started.
            file_path = Pipeline.experiment_file_path
            if file_path is None or not os.path.exists(file_path):
                return pd.DataFrame()  # empty dataframe

            df = pd.read_csv(file_path)
            latest_first = df.tail(rows_to_return).iloc[::-1]
            return latest_first.drop(columns=["experiment_file_path", "initialization_timestamp"])
        except Exception as e:
            raise Phishing_Exception(e,sys) from e

In [36]:
pipeline = Pipeline()

In [37]:
pipeline.start()

In [39]:
pipeline.get_experiments_status()

Unnamed: 0,experiment_id,artifact_timestamp,running_status,start_time,stop_time,execution_time,message,custom_threshold,model_f1,model_recall,model_precision,is_model_accepted,created_time_stamp
6,cd28b999-8eaa-416e-a9ef-2931b727f111,2022-10-02_01-04-39,True,2022-10-02 01:04:40.823664,,,Pipeline has been started successfully,,,,,,2022-10-02 01:04:40.823664
5,39ad3ddd-bef9-490b-b654-a33b230af192,2022-10-02_01-00-52,False,2022-10-02 01:00:52.574855,2022-10-02 01:02:32.149114,0 days 00:01:39.574259,Pipeline has finished executing.,0.3,0.938431,0.975619,0.903974,False,2022-10-02 01:02:32.150113
4,39ad3ddd-bef9-490b-b654-a33b230af192,2022-10-02_01-00-52,True,2022-10-02 01:00:52.574855,,,Pipeline has been started successfully,,,,,,2022-10-02 01:00:52.579760
3,25c3c86d-dfcc-48df-8125-39c1e59bd7a2,2022-10-02_00-58-01,False,2022-10-02 00:58:02.142238,2022-10-02 00:59:43.976844,0 days 00:01:41.834606,Pipeline has finished executing.,0.3,0.938696,0.976378,0.903815,True,2022-10-02 00:59:43.977845
2,25c3c86d-dfcc-48df-8125-39c1e59bd7a2,2022-10-02_00-58-01,True,2022-10-02 00:58:02.142238,,,Pipeline has been started successfully,,,,,,2022-10-02 00:58:02.142238
