# End-to-end AI Infrastructure Processing

This notebook stands in for the steps of the eventual Airflow DAGs so that we can validate the pipeline end to end.

## Cloud Setup - Azure

### Cloud Buildout - Azure

This part should be one click: run "infrastructure/deployment/deploy-infrastructure.sh -s \<subscription_id\>" and all of the necessary Azure resources should be built out.

### Cloud Configuration - Azure

These steps are Azure-specific. They set up the credentials and configuration required by the rest of this notebook.

In [282]:
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient
from azure.mgmt.containerservice import ContainerServiceClient
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.resource import SubscriptionClient
import json

import os

# Read in configuration; the `with` block closes the file once it is parsed.
with open('config.json') as config_file:
    azure_config = json.load(config_file)
subscription_id = azure_config["subscription_id"]
resource_group_name = azure_config["main_resource_group"]
cluster_names = azure_config["cluster_names"]

# Authenticate to Azure and build the SDK clients used throughout the notebook.
default_credential = DefaultAzureCredential()
subscription_client = SubscriptionClient(credential=default_credential)
datalake_service_client = DataLakeServiceClient(azure_config["account_url"], credential=default_credential)
storage_client = StorageManagementClient(credential=default_credential, subscription_id=subscription_id)

# Fail fast if the configured subscription is not visible to this credential.
for subscription in subscription_client.subscriptions.list():
    if subscription.subscription_id == subscription_id:
        break
else:
    raise ValueError(f"Subscription not found: {subscription_id}")

# Client for the AKS managed clusters in the subscription.
container_service_client = ContainerServiceClient(default_credential, subscription_id)

# Save one kubeconfig per cluster to ./manual_pipeline_files/cluster_configs/<cluster_name>.yaml.
# Create the directory first so a fresh checkout does not fail on open().
kubeconfig_dir = "./manual_pipeline_files/cluster_configs"
os.makedirs(kubeconfig_dir, exist_ok=True)
for cluster_name in cluster_names:
    credential_results = container_service_client.managed_clusters.list_cluster_user_credentials(
        resource_group_name,
        cluster_name,
    )

    # `.value` is already bytes (the SDK deserializes the base64 payload); decode it to text.
    kubeconfig = credential_results.kubeconfigs[0].value.decode("utf-8")

    with open(f'{kubeconfig_dir}/{cluster_name}.yaml', 'w') as file:
        file.write(kubeconfig)

# First access key of the storage account; later passed to Spark for ADLS access.
storage_account_key = storage_client.storage_accounts.list_keys(resource_group_name, azure_config["account_name"]).keys[0].value

## Collection

We structure the collected data and store it in our SQL database. This is the first step in our pipeline.

In [260]:
import os
import subprocess
import random
import string

def _ignore_if_exists(create, *args, **kwargs):
    """Call ``create(*args, **kwargs)``, treating an Azure ResourceExistsError as success.

    This lets the cell be re-run against storage that already exists. The error is
    matched by class name, as before, so no extra azure import is needed. Any other
    exception is re-raised with its original traceback.
    """
    try:
        create(*args, **kwargs)
    except Exception as e:
        if e.__class__.__name__ != "ResourceExistsError":
            raise
        print("Resource already exists")


# Create the spark-data filesystem and its spark-jobs directory if they do not exist.
# spark-submit uploads local job files there (spark.kubernetes.file.upload.path).
_ignore_if_exists(datalake_service_client.create_file_system, file_system="spark-data")
_ignore_if_exists(
    datalake_service_client.get_file_system_client(file_system="spark-data").create_directory,
    "spark-jobs",
)

Resource already exists


In [261]:
# Find the Spark AKS cluster (first configured cluster whose name contains "spark").
spark_cluster_name = next((cluster for cluster in cluster_names if "spark" in cluster), None)
if spark_cluster_name is None:
    raise ValueError(f"No cluster containing 'spark' in config cluster_names: {cluster_names}")

