### Load the trending-videos data from Azure Blob Storage

In [1]:
from azure.storage.blob import BlobSasPermissions, generate_blob_sas, BlobServiceClient
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
import pandas as pd


load_dotenv()

# Storage credentials come from the environment (.env); nothing is hard-coded here.
storage_account_key = os.getenv('STORAGE_ACCOUNT_KEY')
storage_account_name = os.getenv('STORAGE_ACCOUNT_NAME')
connection_string = os.getenv('CONNECTION_STRING')
container_name = os.getenv('CONTAINER_NAME')

# Fail fast with a readable message instead of an obscure SDK / TypeError further down.
_missing_env = [
    var for var, value in (
        ('STORAGE_ACCOUNT_KEY', storage_account_key),
        ('STORAGE_ACCOUNT_NAME', storage_account_name),
        ('CONNECTION_STRING', connection_string),
        ('CONTAINER_NAME', container_name),
    )
    if not value
]
if _missing_env:
    raise RuntimeError(f"Missing environment variables: {', '.join(_missing_env)}")

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
# Get a list of all blob files in the container.
blob_list = [blob.name for blob in container_client.list_blobs()]
if not blob_list:
    raise RuntimeError(f"Container {container_name!r} has no blobs to load")


def _blob_sas_url(blob_name, valid_for=timedelta(hours=1)):
    """Return an HTTPS URL for `blob_name` carrying a read-only SAS token valid for `valid_for`."""
    from datetime import timezone
    from urllib.parse import quote

    sas = generate_blob_sas(
        account_name=storage_account_name,
        container_name=container_name,
        blob_name=blob_name,
        account_key=storage_account_key,
        permission=BlobSasPermissions(read=True),
        # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
        expiry=datetime.now(timezone.utc) + valid_for,
    )
    # Percent-encode the blob path so names containing spaces, '?' or '#' still form a valid URL.
    return (f'https://{storage_account_name}.blob.core.windows.net/'
            f'{container_name}/{quote(blob_name, safe="/")}?{sas}')


# The original built the URL after the SAS loop had finished, so only the LAST blob was read.
# Read every blob and stack them. This assumes every blob is a CSV with the same columns
# (TODO confirm). For a single-blob container the result is the same frame as before.
df = pd.concat([pd.read_csv(_blob_sas_url(name)) for name in blob_list], ignore_index=True)
df.head()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,kzwfHumJyYc,17.14.11,Sharry Mann: Cute Munda ( Song Teaser) | Parmi...,Lokdhun Punjabi,1,2017-11-12T12:20:39.000Z,"sharry mann|""sharry mann new song""|""sharry man...",1096327,33966,798,882,https://i.ytimg.com/vi/kzwfHumJyYc/default.jpg,False,False,False,Presenting Sharry Mann latest Punjabi Song Cu...
1,zUZ1z7FwLc8,17.14.11,"पीरियड्स के समय, पेट पर पति करता ऐसा, देखकर दं...",HJ NEWS,25,2017-11-13T05:43:56.000Z,"पीरियड्स के समय|""पेट पर पति करता ऐसा""|""देखकर द...",590101,735,904,0,https://i.ytimg.com/vi/zUZ1z7FwLc8/default.jpg,True,False,False,"पीरियड्स के समय, पेट पर पति करता ऐसा, देखकर दं..."
2,10L1hZ9qa58,17.14.11,Stylish Star Allu Arjun @ ChaySam Wedding Rece...,TFPC,24,2017-11-12T15:48:08.000Z,Stylish Star Allu Arjun @ ChaySam Wedding Rece...,473988,2011,243,149,https://i.ytimg.com/vi/10L1hZ9qa58/default.jpg,False,False,False,Watch Stylish Star Allu Arjun @ ChaySam Weddin...
3,N1vE8iiEg64,17.14.11,Eruma Saani | Tamil vs English,Eruma Saani,23,2017-11-12T07:08:48.000Z,"Eruma Saani|""Tamil Comedy Videos""|""Films""|""Mov...",1242680,70353,1624,2684,https://i.ytimg.com/vi/N1vE8iiEg64/default.jpg,False,False,False,This video showcases the difference between pe...
4,kJzGH0PVQHQ,17.14.11,why Samantha became EMOTIONAL @ Samantha naga ...,Filmylooks,24,2017-11-13T01:14:16.000Z,"Filmylooks|""latest news""|""telugu movies""|""telu...",464015,492,293,66,https://i.ytimg.com/vi/kJzGH0PVQHQ/default.jpg,False,False,False,why Samantha became EMOTIONAL @ Samantha naga ...


## Data Cleaning

In [2]:



