In [1]:
# access data from snowflake
import os

import pandas as pd
from google.cloud import storage
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *

# Credentials are read from the environment so they never have to be typed into
# (and saved with) the notebook. Export SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER and
# SNOWFLAKE_PASSWORD before starting the kernel; a missing variable falls back to
# an empty string, matching the blank placeholders this cell used to contain.
connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", ""),
    "user": os.environ.get("SNOWFLAKE_USER", ""),
    "password": os.environ.get("SNOWFLAKE_PASSWORD", ""),
    "role": "ACCOUNTADMIN",
    "warehouse": "HOL_WH",
    "database": "HOL_DB",
    "schema": "PUBLIC",
}
session = Session.builder.configs(connection_parameters).create()

# Handles to the three source tables joined in the next cell.
maintenance_df = session.table('maintenance')
humidity_df = session.table('humidity')
hum_udi_df = session.table('city_udf')

In [5]:
# Build the training dataset: join the maintenance rows to the city lookup on UDI,
# then join the humidity table on city name and keep only the training columns.
maintenance_city = maintenance_df.join(hum_udi_df, ["UDI"])

city_matches = maintenance_city.col("CITY") == humidity_df.col("CITY_NAME")
training_columns = [
    "TYPE",
    "AIR_TEMPERATURE_K",
    "PROCESS_TEMPERATURE",
    "ROTATIONAL_SPEED_RPM",
    "TORQUE_NM",
    "TOOL_WEAR_MIN",
    "HUMIDITY_RELATIVE_AVG",
    "MACHINE_FAILURE",
]
maintenance_hum = maintenance_city.join(humidity_df, city_matches).select(
    *(col(column_name) for column_name in training_columns)
)

In [6]:
# Persist the training set as the MAINTENANCE_HUM table (replacing any previous
# copy), then read that table back into a pandas DataFrame.
training_writer = maintenance_hum.write.mode("overwrite")
training_writer.save_as_table("MAINTENANCE_HUM")

maintenance_hum_table = session.table('MAINTENANCE_HUM')
maintenance_hum_df = maintenance_hum_table.to_pandas()

In [7]:
# Drop the redundant TYPE column and save the result as the local training CSV.
# errors="ignore" keeps the cell idempotent: the frame is reassigned to itself, so
# on a second run TYPE is already gone and a plain drop would raise KeyError.
maintenance_hum_df = maintenance_hum_df.drop(columns=["TYPE"], errors="ignore")
maintenance_hum_df.to_csv('maintenance_hum_df.csv', index=False)

In [None]:
# Create the bucket if you do not already have one.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket('<UNIQUE BUCKET NAME>')

# Only create when the bucket is missing so the cell can be re-run safely, and pass
# the region to create() directly: assigning to Bucket.location is deprecated.
if not bucket.exists():
    bucket.create(location='<GCP REGION>')

In [8]:
# create functions to train model and register to vertexai
from google.cloud import aiplatform
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import joblib
import time

def train_and_save_model(dataframe, target_column, model_filename):
    """Fit a logistic regression on ``dataframe`` and store it with joblib.

    Args:
        dataframe: pandas DataFrame holding the feature columns and the target.
        target_column: Name of the column to predict; every other column is
            used as a feature.
        model_filename: Local path the fitted model is dumped to.

    Returns:
        ``model_filename``, unchanged, so the caller can pass it straight on.
    """
    features = dataframe.drop(columns=target_column)
    labels = dataframe[target_column]

    # Hold out 20% of the rows for evaluation; the fixed seed keeps the split
    # reproducible between runs.
    features_train, features_test, labels_train, labels_test = train_test_split(
        features, labels, test_size=0.2, random_state=42
    )

    classifier = LogisticRegression().fit(features_train, labels_train)

    # Report accuracy on the held-out rows.
    holdout_accuracy = accuracy_score(labels_test, classifier.predict(features_test))
    print(f'Accuracy: {holdout_accuracy}')

    joblib.dump(classifier, model_filename)
    return model_filename

def upload_model_to_bucket(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and print the outcome.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to store the file under.

    Returns:
        None. The result of the post-upload existence check is only printed.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)

    target_blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

    # Check that the uploaded object can be found in the bucket.
    if not target_blob.exists():
        print(f"Verification failed: The file {destination_blob_name} does not exist in the bucket.")
        return
    print(f"Verification: The file {destination_blob_name} exists in the bucket.")

def register_model(
    project_id,
    location,
    model_display_name,
    model_directory_uri,
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-23:latest",
    wait_seconds=30,
):
    """Register the model artifacts under a GCS directory as a Vertex AI model.

    Args:
        project_id: GCP project that will own the model.
        location: Vertex AI region, e.g. ``'us-central1'``.
        model_display_name: Display name given to the registered model.
        model_directory_uri: ``gs://`` URI of the DIRECTORY that holds the
            model file (not the file itself).
        serving_container_image_uri: Prediction container image to attach to
            the model. Defaults to the image this notebook has always used.
            NOTE(review): the default image name targets scikit-learn 0.23;
            confirm it matches the version the model was trained with.
        wait_seconds: Seconds to pause before registering. Pass 0 to skip the
            pause entirely.

    Returns:
        The ``aiplatform.Model`` returned by ``aiplatform.Model.upload``.
    """
    # Initialize the Vertex AI client
    aiplatform.init(project=project_id, location=location)

    # Optional grace period before Vertex AI reads the artifacts from GCS.
    if wait_seconds and wait_seconds > 0:
        print("Waiting for the model files to become consistent in GCS...")
        time.sleep(wait_seconds)

    # Register the model
    model = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=model_directory_uri,  # This should point to the directory
        serving_container_image_uri=serving_container_image_uri,
    )

    print(f"Model {model.display_name} has been registered in Vertex AI.")
    return model


