Skip to content

Commit

Permalink
experimenting with volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Jul 11, 2024
1 parent 47695c4 commit 06d3e1a
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 92 deletions.
10 changes: 4 additions & 6 deletions observatory_platform/airflow/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ def check_connections(*connections):


@task
def gke_create_storage(
project_id: str, zone: str, volume_name: str, volume_size: int, kubernetes_conn_id: str, **context
):
def gke_create_storage(volume_name: str, volume_size: int, kubernetes_conn_id: str, **context):
"""Create storage on a GKE cluster.
:param project_id: the Google Cloud project ID.
Expand All @@ -95,12 +93,12 @@ def gke_create_storage(
:return: None.
"""

gcp_create_disk(project_id=project_id, zone=zone, disk_name=volume_name, disk_size_gb=volume_size)
# gcp_create_disk(project_id=project_id, zone=zone, disk_name=volume_name, disk_size_gb=volume_size)
gke_create_volume(kubernetes_conn_id=kubernetes_conn_id, volume_name=volume_name, size_gi=volume_size)


@task
def gke_delete_storage(project_id: str, zone: str, volume_name: str, kubernetes_conn_id: str, **context):
def gke_delete_storage(volume_name: str, kubernetes_conn_id: str, **context):
"""Delete storage on a GKE cluster.
:param project_id: the Google Cloud project ID.
Expand All @@ -112,4 +110,4 @@ def gke_delete_storage(project_id: str, zone: str, volume_name: str, kubernetes_
"""

gke_delete_volume(kubernetes_conn_id=kubernetes_conn_id, volume_name=volume_name)
gcp_delete_disk(project_id=project_id, zone=zone, disk_name=volume_name)
# gcp_delete_disk(project_id=project_id, zone=zone, disk_name=volume_name)
175 changes: 89 additions & 86 deletions observatory_platform/google/gke.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,90 +19,6 @@
from kubernetes import client


def gke_create_volume(*, kubernetes_conn_id: str, volume_name: str, size_gi: int):
"""
:param kubernetes_conn_id:
:param volume_name:
:param size_gi:
:return: None.
"""

# Make Kubernetes API Client from Airflow Connection
hook = KubernetesHook(conn_id=kubernetes_conn_id)
api_client = hook.get_conn()
v1 = client.CoreV1Api(api_client=api_client)

# Create the PersistentVolume
capacity = {"storage": f"{size_gi}Gi"}
pv = client.V1PersistentVolume(
api_version="v1",
kind="PersistentVolume",
metadata=client.V1ObjectMeta(
name=volume_name,
# TODO: supposed to use this user for the persistent volume but doesn't seem to do anything
# annotations={"pv.beta.kubernetes.io/uid": f"{uid}", "pv.beta.kubernetes.io/gid": f"{uid}"}
),
spec=client.V1PersistentVolumeSpec(
capacity=capacity,
access_modes=["ReadWriteOnce"],
persistent_volume_reclaim_policy="Retain",
storage_class_name="standard",
gce_persistent_disk=client.V1GCEPersistentDiskVolumeSource(pd_name=volume_name),
),
)
v1.create_persistent_volume(body=pv)

# Create PersistentVolumeClaim
namespace = hook.get_namespace()
pvc = client.V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=client.V1ObjectMeta(name=volume_name),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteOnce"],
resources=client.V1ResourceRequirements(requests=capacity),
storage_class_name="standard",
),
)
v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)


def gke_delete_volume(*, kubernetes_conn_id: str, volume_name: str):
"""
:param kubernetes_conn_id:
:param namespace:
:param volume_name:
:return: None.
"""

# Make Kubernetes API Client from Airflow Connection
hook = KubernetesHook(conn_id=kubernetes_conn_id)
api_client = hook.get_conn()
v1 = client.CoreV1Api(api_client=api_client)

# Delete VolumeClaim and Volume
namespace = hook.get_namespace()
try:
v1.delete_namespaced_persistent_volume_claim(name=volume_name, namespace=namespace)
except kubernetes.client.exceptions.ApiException as e:
if e.status == 404:
logging.info(
f"gke_delete_volume: PersistentVolumeClaim with name={volume_name}, namespace={namespace} does not exist"
)
else:
raise e