def clean_trending_data(raw: pd.DataFrame, threshold: int = 100000, max_days_to_trend: int = 10) -> pd.DataFrame:
    """Turn the raw trending-videos frame into a model-ready feature frame.

    Args:
        raw: frame as read from the CSV (see the head() output above); not modified.
        threshold: a video must have more than this many views to be labelled viral.
        max_days_to_trend: ... and must trend fewer than this many days after publishing.

    Returns:
        A new frame indexed by tz-naive ``publish_time``. It holds the engineered features
        plus the binary ``is_viral`` target; identifier, text and ``views`` columns are dropped.
    """
    out = raw.copy()
    # trending_date is stored as yy.dd.mm (e.g. "17.14.11"); this parse is already tz-naive.
    out['trending_date'] = pd.to_datetime(out['trending_date'], format='%y.%d.%m')
    # publish_time is ISO-8601 with a trailing 'Z'. Drop the tz so it can be subtracted from trending_date.
    out['publish_time'] = pd.to_datetime(out['publish_time']).dt.tz_localize(None)

    # Whole days between publication and the trending date.
    out['time_since_publish'] = (out['trending_date'] - out['publish_time']).dt.days
    out['tag_count'] = out['tags'].astype(str).str.split('|').str.len()

    # Engagement metrics; the +1 avoids division by zero.
    out['like_dislike_ratio'] = out['likes'] / (out['dislikes'] + 1)
    out['comment_view_ratio'] = out['comment_count'] / (out['views'] + 1)

    # Fill missing text with '' *before* stringifying. The original fillna(0) turned a missing
    # description into the string "0", which was then counted as one word.
    for text_col in ('title', 'description'):
        out[text_col] = out[text_col].fillna('').astype(str)
    out = out.fillna(0)
    out['title_words_count'] = out['title'].str.split().str.len()
    out['description_words_count'] = out['description'].str.split().str.len()

    # NOTE(review): the label is derived from `views`, and comment_view_ratio (also built from
    # views) remains a feature. This is probably target leakage; confirm it is intended.
    out['is_viral'] = (
        (out['views'] > threshold) & (out['time_since_publish'] < max_days_to_trend)
    ).astype(int)
    out = out.drop(['title', 'video_id', 'views', 'trending_date', 'thumbnail_link',
                    'category_id', 'channel_title', 'tags', 'description'], axis=1)
    return out.set_index('publish_time')


threshold = 100000
# NOTE: rebinding `df` means this cell must run on the raw frame exactly once (re-run cell 1 first).
df = clean_trending_data(df, threshold=threshold)


In [20]:

df.head(n=5)  # first rows of the cleaned feature frame

Unnamed: 0_level_0,likes,dislikes,comment_count,comments_disabled,ratings_disabled,video_error_or_removed,time_since_publish,tag_count,like_dislike_ratio,comment_view_ratio,title_words_count,description_words_count,is_viral
publish_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2017-11-12 12:20:39,33966,798,882,False,False,False,1,15,42.510638,0.000805,15,117,1
2017-11-13 05:43:56,735,904,0,True,False,False,0,19,0.812155,0.0,12,371,1
2017-11-12 15:48:08,2011,243,149,False,False,False,1,14,8.241803,0.000314,10,47,1
2017-11-12 07:08:48,70353,1624,2684,False,False,False,1,20,43.294154,0.00216,6,29,1
2017-11-13 01:14:16,492,293,66,False,False,False,0,11,1.673469,0.000142,12,85,1


### Train / reference split and baseline model




In [3]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Random 80/20 split: `reference` trains the model and is the drift baseline; `current`
# stands in for newly arriving data.
# NOTE(review): the same video appears on several trending days (see the duplicated rows in
# the sorted head() further down), so a random split can put near-identical rows on both sides.
reference, current = train_test_split(df, random_state=42, test_size=0.2)

# Column roles, reused by the Evidently ColumnMapping below.
target = 'is_viral'
prediction = 'prediction'
numerical_features = [
    'likes', 'dislikes', 'comment_count', 'time_since_publish', 'tag_count',
    'like_dislike_ratio', 'comment_view_ratio', 'title_words_count', 'description_words_count',
]
categorical_features = ['comments_disabled', 'ratings_disabled', 'video_error_or_removed']

# Shallow random forest as the baseline classifier.
rf = RandomForestClassifier(max_depth=5, n_estimators=100, random_state=42)
model_columns = numerical_features + categorical_features
rf.fit(reference[model_columns], reference[target])



  if not hasattr(array, "sparse") and array.dtypes.apply(is_sparse).any():
  if is_sparse(pd_dtype):
  if is_sparse(pd_dtype) or not is_extension_array_dtype(pd_dtype):
  if is_sparse(pd_dtype):
  if is_sparse(pd_dtype) or not is_extension_array_dtype(pd_dtype):