# ManagedCluster.fqdn is a bare host name (e.g. dnsprefix-ww13q3en.hcp.eastus.azmk8s.io).
# spark-submit's k8s:// master should carry the scheme and an explicit port,
# e.g. https://dnsprefix-ww13q3en.hcp.eastus.azmk8s.io:443
spark_fqdn = container_service_client.managed_clusters.get(resource_group_name, spark_cluster_name).fqdn
main_server = f"https://{spark_fqdn}:443"

image_repo = azure_config["image_repo"]

In [272]:
# Copy the contents of ../jobs into manual_pipeline_files/spark_jobs.
# check=True stops here if the copy fails, instead of submitting a missing job later.
os.makedirs("./manual_pipeline_files/spark_jobs", exist_ok=True)
subprocess.run(["cp", "-r", "../jobs/.", "./manual_pipeline_files/spark_jobs/"], check=True)

CompletedProcess(args=['cp', '-r', '../jobs/.', './manual_pipeline_files/spark_jobs/'], returncode=0)

In [283]:
# Values interpolated into the spark-submit commands below.
# For a local test cluster, main_server can be overridden, e.g.:
# main_server = "https://127.0.0.1:54565"
account_name = azure_config["account_name"]
data_filesystem = azure_config["data_filesystem"]
# storage_account_key is deliberately NOT reassigned here. It was fetched via
# storage_client.storage_accounts.list_keys in the setup cell; overwriting it with a
# placeholder string would make Spark authenticate to ADLS with an invalid key.
tenant_id = azure_config["spark_spn"]["tenant_id"]
client_id = azure_config["spark_spn"]["client_id"]
sql_serverurl = azure_config["sql-db"]["server"]
sql_port = azure_config["sql-db"]["port"]
sql_database = azure_config["sql-db"]["database"]
sql_username = azure_config["sql-db"]["username"]
sql_password = azure_config["sql-db"]["password"]

In [None]:
# Submit the data-collection Spark job (spark-sklearn-iris-collection.py) to the Spark AKS
# cluster in cluster mode. Local job files are uploaded to
# abfss://spark-data@<account_name>.dfs.core.windows.net/spark-jobs (spark.kubernetes.file.upload.path).
# The custom vars.* conf keys carry storage/SQL settings — presumably read by the job; confirm in the script.
# NOTE(review): the storage account key and SQL password are passed as command-line arguments,
# so they may be visible in process listings; consider a Kubernetes secret or workload identity.
# NOTE(review): --name is "spark-dist-training" although this is the collection job — confirm intended.
!spark-submit \
  --master k8s://{main_server} \
  --deploy-mode cluster \
  --name spark-dist-training \
  --conf spark.kubernetes.container.image={image_repo}/pyspark-test:latest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.namespace=default \
  --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
  --conf spark.kubernetes.trust.certificates=true \
  --conf spark.driver.port=7070 \
  --conf spark.driver.host=spark-driver-headless.default.svc.cluster.local \
  --conf spark.ui.port=7069 \
  --conf "vars.account.name={account_name}" \
  --conf "vars.account.datafilesystem={data_filesystem}" \
  --conf spark.kubernetes.executor.label.azure.workload.identity/use=true \
  --conf spark.kubernetes.executor.label.aadpodidbinding=vmscalesetidentity \
  --conf spark.kubernetes.driver.label.aadpodidbinding=vmscalesetidentity \
  --conf spark.kubernetes.driver.label.azure.workload.identity/use=true \
  --conf "spark.hadoop.fs.azure.account.key.{account_name}.dfs.core.windows.net={storage_account_key}" \
  --conf "spark.kubernetes.file.upload.path=abfss://spark-data@{account_name}.dfs.core.windows.net/spark-jobs" \
  --conf "spark.hadoop.fs.azure.account.oauth2.msi.tenant={tenant_id}" \
  --conf "spark.hadoop.fs.azure.account.oauth2.client.id={client_id}" \
  --conf "vars.sql.serverurl={sql_serverurl}" \
  --conf "vars.sql.port={sql_port}" \
  --conf "vars.sql.database={sql_database}" \
  --conf "vars.sql.username={sql_username}" \
  --conf "vars.sql.password={sql_password}" \
  --conf "spark.jars.packages=org.apache.hadoop:hadoop-azure:3.3.4,com.azure:azure-storage-blob:12.25.1,com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11" \
  "manual_pipeline_files/spark_jobs/data-collection/spark-sklearn-iris-collection.py"


