# Dynamic Client

Dynamic Client 提供了更灵活的 API：除了可以操作 Kubernetes 标准资源，也可以操作自定义资源。

参考：

- https://github.com/kubernetes-client/python/tree/master/examples/dynamic-client

## 代码示例

### 导入SDK

In [None]:
from kubernetes import client, config, dynamic

### 创建Client

In [None]:
# Load the cluster credentials/config. Configs can also be set in the
# Configuration class directly; here the helper is given an explicit
# kubeconfig path, which is relative to the notebook's working directory.
# (If no argument were provided, the config would be loaded from the
# default location.)
config.load_kube_config(config_file="./.kube/config")
api_client = client.ApiClient()
# Wrap the generic API client in a dynamic client; the cells below use it to
# look up resource APIs by apiVersion/kind.
dynamic_client = dynamic.DynamicClient(api_client)

### 操作ConfigMap

In [None]:
# Look up the ConfigMap resource (apiVersion "v1") through the dynamic
# client. The returned handle is what the following cells call
# create/get/patch/delete on.
config_map_api = dynamic_client.resources.get(api_version="v1", kind="ConfigMap")

In [None]:
# Target namespace and object name shared by all ConfigMap cells below.
namespace = "sps"
configmap_name = "test-configmap"

# ConfigMap manifest as a plain dict. The namespace is not part of the
# manifest's metadata; it is passed separately to each API call.
configmap_manifest = {
    "kind": "ConfigMap",
    "apiVersion": "v1",
    "metadata": {
        "name": configmap_name,
        # Label that the read cell uses in its label selector ("foo=bar").
        "labels": {
            "foo": "bar",
        },
    },
    # Two sample entries: a JSON document and an ini-style snippet, both
    # stored as strings.
    "data": {
        "config.json": '{"command":"/usr/bin/mysqld_safe"}',
        "frontend.cnf": "[mysqld]\nbind-address = 10.0.0.3\n",
    },
}

In [None]:
# Create the ConfigMap described by `configmap_manifest` in `namespace`.
# The object returned by the API call is kept in `configmap`.

configmap = config_map_api.create(body=configmap_manifest, namespace=namespace)

# NOTE: the message hard-codes the name; it matches `configmap_name` only as
# long as that variable is left at "test-configmap".
print("\n[INFO] configmap `test-configmap` created\n")

In [None]:
# Fetch the ConfigMap by name. Despite the `_list` suffix of the variable,
# the result is read below as a single object (`.metadata.name`, `.data`),
# not iterated as a collection.
# NOTE(review): a label selector is passed together with `name`; presumably
# the selector has no effect on a by-name GET -- confirm.

configmap_list = config_map_api.get(
    name=configmap_name, namespace=namespace, label_selector="foo=bar"
)

print("NAME:\n%s\n" % (configmap_list.metadata.name))
print("DATA:\n%s\n" % (configmap_list.data))

In [None]:
# Update the ConfigMap's `config.json` entry. The local manifest dict is
# mutated in place and then sent in full as the patch body.

configmap_manifest["data"]["config.json"] = "{}"

# No content_type is given, so the dynamic client's default patch content
# type applies (presumably strategic merge patch -- confirm against the
# client documentation).
configmap_patched = config_map_api.patch(
    name=configmap_name, namespace=namespace, body=configmap_manifest
)

# Print name and data from the object returned by the patch call.
print("\n[INFO] configmap `test-configmap` patched\n")
print("NAME:\n%s\n" % (configmap_patched.metadata.name))
print("DATA:\n%s\n" % (configmap_patched.data))

In [None]:
# Delete the ConfigMap by name from `namespace`. An empty dict is passed as
# the request body (presumably meaning "no explicit delete options").

configmap_deleted = config_map_api.delete(name=configmap_name, body={}, namespace=namespace)
print("\n[INFO] configmap `test-configmap` deleted\n")

### 操作FlinkDeployment

需要在`Kubernetes`上安装[Flink Kubernetes Operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/)。

In [None]:
import yaml

# Load the FlinkDeployment manifest from disk (path is relative to the
# notebook's working directory). The encoding is given explicitly so that
# parsing does not depend on the platform's default locale encoding.
with open("yaml_dir/flink/basic.yaml", encoding="utf-8") as f:
    flink_deployment_manifest = yaml.safe_load(f)
    # When the YAML file contains multiple documents, use
    # yaml.safe_load_all(f) instead; it returns a generator.
    # flink_deployment_manifest = yaml.safe_load_all(f)

# Uncomment to display the parsed manifest:
# flink_deployment_manifest

In [None]:
# Look up the resource API for the kind declared in the loaded manifest
# (its `apiVersion` and `kind` fields). This is the custom-resource
# counterpart of the ConfigMap lookup earlier in the notebook.
flink_operator_api = dynamic_client.resources.get(api_version=flink_deployment_manifest['apiVersion'], kind=flink_deployment_manifest['kind'])

In [None]:
# Create the Flink deployment from the loaded manifest.
# No namespace argument is passed here; presumably the client resolves it
# from the manifest's `metadata.namespace` (which later cells read
# explicitly) -- confirm.

flink_app = flink_operator_api.create(body=flink_deployment_manifest)

print("\n[INFO] flink deployment created\n")

In [None]:
# Fetch the Flink deployment by the name and namespace taken from the
# manifest's metadata. Despite the `_list` suffix, the result is read below
# as a single object (`.metadata.name`).

flink_app_list = flink_operator_api.get(
    name=flink_deployment_manifest['metadata']['name'], namespace=flink_deployment_manifest['metadata']['namespace']
)

print("NAME:\n%s\n" % (flink_app_list.metadata.name))

In [None]:
# Show the pods in the deployment's namespace whose `app` label equals the
# deployment name, one line per pod: IP, namespace, pod name (tab-separated).
# Uses the typed CoreV1Api on top of the same ApiClient as the dynamic client.
v1 = client.CoreV1Api(api_client)
flink_metadata = flink_deployment_manifest['metadata']
ret = v1.list_namespaced_pod(
    flink_metadata['namespace'],
    label_selector=f"app={flink_metadata['name']}",
)
for pod in ret.items:
    print(f"{pod.status.pod_ip}\t{pod.metadata.namespace}\t{pod.metadata.name}")

In [None]:
# Update the Flink deployment: set the task manager's CPU resource to 1 in
# the local manifest, then send the whole manifest as the patch body.

flink_deployment_manifest["spec"]["taskManager"]["resource"]["cpu"] = 1

# The JSON merge-patch content type is requested explicitly.
# NOTE(review): presumably because custom resources do not accept the
# client's default patch type -- confirm. Name and namespace are not passed
# either; presumably they are resolved from the body's metadata.
flink_app_patched = flink_operator_api.patch(
    content_type="application/merge-patch+json",
    body=flink_deployment_manifest)

print("\n[INFO] flink deployment patched\n")
print("NAME:\n%s\n" % (flink_app_patched.metadata.name))

In [None]:
# Delete the Flink deployment, identified by the name and namespace from the
# manifest's metadata. The API response is kept in `flink_app_deleted`.

flink_app_deleted = flink_operator_api.delete(
    name=flink_deployment_manifest["metadata"]["name"],
    namespace=flink_deployment_manifest["metadata"]["namespace"])
print("\n[INFO] flink deployment deleted\n")