In [4]:
# Score both splits so Evidently can compare predictions with the target.
model_columns = numerical_features + categorical_features
ref_prediction = rf.predict(reference[model_columns])
current_prediction = rf.predict(current[model_columns])

# .assign returns new frames instead of writing a column into frames produced by
# train_test_split (avoids chained-assignment / SettingWithCopyWarning). It also uses the
# shared `prediction` column name rather than repeating the literal.
reference = reference.assign(**{prediction: ref_prediction})
current = current.assign(**{prediction: current_prediction})


  if not hasattr(array, "sparse") and array.dtypes.apply(is_sparse).any():
  if is_sparse(pd_dtype):
  if is_sparse(pd_dtype) or not is_extension_array_dtype(pd_dtype):
  if not hasattr(array, "sparse") and array.dtypes.apply(is_sparse).any():
  if is_sparse(pd_dtype):
  if is_sparse(pd_dtype) or not is_extension_array_dtype(pd_dtype):


In [6]:
import pandas as pd
import numpy as np
import requests
import zipfile


from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset, ClassificationPreset
import os

# Tell Evidently which column plays which role.
column_mapping = ColumnMapping()
column_mapping.target = target
column_mapping.prediction = prediction
column_mapping.numerical_features = numerical_features
column_mapping.categorical_features = categorical_features

# Evaluate on the held-out `current` split. The original passed `reference` (the training
# data) as current_data, which overstates performance. `reference` now serves as the baseline.
classification_performance = Report(metrics=[ClassificationPreset()])
classification_performance.run(reference_data=reference, current_data=current, column_mapping=column_mapping)
os.makedirs('reports', exist_ok=True)
classification_performance.save_html('reports/classification_performance.html')


In [6]:
# Compare the target distribution of `current` against the `reference` baseline.
target_drift = Report(metrics=[TargetDriftPreset()])
target_drift.run(reference_data=reference, current_data=current, column_mapping=column_mapping)
target_drift.show()

In [7]:
df.axes[0]  # row axis: the publish_time DatetimeIndex

DatetimeIndex(['2017-11-12 12:20:39', '2017-11-13 05:43:56',
               '2017-11-12 15:48:08', '2017-11-12 07:08:48',
               '2017-11-13 01:14:16', '2017-11-10 04:29:50',
               '2017-11-11 16:41:15', '2017-11-11 06:14:19',
               '2017-11-13 04:42:26', '2017-11-12 04:30:01',
               ...
               '2018-06-12 10:27:38', '2018-06-11 13:17:24',
               '2018-06-13 15:33:32', '2018-06-13 17:00:04',
               '2018-06-13 14:18:31', '2018-06-13 08:01:11',
               '2018-06-13 11:30:04', '2018-06-13 05:00:02',
               '2018-06-13 15:07:49', '2018-06-10 04:29:54'],
              dtype='datetime64[ns]', name='publish_time', length=37352, freq=None)

In [65]:
# Order rows chronologically by publish_time. Rebinding (instead of inplace=True) keeps the
# operation chainable and explicit.
df = df.sort_index()

In [67]:
df.head(n=5)  # earliest-published rows

Unnamed: 0,likes,dislikes,comment_count,comments_disabled,ratings_disabled,video_error_or_removed,time_since_publish,tag_count,like_dislike_ratio,comment_view_ratio,title_words_count,description_words_count,is_viral
2017-05-27 20:27:30,46,39,5,False,False,False,219,22,1.15,0.000131,11,26,0
2017-05-27 20:27:30,57,66,5,False,False,False,220,22,0.850746,8.6e-05,11,26,0
2017-11-07 06:01:50,865914,63908,103907,False,False,False,8,22,13.549171,0.002687,12,107,1
2017-11-07 06:01:50,829362,61195,101117,False,False,False,6,22,13.552552,0.002818,12,107,1
2017-11-07 06:01:50,853550,62858,102740,False,False,False,7,22,13.578803,0.002737,12,107,1


In [68]:
# Earliest and latest publish_time in the data.
first_published, last_published = df.index.min(), df.index.max()
print(first_published, last_published)


2017-05-27 20:27:30 2018-06-13 18:05:15


In [69]:
df.tail(n=5)  # most recently published rows

Unnamed: 0,likes,dislikes,comment_count,comments_disabled,ratings_disabled,video_error_or_removed,time_since_publish,tag_count,like_dislike_ratio,comment_view_ratio,title_words_count,description_words_count,is_viral
2018-06-13 17:00:04,3105,1403,898,False,False,False,0,15,2.211538,0.001678,4,25,1
2018-06-13 17:30:00,631,129,60,False,False,False,0,26,4.853846,0.000697,18,96,0
2018-06-13 17:33:48,2349,161,181,False,False,False,0,25,14.5,0.004075,6,18,0
2018-06-13 18:05:15,9176,1116,1646,False,False,False,0,31,8.214861,0.001439,12,42,1
2018-06-13 18:05:15,9176,1116,1646,False,False,False,0,31,8.214861,0.001439,12,42,1