## Processing - Spark

In [None]:
import pyspark

# Headless-service host for the Spark driver and the storage account name for the processing step.
# NOTE(review): driver_host is not referenced by the spark-submit command, which hardcodes the same host.
driver_host = "spark-driver-headless.default.svc.cluster.local"
account_name = azure_config["account_name"]

In [None]:
# Submit the Spark processing job (manual_pipeline_files/iris_spark_job.py). It uses the same
# cluster, identity, storage and vars.* settings as the data-collection submission.
# NOTE(review): this script path is not under manual_pipeline_files/spark_jobs/, where ../jobs is
# copied — confirm the file actually exists at this location.
# NOTE(review): vars.sql.* are passed, but mssql-jdbc is not in spark.jars.packages (unlike the
# collection job); add it if this job talks to SQL Server.
# NOTE(review): the storage key and SQL password are passed on the command line and may be visible
# in process listings.
!spark-submit \
  --master k8s://{main_server} \
  --deploy-mode cluster \
  --name spark-dist-training \
  --conf spark.kubernetes.container.image={image_repo}/pyspark-test:latest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.namespace=default \
  --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
  --conf spark.kubernetes.trust.certificates=true \
  --conf spark.driver.port=7070 \
  --conf spark.driver.host=spark-driver-headless.default.svc.cluster.local \
  --conf spark.ui.port=7069 \
  --conf "vars.account.name={account_name}" \
  --conf "vars.account.datafilesystem={data_filesystem}" \
  --conf spark.kubernetes.executor.label.azure.workload.identity/use=true \
  --conf spark.kubernetes.executor.label.aadpodidbinding=vmscalesetidentity \
  --conf spark.kubernetes.driver.label.aadpodidbinding=vmscalesetidentity \
  --conf spark.kubernetes.driver.label.azure.workload.identity/use=true \
  --conf "spark.hadoop.fs.azure.account.key.{account_name}.dfs.core.windows.net={storage_account_key}" \
  --conf "spark.kubernetes.file.upload.path=abfss://spark-data@{account_name}.dfs.core.windows.net/spark-jobs" \
  --conf "spark.hadoop.fs.azure.account.oauth2.msi.tenant={tenant_id}" \
  --conf "spark.hadoop.fs.azure.account.oauth2.client.id={client_id}" \
  --conf "vars.sql.serverurl={sql_serverurl}" \
  --conf "vars.sql.port={sql_port}" \
  --conf "vars.sql.database={sql_database}" \
  --conf "vars.sql.username={sql_username}" \
  --conf "vars.sql.password={sql_password}" \
  --conf spark.jars.packages="org.apache.hadoop:hadoop-azure:3.3.4,com.azure:azure-storage-blob:12.25.1" \
    "manual_pipeline_files/iris_spark_job.py"


## Training - Prep

In [None]:
# Build and push the docker image at ./manual_pipeline_files/Dockerfile.pretraining
# This docker image will be submitted to a kubeflow pytorch training operator in the next step
# The tag should be generated here, random or based on the git commit hash
import os
import subprocess
import random
import string

def random_string(string_length=10):
    """Return ``string_length`` random lowercase ASCII letters.

    Used to build a throwaway image tag; uses the ``random`` module, so it is
    not suitable for anything security-sensitive.
    """
    alphabet = string.ascii_lowercase
    picked = []
    for _ in range(string_length):
        picked.append(random.choice(alphabet))
    return "".join(picked)

image_name = azure_config["iris_torch_job_name"]
tag = random_string()
full_image_name = f"{image_repo}/{image_name}:{tag}"

# Stage the iris model code next to Dockerfile.pretraining. Copying the *contents*
# of ../models/iris into an existing directory keeps re-runs idempotent; a plain
# `cp -r iris models` nests a second copy at models/iris once models/ exists.
os.makedirs("./manual_pipeline_files/models", exist_ok=True)
subprocess.run(["cp", "-r", "../models/iris/.", "./manual_pipeline_files/models/"], check=True)