try:
v1.delete_persistent_volume(name=volume_name)
except kubernetes.client.exceptions.ApiException as e:
if e.status == 404:
logging.info(f"gke_delete_volume: PersistentVolume with name={volume_name} does not exist")
else:
raise e


# def gke_create_volume(*, kubernetes_conn_id: str, volume_name: str, size_gi: int):
# """
#
Expand All @@ -119,10 +35,26 @@ def gke_delete_volume(*, kubernetes_conn_id: str, volume_name: str):
#
# # Create the PersistentVolume
# capacity = {"storage": f"{size_gi}Gi"}
# pv = client.V1PersistentVolume(
# api_version="v1",
# kind="PersistentVolume",
# metadata=client.V1ObjectMeta(
# name=volume_name,
# # TODO: supposed to use this user for the persistent volume but doesn't seem to do anything
# # annotations={"pv.beta.kubernetes.io/uid": f"{uid}", "pv.beta.kubernetes.io/gid": f"{uid}"}
# ),
# spec=client.V1PersistentVolumeSpec(
# capacity=capacity,
# access_modes=["ReadWriteOnce"],
# persistent_volume_reclaim_policy="Retain",
# storage_class_name="standard",
# gce_persistent_disk=client.V1GCEPersistentDiskVolumeSource(pd_name=volume_name),
# ),
# )
# v1.create_persistent_volume(body=pv)
#
# # Create PersistentVolumeClaim
# namespace = hook.get_namespace()
# namespace = "coki-astro"
# pvc = client.V1PersistentVolumeClaim(
# api_version="v1",
# kind="PersistentVolumeClaim",
Expand All @@ -133,7 +65,7 @@ def gke_delete_volume(*, kubernetes_conn_id: str, volume_name: str):
# storage_class_name="standard",
# ),
# )
# r = v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)
# v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)
#
#
# def gke_delete_volume(*, kubernetes_conn_id: str, volume_name: str):
Expand Down Expand Up @@ -169,6 +101,77 @@ def gke_delete_volume(*, kubernetes_conn_id: str, volume_name: str):
# logging.info(f"gke_delete_volume: PersistentVolume with name={volume_name} does not exist")
# else:
# raise e


def gke_create_volume(*, kubernetes_conn_id: str, volume_name: str, size_gi: int):
"""
:param kubernetes_conn_id:
:param volume_name:
:param size_gi:
:return: None.
"""

# Make Kubernetes API Client from Airflow Connection
hook = KubernetesHook(conn_id=kubernetes_conn_id)
api_client = hook.get_conn()
v1 = client.CoreV1Api(api_client=api_client)

# Create the PersistentVolume
capacity = {"storage": f"{size_gi}Gi"}

# Create PersistentVolumeClaim
namespace = hook.get_namespace()
# namespace = "coki-astro"
print(namespace) # TODO delete
pvc = client.V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=client.V1ObjectMeta(name=volume_name),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteOnce"],
resources=client.V1ResourceRequirements(requests=capacity),
storage_class_name="standard",
),
)
v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)


def gke_delete_volume(*, kubernetes_conn_id: str, volume_name: str):
"""
:param kubernetes_conn_id:
:param namespace:
:param volume_name:
:return: None.
"""

# Make Kubernetes API Client from Airflow Connection
hook = KubernetesHook(conn_id=kubernetes_conn_id)
api_client = hook.get_conn()
v1 = client.CoreV1Api(api_client=api_client)

# Delete VolumeClaim and Volume
namespace = hook.get_namespace()
try:
v1.delete_namespaced_persistent_volume_claim(name=volume_name, namespace=namespace)
except kubernetes.client.exceptions.ApiException as e:
if e.status == 404:
logging.info(
f"gke_delete_volume: PersistentVolumeClaim with name={volume_name}, namespace={namespace} does not exist"
)
else:
raise e

# try:
# v1.delete_persistent_volume(name=volume_name)
# except kubernetes.client.exceptions.ApiException as e:
# if e.status == 404:
# logging.info(f"gke_delete_volume: PersistentVolume with name={volume_name} does not exist")
# else:
# raise e


#
#
# def gke_retrieve_secret(*, secret_name: str, namespace: str):
Expand Down

0 comments on commit 06d3e1a

Please sign in to comment.