In [8]:
import pandas as pd
import joblib
from evidently.pipeline.column_mapping import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from evidently.metric_preset import TargetDriftPreset
from evidently.metric_preset import DataQualityPreset

# Column-level data quality summary of `current` against the `reference` baseline.
data_quality_report = Report(metrics=[DataQualityPreset()])
data_quality_report.run(reference_data=reference, current_data=current, column_mapping=column_mapping)
data_quality_report.show()


is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead


is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead



In [9]:
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset, TargetDriftPreset, DataQualityPreset

class MonitoringPipeline:
    """Build Evidently reports for a reference/current split and save them as HTML files.

    Args:
        reference_data: baseline frame (here: the training split with predictions).
        current_data: frame compared against the baseline.
        target: name of the target column.
        prediction: name of the prediction column.
        numerical_features: numerical feature column names.
        categorical_features: categorical feature column names.
        output_dir: directory the HTML reports are written to; created if missing.
    """

    def __init__(self, reference_data, current_data, target, prediction, numerical_features,
                 categorical_features, output_dir='reports'):
        self.reference_data = reference_data
        self.current_data = current_data
        self.target = target
        self.prediction = prediction
        self.numerical_features = numerical_features
        self.categorical_features = categorical_features
        self.output_dir = output_dir
        self.column_mapping = self.create_column_mapping()

        self.classification_performance_report = Report(metrics=[ClassificationPreset()])
        self.target_drift_report = Report(metrics=[TargetDriftPreset()])
        self.data_quality_report = Report(metrics=[DataQualityPreset()])

    def create_column_mapping(self):
        """Return an Evidently ColumnMapping built from this pipeline's column roles."""
        column_mapping = ColumnMapping()
        column_mapping.target = self.target
        column_mapping.prediction = self.prediction
        column_mapping.numerical_features = self.numerical_features
        column_mapping.categorical_features = self.categorical_features
        return column_mapping

    def run_reports(self, report: str):
        """Run and save one report: 'classification_performance', 'target_drift' or 'data_quality'.

        Raises:
            ValueError: for any other report name. Previously an unknown name was silently ignored.
        """
        runners = {
            'classification_performance': self._run_classification_performance_report,
            'target_drift': self._run_target_drift_report,
            'data_quality': self._run_data_quality_report,
        }
        if report not in runners:
            raise ValueError(f"Unknown report {report!r}; expected one of {sorted(runners)}")
        runners[report]()

    def _run_and_save(self, evidently_report, filename):
        """Run `evidently_report` on this pipeline's data and write it to output_dir/filename as HTML."""
        import os

        evidently_report.run(
            reference_data=self.reference_data,
            current_data=self.current_data,
            column_mapping=self.column_mapping,
        )
        os.makedirs(self.output_dir, exist_ok=True)
        # save_html writes the rendered report. In Evidently 0.4, Report.save() writes a JSON
        # snapshot instead, which is what the original put into the *.html files.
        evidently_report.save_html(os.path.join(self.output_dir, filename))

    def _run_classification_performance_report(self):
        # The original ignored this instance: it read the notebook globals `reference` and
        # `column_mapping` and scored the training split with no baseline.
        self._run_and_save(self.classification_performance_report, 'classification_performance.html')

    def _run_target_drift_report(self):
        self._run_and_save(self.target_drift_report, 'target_drift.html')

    def _run_data_quality_report(self):
        self._run_and_save(self.data_quality_report, 'data_quality.html')

# Example usage
# Example usage: monitor the split (with predictions) produced earlier in this notebook.
reference_data, current_data = reference, current
target = 'is_viral'
prediction = 'prediction'
## define the features using the list of column names
numerical_features = [
    'likes', 'dislikes', 'comment_count', 'time_since_publish', 'tag_count',
    'like_dislike_ratio', 'comment_view_ratio', 'title_words_count', 'description_words_count',
]
categorical_features = ['comments_disabled', 'ratings_disabled', 'video_error_or_removed']

monitoring_pipeline = MonitoringPipeline(
    reference_data=reference_data, current_data=current_data,
    target=target, prediction=prediction,
    numerical_features=numerical_features, categorical_features=categorical_features,
)
monitoring_pipeline.run_reports("data_quality")



is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead


is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead



In [13]:
from zenml.client import Client 
from mlflow.tracking import MlflowClient    

import os