# Build via cwd= rather than os.chdir, so an error cannot leave the kernel in the
# wrong directory. check=True stops the pipeline if the build or push fails,
# instead of letting the PyTorchJob reference an image that was never pushed.
subprocess.run(
    ["docker", "build", "-t", full_image_name, "-f", "Dockerfile.pretraining", "."],
    cwd="manual_pipeline_files",
    check=True,
)
subprocess.run(["docker", "push", full_image_name], check=True)

## Training - TorchJob

In [None]:
# Point both PyTorchJob replica types (Master and Worker) at the image built above,
# pass them the training arguments, and label their pods with the pod identity binding.
import yaml

with open("../pipeline/kubeflow/pytorchjobs/torchjob-iris.yaml", "r") as f:
    torchjob_yaml = yaml.safe_load(f)

training_args = ["--learning_rate", "0.001", "--account_name", account_name]
for replica_type in ("Master", "Worker"):
    pod_template = torchjob_yaml["spec"]["pytorchReplicaSpecs"][replica_type]["template"]
    container = pod_template["spec"]["containers"][0]
    container["image"] = full_image_name
    # Give each replica its own list: yaml.dump renders a shared object as an anchor/alias.
    container["args"] = list(training_args)
    # Tolerate a template that has no metadata/labels section yet.
    pod_template.setdefault("metadata", {}).setdefault("labels", {})["aadpodidbinding"] = "vmscalesetidentity"

# Save the rendered manifest at ./manual_pipeline_files/torchjob-iris.yaml
with open("./manual_pipeline_files/torchjob-iris.yaml", "w") as f:
    yaml.dump(torchjob_yaml, f)

In [None]:
# Apply the PyTorchJob to the training cluster: the first configured cluster whose name contains "train".
training_cluster_name = next((cluster for cluster in cluster_names if "train" in cluster), None)
if training_cluster_name is None:
    raise ValueError(f"No cluster containing 'train' in config cluster_names: {cluster_names}")

# Point kubectl at that cluster's kubeconfig (saved in the setup cell). An absolute
# path keeps it valid even if a later subprocess runs with a different cwd.
os.environ["KUBECONFIG"] = os.path.abspath(f"./manual_pipeline_files/cluster_configs/{training_cluster_name}.yaml")

# check=True: do not start polling for a job that was never created.
subprocess.run(["kubectl", "apply", "-f", "./manual_pipeline_files/torchjob-iris.yaml"], check=True)

In [None]:
# Poll the PyTorchJob via `kubectl get -o json` until it reaches a terminal state.
# A failed or timed-out job raises, so the validation cells never evaluate stale
# weights left over from a previous run.
import time
import json

max_time = 60 * 60  # 1 hour
poll_interval = 60  # seconds between polls


def _terminal_state(job):
    """Return "Succeeded" or "Failed" if ``job`` (parsed kubectl JSON) has that
    condition with status "True"; otherwise return None (still in progress)."""
    conditions = (job.get("status") or {}).get("conditions") or []
    for condition in reversed(conditions):
        if condition.get("type") in ("Succeeded", "Failed") and condition.get("status", "True") == "True":
            return condition["type"]
    return None


start_time = time.time()
while True:
    result = subprocess.run(
        ["kubectl", "get", "pytorchjobs", "iris-torch-job", "-o", "json", "-n", "kubeflow"],
        capture_output=True,
    )
    # Right after `kubectl apply` the job may not be visible yet (non-zero exit, empty
    # stdout) or may have no conditions; treat both as "still running".
    state = None
    if result.returncode == 0 and result.stdout:
        state = _terminal_state(json.loads(result.stdout))

    if state == "Succeeded":
        print("Job succeeded")
        break
    if state == "Failed":
        print("Job failed")
        raise RuntimeError("PyTorchJob iris-torch-job failed; inspect it with `kubectl describe pytorchjob iris-torch-job -n kubeflow`")
    if time.time() - start_time >= max_time:
        raise TimeoutError(f"PyTorchJob iris-torch-job did not finish within {max_time} seconds")
    time.sleep(poll_interval)

## Validation

In [None]:
import torch
import pandas as pd