In [None]:
# Usage: train locally, upload the artifact to Cloud Storage, register in Vertex AI.
if __name__ == "__main__":
    # GCP settings -- replace each placeholder with your own value.
    project_id = '<GCP PROJECT ID>'
    location = '<GCP DEPLOYMENT REGION>'  # e.g. 'us-central1'
    bucket_name = '<BUCKET NAME>'
    model_display_name = 'failure_update'

    # Training inputs -- adjust to your data source and target column.
    target_column = 'MACHINE_FAILURE'
    model_filename = 'model.joblib'
    df = pd.read_csv('maintenance_hum_df.csv')

    # Train the model and write it to a local file.
    saved_model_filename = train_and_save_model(
        dataframe=df,
        target_column=target_column,
        model_filename=model_filename,
    )

    # Vertex AI is given the DIRECTORY holding the model, so this must end with a slash.
    model_directory_uri = f"gs://{bucket_name}/"

    # Copy the local model file into the bucket.
    upload_model_to_bucket(
        bucket_name=bucket_name,
        source_file_name=saved_model_filename,
        destination_blob_name=model_filename,
    )

    # Register the uploaded artifacts as a Vertex AI model.
    registered_model = register_model(
        project_id=project_id,
        location=location,
        model_display_name=model_display_name,
        model_directory_uri=model_directory_uri,
    )


In [32]:
def download_model_from_gcs(bucket_name, source_blob_name, destination_file_name):
    """Download one object from a Cloud Storage bucket to a local file.

    Args:
        bucket_name: Name of the bucket to read from.
        source_blob_name: Object name inside the bucket; leading and trailing
            slashes are removed before the lookup.
        destination_file_name: Local path the object is written to.
    """
    gcs_bucket = storage.Client().bucket(bucket_name)
    # Look the object up by its name without surrounding slashes.
    source_blob = gcs_bucket.blob(source_blob_name.strip('/'))

    source_blob.download_to_filename(destination_file_name)
    print(f"Downloaded storage object {bucket_name}/{source_blob_name} to local file {destination_file_name}.")

def _split_gcs_uri(gcs_uri):
    """Split ``gs://bucket/some/prefix/`` into ``("bucket", "some/prefix")``."""
    scheme = "gs://"
    if not gcs_uri or not gcs_uri.startswith(scheme):
        raise ValueError(f"Expected a gs:// artifact URI, got: {gcs_uri!r}")
    bucket_name, _, prefix = gcs_uri[len(scheme):].partition("/")
    # The prefix is returned without surrounding slashes; it is empty when the
    # artifacts sit at the bucket root.
    return bucket_name, prefix.strip("/")


def load_model_from_vertex_ai(project_id, location, model_id):
    """Download a registered Vertex AI model's ``model.joblib`` and load it.

    Args:
        project_id: GCP project that owns the model.
        location: Vertex AI region of the model, e.g. ``'us-central1'``.
        model_id: ID of the registered model.

    Returns:
        The object returned by ``joblib.load`` on the downloaded artifact.

    Raises:
        ValueError: If the model's artifact URI is missing or is not a
            ``gs://`` URI.
    """
    # Initialize the Vertex AI client
    aiplatform.init(project=project_id, location=location)

    # Retrieve the model details
    model = aiplatform.Model(model_name=f"projects/{project_id}/locations/{location}/models/{model_id}")
    model_artifacts_gcs_uri = model.gca_resource.artifact_uri
    print(f"Model artifacts located at: {model_artifacts_gcs_uri}")

    # Parse the GCS URI to get the bucket and the artifact directory
    bucket_name, model_path_in_bucket = _split_gcs_uri(model_artifacts_gcs_uri)

    # Join directory and file name with an explicit "/". Plain concatenation only
    # worked when the artifacts were at the bucket root; for any sub-directory it
    # produced names such as "models/v1model.joblib".
    if model_path_in_bucket:
        model_blob_name = f"{model_path_in_bucket}/model.joblib"
    else:
        model_blob_name = "model.joblib"

    # Download the model artifact (.joblib file)
    destination_file_name = "downloaded_model.joblib"  # Local file to which the model will be downloaded
    download_model_from_gcs(bucket_name, model_blob_name, destination_file_name)

    # Load the model into memory.
    # SECURITY: joblib.load unpickles the file and can execute arbitrary code, so
    # only load artifacts from buckets you trust.
    loaded_model = joblib.load(destination_file_name)
    print("Model loaded into memory successfully.")

    return loaded_model


In [None]:
# Example usage: fetch a registered model's artifact and load it into memory.
if __name__ == "__main__":
    # Replace each placeholder with the values for your registered model.
    project_id = '<GCP PROJECT ID>'
    location = '<GCP REGION>'  # e.g. 'us-central1'
    model_id = '<REGISTERED MODEL ID>'

    model = load_model_from_vertex_ai(
        project_id=project_id,
        location=location,
        model_id=model_id,
    )

In [None]:
# Show the loaded model as this cell's output.
model

In [None]:
# Connect to the Snowpark model registry and log the model.
from snowflake.ml.registry import registry

# Feature columns only (target removed), passed to the registry as sample input.
test_data = maintenance_hum_df.drop(columns=['MACHINE_FAILURE'])

reg = registry.Registry(session=session)
reg.log_model(
    model,
    model_name='vertex_model',
    version_name='v1',
    sample_input_data=test_data,
)

# Verify the model deployment and list the functions associated with the model.
logged_model = reg.get_model('vertex_model')
mv = logged_model.version('v1')
mv.show_functions()