# Tracking URI of ZenML's local MLflow store. It is read from MLFLOW_TRACKING_URI so the
# notebook is not tied to one machine, with a fallback to the path this notebook was run with.
MLFLOW_TRACKING_URI = os.getenv(
    'MLFLOW_TRACKING_URI',
    'file:/Users/rashid/Library/Application Support/zenml/local_stores/b05be5b6-bf92-4e78-8a17-a8125e4a865e/mlruns',
)
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
client.search_experiments()

[<Experiment: artifact_location=('file:///Users/rashid/Library/Application '
  'Support/zenml/local_stores/b05be5b6-bf92-4e78-8a17-a8125e4a865e/mlruns/569436276858845089'), creation_time=1700589151502, experiment_id='569436276858845089', last_update_time=1700589151502, lifecycle_stage='active', name='continuous_deployment_pipeline', tags={}>,
 <Experiment: artifact_location=('file:///Users/rashid/Library/Application '
  'Support/zenml/local_stores/b05be5b6-bf92-4e78-8a17-a8125e4a865e/mlruns/769891711034305573'), creation_time=1700495095435, experiment_id='769891711034305573', last_update_time=1700495095435, lifecycle_stage='active', name='train_pipeline', tags={}>]

In [14]:
import mlflow
mlflow.set_tracking_uri(uri=MLFLOW_TRACKING_URI)  # route mlflow.* calls to the same store as `client`


In [15]:
model_name = 'Random-forest-model'
latest_version = client.get_latest_versions(name=model_name)

# One line per stage: the latest registered version and the stage it is in.
# (The original label said "run_id" but printed current_stage.)
for version in latest_version:
    print(f'version: {version.version}, stage: {version.current_stage}')
    

version: 7, run_id: Production
version: 4, run_id: Staging
version: 14, run_id: None


In [16]:
# Move version 7 of the random-forest model to Production without archiving the version
# currently in that stage (archive_existing_versions=False).
model_name = 'Random-forest-model'
model_version = 7
new_stage = 'Production'
client.transition_model_version_stage(
    name=model_name, version=model_version, stage=new_stage, archive_existing_versions=False,
)

<ModelVersion: aliases=[], creation_timestamp=1700504584769, current_stage='Production', description='Run #1 of the mlflow_registry_training_pipeline.', last_updated_timestamp=1700632181129, name='Random-forest-model', run_id=None, run_link=None, source=('file:///Users/rashid/Library/Application '
 'Support/zenml/local_stores/b05be5b6-bf92-4e78-8a17-a8125e4a865e/mlruns/769891711034305573/54a5f7cdef864ec3ad449aac3d80b7f2/artifacts/model'), status='READY', status_message=None, tags={'epochs': '2',
 'lr': '0.001',
 'zenml_pipeline_name': 'train_pipeline',
 'zenml_pipeline_run_uuid': '9a0ecef2-99d9-4cee-8bdf-d5770e8572ec',
 'zenml_version': '0.47.0',
 'zenml_workspace': '400c0936-63fd-43b0-bcbd-0d5f5335cff6'}, user_id=None, version=7>

In [9]:
model_name2 = 'ada-boost-model'
latest_version = client.get_latest_versions(name=model_name2)

# One line per stage: the latest registered version and the stage it is in.
# (The original label said "run_id" but printed current_stage.)
for version in latest_version:
    print(f'version: {version.version}, stage: {version.current_stage}')
    

version: 8, run_id: None
version: 5, run_id: Production
version: 4, run_id: Staging


In [10]:
model_name2 = 'Random-forest-model'
latest_version = client.get_latest_versions(name=model_name2)

# One line per stage: the latest registered version and the stage it is in.
# (The original label said "run_id" but printed current_stage.)
for version in latest_version:
    print(f'version: {version.version}, stage: {version.current_stage}')
    

version: 8, run_id: None
version: 7, run_id: Production
version: 4, run_id: Staging


In [21]:
import mlflow
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

def test_model(name, stage, X_test=None, y_test=None):
    """Load ``models:/{name}/{stage}`` from the MLflow registry and optionally score it.

    Args:
        name: registered model name.
        stage: registry stage used in the model URI (e.g. 'Production', 'Staging', 'None').
        X_test: optional hold-out features.
        y_test: optional hold-out labels; must be given together with X_test.

    Returns:
        The loaded pyfunc model when no test data is passed (the original behaviour).
        Otherwise, a dict with accuracy, precision, recall and F1 computed on the test data.
        Later cells call test_model(..., X_test=..., y_test=...), which previously raised TypeError.

    Raises:
        ValueError: if only one of X_test / y_test is given.
    """
    if (X_test is None) != (y_test is None):
        raise ValueError("X_test and y_test must be passed together")
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    if X_test is None:
        return model
    y_pred = model.predict(X_test)
    return {
        "accuracy_score": accuracy_score(y_test, y_pred),
        "precision_score": precision_score(y_test, y_pred),
        "recall_score": recall_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
    }
    