class Classifier(torch.nn.Module):
    """Four-layer fully connected classifier.

    Layer widths: input_size -> 2*input_size -> 2*input_size -> input_size -> output_size,
    with ReLU after every layer except the last; forward() returns raw logits.
    The attribute names fc1..fc4 are the state_dict keys, so saved weights depend on them.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_size = input_size * 2
        self.fc1 = torch.nn.Linear(input_size, hidden_size)
        self.fc2 = torch.nn.Linear(hidden_size, hidden_size)
        self.fc3 = torch.nn.Linear(hidden_size, input_size)
        self.fc4 = torch.nn.Linear(input_size, output_size)

    def forward(self, x):
        hidden = x
        for layer in (self.fc1, self.fc2, self.fc3):
            hidden = torch.relu(layer(hidden))
        return self.fc4(hidden)
    

class IrisDataset(torch.utils.data.Dataset):
    """Map-style dataset yielding ``(features, target)`` tensors from a DataFrame.

    Every column except ``target_column`` is a feature. Rows are addressed by
    position (0 .. len-1), so frames with any index work, e.g. shuffled or
    filtered frames that kept their original labels.

    Args:
        data: Source DataFrame.
        target_column: Name of the label column.
    """

    def __init__(self, data: pd.DataFrame, target_column: str):
        super().__init__()
        self.data = data
        self.target_column = target_column

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        """Return ``(features, target)`` for the row at position ``idx``.

        ``features`` is a float32 tensor of the non-target columns in column
        order; ``target`` is a 0-d int64 tensor.
        """
        # Positional .iloc lookup to agree with __len__; label-based .loc raised
        # KeyError (or returned the wrong row) when the index was not 0..n-1.
        is_feature = self.data.columns != self.target_column
        features = self.data.iloc[idx, is_feature]
        target = self.data.iloc[idx, ~is_feature]

        features = torch.tensor(features.values, dtype=torch.float32)
        target = torch.tensor(target.values[0], dtype=torch.long)
        return features, target

In [None]:
# The trained weights are stored in the model filesystem (azure_config["model_filesystem"])
# under iris_models/weights/iris_model_weights.pt. Download them with the
# datalake_service_client created in the setup cell.
filesystem_client = datalake_service_client.get_file_system_client(azure_config["model_filesystem"])

file_client = filesystem_client.get_file_client("iris_models/weights/iris_model_weights.pt")
with open("./manual_pipeline_files/iris_model_weights.pt", "wb") as file:
    weights = file_client.download_file()
    file.write(weights.readall())

model = Classifier(4, 3)
# map_location="cpu": weights saved from a GPU training run would otherwise fail to load
# on a CPU-only machine.
# NOTE(review): torch.load unpickles the file; that is acceptable only because it comes from our
# own storage account. Consider weights_only=True if the installed torch version supports it.
model.load_state_dict(torch.load("./manual_pipeline_files/iris_model_weights.pt", map_location="cpu"))
model.eval()

# Evaluate the model on the sklearn iris dataset.
import pandas as pd
import numpy as np
import sklearn.datasets

iris = sklearn.datasets.load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_df["target"] = iris.target

iris_dataset = IrisDataset(iris_df, "target")

# NOTE(review): this scores the full iris dataset, which presumably overlaps the training data,
# so it is a sanity check rather than a held-out accuracy estimate — confirm.
correct = 0
total = 0
with torch.no_grad():  # inference only; no autograd graph needed
    for i in range(len(iris_dataset)):
        features, target = iris_dataset[i]
        output = model(features)
        prediction = np.argmax(output.numpy())
        if prediction == target.item():
            correct += 1
        total += 1

print(f"Accuracy: {correct / total}")



## Deployment - Prep

In [None]:
# Assuming the model is good enough, build the TorchServe image from ../models/iris/serving/Dockerfile.
# Pull the .mar archive from the datalake into ../models/iris/serving/iris_model.mar first;
# presumably the Dockerfile copies it from its build context — confirm.
file_client = filesystem_client.get_file_client("iris_models/mar/iris_model.mar")
with open("../models/iris/serving/iris_model.mar", "wb") as file:
    mar = file_client.download_file()
    file.write(mar.readall())

# No tag is given, so Docker tags and pushes this image as :latest.
full_serve_image_name = f"{image_repo}/{azure_config['iris_torchserve_name']}"

# Build via cwd= instead of os.chdir, so an error cannot leave the kernel in the wrong
# directory. check=True stops here if the build or push fails.
subprocess.run(["docker", "build", "-t", full_serve_image_name, "."], cwd="../models/iris/serving", check=True)
subprocess.run(["docker", "push", full_serve_image_name], check=True)



## Deployment - TorchServe/KFServe

In [None]:
# Deploy the TorchServe model to the serving cluster with KServe, rendering the
# templates in ../pipeline/kubeflow/kserve/ with values from config.json.
with open("../pipeline/kubeflow/kserve/kserve-iris.yaml", "r") as f:
    kserve_yaml = yaml.safe_load(f)

account_name = azure_config["account_name"]
file_system_name = azure_config["model_filesystem"]
storage_uri = f"https://{account_name}.dfs.core.windows.net/{file_system_name}/iris_models/mar"
kserve_yaml["spec"]["predictor"]["model"]["storageUri"] = storage_uri

# Fill the storage-access secret with the credentials from azure_config["kf_serve_spn"].
with open("../pipeline/kubeflow/kserve/kserve-secret.yaml", "r") as f:
    kserve_secret_yaml = yaml.safe_load(f)

kf_serve_spn = azure_config["kf_serve_spn"]
kserve_secret_yaml["stringData"]["AZ_CLIENT_ID"] = kf_serve_spn["client_id"]
kserve_secret_yaml["stringData"]["AZ_CLIENT_SECRET"] = kf_serve_spn["secret"]
kserve_secret_yaml["stringData"]["AZ_SUBSCRIPTION_ID"] = azure_config["subscription_id"]
kserve_secret_yaml["stringData"]["AZ_TENANT_ID"] = kf_serve_spn["tenant_id"]

# Target the serving cluster: the first configured cluster whose name contains "serv".
serving_cluster_name = next((cluster for cluster in cluster_names if "serv" in cluster), None)
if serving_cluster_name is None:
    raise ValueError(f"No cluster containing 'serv' in config cluster_names: {cluster_names}")

# Standard Cluster Load
# Load kubeconfig from ./manual_pipeline_files/cluster_configs/<serving_cluster_name>.yaml
os.environ["KUBECONFIG"] = f"./manual_pipeline_files/cluster_configs/{serving_cluster_name}.yaml"

# Local Cluster Load
# os.environ["KUBECONFIG"] = f"/home/{os.environ['USER']}/.kube/config" # Now switch context to minikube for testing
# subprocess.run(["kubectl", "config", "use-context", "minikube"])


def _kubectl_apply(path):
    """Run `kubectl apply -f path` and print its stdout.

    On failure, print kubectl's stderr (otherwise hidden by capture_output) and then
    raise subprocess.CalledProcessError.
    """
    result = subprocess.run(["kubectl", "apply", "-f", path], capture_output=True)
    if result.returncode != 0:
        print(result.stderr.decode("utf-8", errors="replace"))
    result.check_returncode()
    print(result.stdout.decode("utf-8"))
    return result


kserve_yaml_path = "./manual_pipeline_files/kserve-iris.yaml"
kserve_secret_path = "./manual_pipeline_files/kserve-secret.yaml"

# The rendered files (especially the secret) are temporary. `finally` removes them even
# when an apply fails, so SPN credentials are not left on disk.
try:
    with open(kserve_yaml_path, "w") as f:
        yaml.dump(kserve_yaml, f)

    # Newly created secret file is owner read/write only.
    with open(kserve_secret_path, "w", opener=lambda p, flags: os.open(p, flags, 0o600)) as f:
        yaml.dump(kserve_secret_yaml, f)

    generate_kserve_secret = _kubectl_apply(kserve_secret_path)
    deploy_inference_service = _kubectl_apply(kserve_yaml_path)
finally:
    for rendered_path in (kserve_yaml_path, kserve_secret_path):
        if os.path.exists(rendered_path):
            os.remove(rendered_path)