In [1]:
# Point gcloud/kubectl at the target cluster.
# Capture the output lines of `gcloud config get-value project`.
projects = !gcloud config get-value project
# Re-apply the first captured line as the configured project.
!gcloud config set project {projects[0]}
# Fetch kubeconfig credentials; cluster name and zone are hard-coded here.
!gcloud container clusters get-credentials two-node-cluster --zone us-central1-a

Updated property [core/project].
Fetching cluster endpoint and auth data.
kubeconfig entry generated for two-node-cluster.


In [2]:
# Get the nodes names
nodes = !kubectl get nodes
first_node = nodes[1].split()[0]
second_node = nodes[2].split()[0]
print(f"First node: {first_node}")
print(f"Second node: {second_node}")

First node: gke-two-node-cluster-node-pool-1-260bb633-kp47
Second node: gke-two-node-cluster-node-pool-1-260bb633-tgkc


In [3]:
# Change node names in yaml files

import yaml

dir_yaml = 'deploy_real_kubernetes/'


def set_node_name(yaml_path, doc_index, node_name):
    """Pin one document of a multi-document YAML manifest to a node.

    Loads every document in ``yaml_path``, sets
    ``spec.template.spec.nodeName`` of the document at ``doc_index`` to
    ``node_name`` and writes all documents back to the same file.

    Note: the file is re-serialized by PyYAML, so comments and the original
    formatting of the manifest are not preserved.

    Args:
        yaml_path: Path of the manifest file to rewrite in place.
        doc_index: Zero-based position of the document to patch.
        node_name: Value stored under ``nodeName``.

    Returns:
        The list of documents as written back to the file.

    Raises:
        IndexError / KeyError: if the document or the
            ``spec.template.spec`` path does not exist.
    """
    with open(yaml_path, 'r') as stream:
        documents = list(yaml.safe_load_all(stream))
    documents[doc_index]['spec']['template']['spec']['nodeName'] = node_name
    with open(yaml_path, 'w') as stream:
        yaml.safe_dump_all(documents, stream)
    return documents


# FIRST NODE
micro_inf_configs = set_node_name(dir_yaml + 'deploy_micro_inf.yaml', 0, first_node)
mlflow_configs = set_node_name(dir_yaml + '04-mlflow.yaml', 2, first_node)

# SECOND NODE
zookeeper_configs = set_node_name(dir_yaml + '01-zookeeper.yaml', 1, second_node)
kafka_configs = set_node_name(dir_yaml + '02-kafka.yaml', 1, second_node)
micro_up_configs = set_node_name(dir_yaml + 'deploy_micro_up.yaml', 0, second_node)

In [4]:
# Apply yaml files
dir_name = 'deploy_real_kubernetes'
import os

files = [f for f in os.listdir(dir_name) if os.path.isfile(os.path.join(dir_name, f))]
for file in files:
    if file.endswith('.yaml'):
        print(f'Applying {file}...')
        !kubectl apply -f {os.path.join(dir_name, file)}
    else:
        print(f'Skipping {file}, not a YAML file.')

Applying 00-namespace.yaml...
namespace/kafka created
Applying 01-zookeeper.yaml...
service/zookeeper-service created
deployment.apps/zookeeper created
Applying 02-kafka.yaml...
service/kafka-service created
deployment.apps/kafka-broker created
Applying 04-mlflow.yaml...
persistentvolumeclaim/mlflow-pvc created
service/mlflow created
deployment.apps/mlflow created
persistentvolume/mlflow-pv created
Applying api_inference_svc.yaml...
service/api-inferencia created
Applying api_update_svc.yaml...
service/api-update created
Applying deploy_micro_inf.yaml...
deployment.apps/api-inferencia created
persistentvolumeclaim/api-inferencia-pvc created
persistentvolume/api-inferencia-pv created
Applying deploy_micro_up.yaml...
deployment.apps/api-update created
persistentvolumeclaim/api-update-pvc created
persistentvolume/api-update-pv created
Skipping node1.txt, not a YAML file.


In [12]:
# Copy Files to Pods
# Active part: copy the inference script into the first pod whose
# `kubectl get pods` line contains "inf". The commands for the update pod are
# kept below but commented out.
# NOTE: `findstr` is a Windows command, so this cell assumes a Windows shell.

# pods = !kubectl get pods | findstr /R "upd"

# Check if pods are running
# !kubectl exec {pods[0].split()[0]} -- ls /app/

# pods = !kubectl get pods | findstr /R "upd"
# !kubectl cp ../temp/upd/API_update_V8.1.py {pods[0].split()[0]}:/app/API_update_V8.1.py 
# !kubectl cp ../../Datasets/real_usage/AGR_a_first_train.csv {pods[0].split()[0]}:/app/AGR_a_first_train.csv
# !kubectl cp ../../Datasets/real_usage/AGR_g_first_train.csv {pods[0].split()[0]}:/app/AGR_g_first_train.csv

# NOTE(review): only pods[0] receives the file, while a later cell starts the
# script in every matching pod - presumably /app is shared or the script is
# already in the image; confirm.
pods = !kubectl get pods | findstr /R "inf"
!kubectl cp ../temp/inf/API_inferencia_V8.0.py {pods[0].split()[0]}:/app/API_inferencia_V8.0.py


In [None]:
# Run the scripts

import os
import threading

update_pods = !kubectl get pods | findstr /R "upd"

def run_update_script(pod_name):
    os.system(f"kubectl exec {pod_name} -- nohup python3 /app/API_update_V8.1.py > NUL 2>&1 &")
t = threading.Thread(target=run_update_script, args=(update_pods[0].split()[0],))
t.start()

def run_inference_script(pod_name, script_path):
    os.system(f"kubectl exec {pod_name} -- nohup python3 {script_path} > NUL 2>&1 &")

inference_pods = !kubectl get pods | findstr /R "inf"
for pod in inference_pods:
    pod_name = pod.split()[0]
    script_path = "/app/API_inferencia_V8.0.py"
    t = threading.Thread(target=run_inference_script, args=(pod_name, script_path))
    t.start()

In [11]:
# [SKIP] Kill the scripts
for pod in inference_pods:
    !kubectl exec {pod.split()[0]} -- pkill -f API
for pod in update_pods:
    !kubectl exec {pod.split()[0]} -- pkill -f API

In [20]:
# See python processes running in the pod
# update_pods = !kubectl get pods | findstr /R "upd"
# inference_pods = !kubectl get pods | findstr /R "inf"
print("Checking Python processes in pods:")
for pod in inference_pods:
    print(f"Pod: {pod.split()[0]}")
    !kubectl exec {pod.split()[0]} -- ps aux | findstr /R "python"

for pod in update_pods:
    print(f"Pod: {pod.split()[0]}")
    !kubectl exec {pod.split()[0]} -- ps aux | findstr /R "python"

Checking Python processes in pods:
Pod: api-inferencia-6f47cb7df8-92lbg
Pod: api-inferencia-6f47cb7df8-dxnqj
Pod: api-inferencia-6f47cb7df8-sp8t6
Pod: api-inferencia-6f47cb7df8-tn9f8
Pod: api-update-794765cdd7-wwk5f


In [None]:
# Check if program is working
# update_pods = !kubectl get pods | findstr /R "upd"
# !kubectl exec {update_pods[0].split()[0]} -- ls
# !kubectl exec {update_pods[0].split()[0]} -- cat message_log.csv