In [22]:
model = test_model(stage='Production', name='Random-forest-model')
print(type(model))  # expected: mlflow.pyfunc.PyFuncModel

<class 'mlflow.pyfunc.PyFuncModel'>


In [23]:
from sklearn.metrics import accuracy_score

from sklearn.model_selection import train_test_split

# X_test / y_test were only created in a later cell, so this cell failed on a fresh kernel
# (Restart & Run All). Build the same 80/20 split (random_state=42) here first.
X = df.drop(['is_viral'], axis=1)
y = df['is_viral']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

rf_model = 'models:/Random-forest-model/Production'
model = mlflow.pyfunc.load_model(rf_model)
y_pred = model.predict(X_test)
accuracy_score1 = accuracy_score(y_test, y_pred)
accuracy_score1

In [24]:
from sklearn.model_selection import train_test_split

# 80/20 hold-out split of the cleaned frame, then accuracy of the Production ada-boost model.
X, y = df.drop(['is_viral'], axis=1), df['is_viral']
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, test_size=0.2)

ada_model = 'models:/ada-boost-model/Production'
model = mlflow.pyfunc.load_model(ada_model)
y_pred = model.predict(X_test)
accuracy_score1 = accuracy_score(y_test, y_pred)
accuracy_score1

  if not hasattr(array, "sparse") and array.dtypes.apply(is_sparse).any():
  if is_sparse(pd_dtype):
  if is_sparse(pd_dtype) or not is_extension_array_dtype(pd_dtype):


0.9729621201980994

In [25]:
from sklearn.model_selection import train_test_split

# Features are every column except the target; 20% of rows are held out (seed 42).
X, y = df.drop(['is_viral'], axis=1), df['is_viral']
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, test_size=0.2)

In [26]:
model1 = test_model(X_test=X_test, y_test=y_test, name='Random-forest-model', stage='Production')
model1

In [27]:
test_model(X_test=X_test, y_test=y_test, name='ada-boost-model', stage='Production')

In [28]:
test_model(X_test=X_test, y_test=y_test, name='gradient-boost-model', stage='None')

In [29]:
import mlflow

# Load the Production gradient-boosting model from the registry and show its summary.
model_name3 = 'gradient-boost-model'
model3_uri = f"models:/{model_name3}/Production"
model3 = mlflow.pyfunc.load_model(model3_uri)
model3

mlflow.pyfunc.loaded_model:
  artifact_path: model
  flavor: mlflow.sklearn
  run_id: 1285a032bd2a4a1585467c29663bced5

In [30]:
from zenml.integrations.mlflow.steps import (
    mlflow_model_registry_deployer_step,
)

# mlflow_model_registry_deployer_step needs `registry_model_name` (the model's name in the
# MLflow registry); `model_name` is a different parameter. Configure the step the same way as
# the other deployer cells in this notebook, selecting the registry version by stage.
ada_boost = mlflow_model_registry_deployer_step.with_options(
    parameters=dict(
        registry_model_name="ada-boost-model",
        registry_model_stage="Production",
        timeout=300,
    )
)

gradient_boost = mlflow_model_registry_deployer_step.with_options(
    parameters=dict(
        registry_model_name="gradient-boost-model",
        registry_model_stage="Production",
        timeout=300,
    )
)

In [8]:
from zenml.integrations.mlflow.steps.mlflow_deployer import (
    mlflow_model_registry_deployer_step,
)

# Deployer step configured for version 6 of the random-forest model in the MLflow registry.
deployer_parameters = {
    'registry_model_name': 'Random-forest-model',
    'registry_model_version': 6,
    'timeout': 300,
}
model_deployer = mlflow_model_registry_deployer_step.with_options(parameters=deployer_parameters)

model_deployer

<zenml.integrations.mlflow.steps.mlflow_deployer.mlflow_model_registry_deployer_step at 0x14c0ee2c0>

In [2]:
from zenml.integrations.mlflow.steps.mlflow_deployer import (
    mlflow_model_registry_deployer_step,
)

# Deployer step configured for version 6 of the ada-boost model in the MLflow registry.
deployer_parameters = {
    'registry_model_name': 'ada-boost-model',
    'registry_model_version': 6,
    'timeout': 300,
}
model_deployer = mlflow_model_registry_deployer_step.with_options(parameters=deployer_parameters)

type(model_deployer)

zenml.integrations.mlflow.steps.mlflow_deployer.mlflow_model_registry_deployer_step

In [3]:
from zenml.integrations.mlflow.steps.mlflow_deployer import (
    mlflow_model_registry_deployer_step,
)

# Deployer step configured for version 7 of the gradient-boosting model in the MLflow registry.
deployer_parameters = {
    'registry_model_name': 'gradient-boost-model',
    'registry_model_version': 7,
    'timeout': 300,
}
model_deployer = mlflow_model_registry_deployer_step.with_options(parameters=deployer_parameters)

model_deployer



<zenml.integrations.mlflow.steps.mlflow_deployer.mlflow_model_registry_deployer_step at 0x131c88130>

In [14]:

def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    model_name: str,
    running: bool = True,
) -> "MLFlowDeploymentService":
    """Get the prediction service started by the deployment pipeline.

    Args:
        pipeline_name: name of the pipeline that deployed the MLflow prediction server.
        pipeline_step_name: name of the step that deployed the MLflow prediction server.
        model_name: name of the deployed model.
        running: when True, only a running service is returned.

    Returns:
        The first matching service reported by the active MLflow model deployer.

    Raises:
        RuntimeError: if no matching service is found.
    """
    # Imported here, and the return annotation is quoted, because this cell runs before the
    # cell that imports the ZenML MLflow classes. The unquoted annotation raised NameError at
    # definition time on a fresh kernel.
    from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
        MLFlowModelDeployer,
    )

    # Get the MLflow model deployer stack component.
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()

    # Fetch existing services with the same pipeline name, step name, and model name.
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )

    if not existing_services:
        raise RuntimeError(
            f"No MLflow prediction service deployed by the "
            f"{pipeline_step_name} step in the {pipeline_name} "
            f"pipeline for the '{model_name}' model is currently "
            f"running."
        )
    return existing_services[0]






In [15]:
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService


from zenml import step


# Named `prediction_service` so it no longer overwrites `prediction`, the Evidently
# prediction-column name ('prediction') defined earlier in this notebook.
prediction_service = prediction_service_loader(
    pipeline_name="continuous_deployment_pipeline",
    pipeline_step_name="mlflow_model_deployer_step",
    model_name='Random-forest-model',
)
prediction_service



[1;35mReloading configuration file /Users/rashid/youtube-project/youtube-virality-prediction/.zen/config.yaml[0m


RuntimeError: No MLflow prediction service deployed by the mlflow_model_deployer_step step in the continuous_deployment_pipeline pipeline for the 'Random-forest-model' model is currently running.

In [None]:
def inference_pipeline(pipeline_name: str, pipeline_step_name: str, model_name: str):
    """Run batch predictions with the MLflow service deployed by the given pipeline step.

    NOTE(review): `load_test_data_model` is not defined anywhere in this notebook, and
    `predictor` is only imported in a later cell. Also, running=False means the loader may
    return a stopped service.
    """
    test_batch = load_test_data_model()
    service = prediction_service_loader(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=False,
    )
    return predictor(service=service, data=test_batch)

In [17]:
from steps.model_deployer import model_deployer
from steps.prediction_service_loader import prediction_service_loader
from steps.predictor import predictor
from steps.training_data_loader import training_data_loader

from zenml import pipeline
from zenml.config import DockerSettings
from zenml.integrations.constants import MLFLOW, SKLEARN

# Container settings for this pipeline: the MLflow and scikit-learn ZenML integrations plus
# the pinned package versions listed below.
docker_settings = DockerSettings(
    required_integrations=[MLFLOW, SKLEARN],
    requirements=[
        "numpy==1.24.3",
        "scipy==1.10.1",
        "typing-extensions==4.6.3",
    ],
)


@pipeline(enable_cache=True, settings={"docker": docker_settings})
def deploy_and_predict() -> None:
    """Deploy the best model and run some predictions.

    Steps: deploy the model, load data, fetch the deployed service, and call the predictor
    on the second output of training_data_loader.
    NOTE(review): this cell's `from steps.model_deployer import model_deployer` raised
    ImportError when last run, so the pipeline was never defined.
    """
    # Declare that the service loader step runs after the deployer step.
    prediction_service_loader.after(model_deployer)

    model_deployer()
    # training_data_loader returns four outputs; only the second is used here, as inference data.
    _, inference_data, _, _ = training_data_loader()
    model_deployment_service = prediction_service_loader()
    predictor(service=model_deployment_service, data=inference_data)

ImportError: cannot import name 'model_deployer' from 'steps.model_deployer' (/Users/rashid/youtube-project/youtube-virality-prediction/steps/model_deployer.py)

In [None]:
from steps.register_model import model_name
from zenml.integrations.mlflow.steps.mlflow_deployer import (
    mlflow_model_registry_deployer_step,
)

# Deployer step for version 1 of the model whose registry name is defined in steps.register_model.
deployer_options = dict(
    registry_model_name=model_name,
    registry_model_version=1,
    timeout=300,
)
model_deployer = mlflow_model_registry_deployer_step.with_options(parameters=deployer_options)

In [None]:
from zenml import step
from zenml.client import Client
from zenml.services import BaseService


@step(enable_cache=False)
def prediction_service_loader() -> BaseService:
    """Load the model service of our train_and_register_model_pipeline.

    Note: this rebinds `prediction_service_loader`, replacing the earlier plain function of the
    same name (which took pipeline/step/model arguments).

    Returns:
        The first model server that the active stack's model deployer reports for the
        ``train_and_register_model_pipeline`` pipeline.

    Raises:
        RuntimeError: if the active stack has no model deployer, or no such server exists.
            Previously the empty case surfaced as an opaque IndexError.
    """
    client = Client()
    deployer = client.active_stack.model_deployer
    if deployer is None:
        raise RuntimeError("The active ZenML stack has no model deployer component.")
    services = deployer.find_model_server(
        pipeline_name="train_and_register_model_pipeline",
    )
    if not services:
        raise RuntimeError(
            "No model server found for the 'train_and_register_model_pipeline' pipeline; "
            "run that pipeline (including its deployment step) first."
        )
    return services[0]

In [16]:
from zenml import step
from zenml.client import Client
from zenml.services import BaseService


@step(enable_cache=False)
def prediction_service_loader() -> BaseService:
    """Load the model service of our train_and_register_model_pipeline.

    Note: this cell repeats the previous cell's definition, and rebinds
    `prediction_service_loader` again.

    Returns:
        The first model server that the active stack's model deployer reports for the
        ``train_and_register_model_pipeline`` pipeline.

    Raises:
        RuntimeError: if the active stack has no model deployer, or no such server exists.
            Previously the empty case surfaced as an opaque IndexError.
    """
    client = Client()
    deployer = client.active_stack.model_deployer
    if deployer is None:
        raise RuntimeError("The active ZenML stack has no model deployer component.")
    services = deployer.find_model_server(
        pipeline_name="train_and_register_model_pipeline",
    )
    if not services:
        raise RuntimeError(
            "No model server found for the 'train_and_register_model_pipeline' pipeline; "
            "run that pipeline (including its deployment step) first."
        )
    return services[0]

In [None]:
from zenml import pipeline
import pandas as pd
from steps.ingest_data import ingest_df
from steps.data_cleaning import cleaning_data
from steps.splitter import sklearn_split_data, drift_splitting
from steps.evaluate import evaluate_model
from steps.model import train_and_save_model,test_model
import joblib
from typing_extensions import Tuple,Annotated
from sklearn.base import ClassifierMixin
from zenml.config import DockerSettings
from zenml.integrations.constants import MLFLOW, SKLEARN
from zenml.integrations.mlflow.steps.mlflow_registry import (
    mlflow_register_model_step,   
)
from zenml.model_registries.base_model_registry import (
    ModelRegistryModelMetadata,
)
from steps.monitoring import model_monitoring
import mlflow



docker_settings = DockerSettings(required_integrations={MLFLOW, SKLEARN})


@pipeline(enable_cache=False, settings={"docker": docker_settings})
def train_pipeline(epochs: int = 2, lr: float = 0.001, num_run: int = 1):
    """Ingest, clean and split the data; then train, evaluate, register and monitor three models.

    Models are processed one after another in the order random forest, ada boost, gradient
    boost. Each is trained, evaluated and registered before the next starts. Drift monitoring
    runs for all three after the drift split.
    NOTE(review): `lr` and `epochs` are only recorded as registry metadata here; they are not
    passed to any training step.
    """
    raw_data = ingest_df()
    df = cleaning_data(df=raw_data)
    X_train, X_test, y_train, y_test = sklearn_split_data(df)

    # (kind passed to train_and_save_model, label for evaluate_model, MLflow registry name)
    model_specs = (
        ("random_forest", "Random Forest", "Random-forest-model"),
        ("ada_boost", 'ada boost', "ada-boost-model"),
        ("gradient_boost", "gradient boost", "gradient-boost-model"),
    )

    trained_models = []
    for model_kind, eval_label, registry_name in model_specs:
        trained = train_and_save_model(model_kind, X_train=X_train, y_train=y_train)
        evaluate_model(model=trained, X_test=X_test, y_test=y_test, model_name=eval_label)
        mlflow_register_model_step.with_options(
            parameters=dict(
                name=registry_name,
                metadata=ModelRegistryModelMetadata(lr=lr, epochs=epochs, optimizer=None),
                description=f"Run #{num_run} of the mlflow_registry_training_pipeline.",
            )
        )(trained)
        trained_models.append(trained)

    reference_data, current_data = drift_splitting(df)
    for trained in trained_models:
        model_monitoring(reference_data, current_data, model=trained)
    
  
    
