# Set AWS Credentials

If you've already set the defaults in your AWS credentials or config files, then you don't need to do anything further. However, if you wish to use a specific set of AWS credentials/region, use the following cell.

In [None]:
import os

# Override the region/credentials for this notebook only. Leave the credentials blank
# to rely on whatever is configured in your AWS credentials/config files.
# NOTE(review): run this before the dependency cell; `ec2_client` comes from aws_utils
# and is presumably created at import time — confirm.
os.environ.update(
    AWS_REGION="ap-south-1",
    AWS_ACCESS_KEY_ID="",
    AWS_SECRET_ACCESS_KEY="",
)

# Set Up Dependencies

In [None]:
from aws_utils import (
    ec2_client,
    create_ebs_snapshot,
    encrypt_ebs_snapshot,
    create_ebs_volume_from_snapshot,
    volume_exists,
)
from k8s_utils import (
    v1,
    get_pv_list,
    get_ebs_backed_pvs,
    get_pod_list,
    scale_deployment,
    scale_stateful_set,
    get_pvc,
    pv_exists,
    pvc_exists,
)

from main import (
    logger,
    get_unencrypted_ebs_pvs,
    get_owners,
    get_volume_id_short,
    get_snapshot_details,
    get_snapshot_list_progress,
)

import time
import pickle
import botocore
import traceback
from pathlib import Path
from datetime import datetime

# Start/Resume a Run

If you want to resume a run, make sure you enter the same name as before. The `run_` will be prepended automatically below.

#### NOTE
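Several cells append to lists or create AWS/Kubernetes resources (Snapshots, Volumes, PVs/PVCs), so executing them more than once within a run duplicates work and can break the index alignment between those lists. If that happens, start over with a new run name.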

In [None]:
# Each run keeps its state under pv_encrypter_state/run_<name>; entering the name of
# an existing run resumes it instead of starting a new one.
run_name = "run_" + input()
run_folder = f"pv_encrypter_state/{run_name}"
run_path = Path(run_folder)

if not run_path.exists():
    run_path.mkdir(parents=True, exist_ok=True)

    logger.info(f"Starting run {run_name}")
    logger.info(f"Created folder: {run_folder}")
else:
    logger.info(f"Resuming run {run_name} from {run_folder}")

In [2]:
## Overview
# Documentation-only cell: the raw string below diagrams the notebook's internal flow
# and is never assigned or used. The trailing `pass` makes the cell's last statement a
# non-expression, so Jupyter does not echo the string as the cell output.
rf"""
              Internal Flow

    Get list of all PVs with EBS Volumes      ||
                    |                         ||
Get list of PVs with unencrypted EBS Volumes  ||
                    |                         ||
 Scale down linked Deployments/StatefulSets   ||
                    |                         ||
              Create Snapshot                 ||
                    |                         ||
      Create Encrypted Copy of Snapshot       ||
                    |                         ||
    Create EBS Volume from Encrypted Copy     ||
                    |                         ||
       Delete Persistent Volume Claim         ||
                    |                         ||
        Modify PV with new Volume ID          ||
                    |                         ||
                Create PV                     ||
                    |                         ||
                Create PVC                    ||
                    |                        \  /
 Scale up linked Deployments/StatefulSets     \/
"""
pass

# Collect Info (A)

Regarding the PVs and Pods in the Cluster.

In [None]:
# Collect State A: the cluster's PVs, the EBS-backed subset, the unencrypted subset,
# all Pods, and the owners of the Pods that use the unencrypted PVs.
pv_list = get_pv_list()
ebs_pv_list = get_ebs_backed_pvs(pv_list)

# NOTE: `unenc_pv_list` is the master list for the rest of the notebook. Its order (and
# that of the subset picked in the "Limit PVs" cell) carries through to
# enc_snapshot_id_ls and volume_id_ls.
unenc_pv_list = get_unencrypted_ebs_pvs(ebs_pv_list)

pod_list = get_pod_list()

# Owners of the Pods that reach one of the PVs in `unenc_pv_list` through a PVC.
valid_owners = get_owners(pod_list, unenc_pv_list)

# Persist State A to disk so the run can be resumed later.
state_a = (pv_list, ebs_pv_list, unenc_pv_list, pod_list, valid_owners)
with open(f"{run_folder}/state_a.pkl", "wb") as state_file:
    pickle.dump(state_a, state_file)

## Resume State A

If your run was interrupted after Step A was completed and you haven't proceeded with the subsequent steps, run the following cell to resume your run from this point.

In [None]:
# (Optional)
# Reload State A from disk instead of re-querying the cluster.
with Path(run_folder, "state_a.pkl").open("rb") as state_file:
    state_a = pickle.load(state_file)
pv_list, ebs_pv_list, unenc_pv_list, pod_list, valid_owners = state_a

# Scale Down Owners

The Deployments and StatefulSets that are linked to the PVs we care about could have running pods.

In those cases, in order to ensure data integrity, we have to scale them down before proceeding.

### NOTE (Very Important)

* Remember that your applications/microservices can have multiple Deployments/StatefulSets supporting them. So if just one of them is scaled down, say a MySQL DB, the rest of the application could fail.
* If your Deployment/StatefulSet is managed by ArgoCD or its equivalent, it's possible that any modifications that this notebook makes can be overridden when the drift is detected a few minutes later. So we expect 0 replicas, while the reality could be that ArgoCD, for example, has fully scaled it up in the background.
* The following cells **only** care about Deployments/StatefulSets. But there can be any number of owner types.

In these cases (and many others), scale down the Deployments/StatefulSets/any other owners yourself, and make sure they stay scaled down for the duration of this process.

### Work on Subset

In [None]:
# Owner keys have the form "<namespace>|<name>"; materialise them as indexable lists.
deployment_ls = [*valid_owners["Deployment"]]
stateful_set_ls = [*valid_owners["StatefulSet"]]

In [None]:
# Pick, by index, which Deployments/StatefulSets get scaled down.
# Take the indexes from the left-most "Index" column of the two tables displayed in the
# output of the "Collect Info" cell.
#
# By default every index is selected, so this cell can be run unmodified to operate on
# the full lists. Example of a narrower selection:
# >> deployment_index_ls = [1, 10, 21, 37]
# >> stateful_set_index_ls = [1, 2, 15, 29]

deployment_index_ls = [*range(len(deployment_ls))]
stateful_set_index_ls = [*range(len(stateful_set_ls))]

In [None]:
# Resolve the selected indexes to "<namespace>|<name>" owner keys.
limited_deployment_ls = list(map(deployment_ls.__getitem__, deployment_index_ls))
limited_stateful_set_ls = list(map(stateful_set_ls.__getitem__, stateful_set_index_ls))

In [None]:
# (Optional)
# Show the Deployments, then the StatefulSets, that are about to be scaled down.
# Each entry is formatted as <namespace>|<name>.
for owner_keys in (limited_deployment_ls, limited_stateful_set_ls):
    print(owner_keys)

In [None]:
decision = input(
    "Are you sure you wish to go ahead? This will scale down deployments and statefulsets linked to the PVs that are in the process of being encrypted. Enter 'YES'."
)

if decision == "YES":
    # Scale every selected Deployment to 0 replicas, then every selected StatefulSet.
    for owner_keys, scale, kind in (
        (limited_deployment_ls, scale_deployment, "deployment"),
        (limited_stateful_set_ls, scale_stateful_set, "statefulset"),
    ):
        for key in owner_keys:
            # Each key is "<namespace>|<name>".
            parts = key.split("|")
            namespace, owner_name = parts[0], parts[1]

            scale(name=owner_name, namespace=namespace, replicas=0)

            logger.info(f"Scaling down {kind} {owner_name} in namespace {namespace}")

# (Important) Limit PVs

This is where you get to limit the PVs that get operated on.

In [None]:
# Pick, by index, which unencrypted PVs to operate on.
# Take the indexes from the right-most "PV Index" column of the two tables in the
# output of the "Collect Info" cell.
#
# By default the whole list is selected, so this cell can be run unmodified.
# Example: unenc_pv_index_list = [1, 4, 21, 37]
#
# NOTE: `limited_unenc_pv_list` is not saved to disk by any state file. The snapshot,
# encrypted-snapshot and volume lists follow its order, so when resuming a run, re-run
# this cell with exactly the same indexes.

unenc_pv_index_list = [*range(len(unenc_pv_list))]
limited_unenc_pv_list = list(map(unenc_pv_list.__getitem__, unenc_pv_index_list))

In [None]:
# (Optional)
# Review the operational list of PVs as (PV Name, PVC Name, PVC Namespace) tuples.
# The third element is the claim's namespace (previously the claim name was printed twice).
print(
    [
        (pv.metadata.name, pv.spec.claim_ref.name, pv.spec.claim_ref.namespace)
        for pv in limited_unenc_pv_list
    ]
)

# Create Snapshots (B)

Create Snapshots of all qualifying EBS volumes.

### NOTE
For any AWS account, there can be a maximum of 100 pending Snapshots at any time in a region, so Snapshot creation is done in batches of 100.

In [None]:
# IDs of the unencrypted Snapshots taken from the unencrypted EBS volumes, in the order
# of `limited_unenc_pv_list`.
unenc_snapshot_id_ls = list()

# PV and PVC objects are backed up to a folder of their own: they are what gets
# recreated later on, so they're kept apart from the rest of the saved state.
manifest_dir = Path(run_folder) / "pv_pvc_manifests"
manifest_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Split `limited_unenc_pv_list` into consecutive chunks of at most 100 PVs
# (0:100, 100:200, 200:300, ...), matching the pending-Snapshot limit noted above.
unenc_pv_batches = [
    limited_unenc_pv_list[start : start + 100]
    for start in range(0, len(limited_unenc_pv_list), 100)
]

In [None]:
# Trigger a Snapshot of every selected unencrypted EBS volume, batch by batch.
#
# For every PV this also saves the PV and its PVC under `pv_pvc_manifests/` so they can
# be recreated later. PVC names are only unique within a namespace, so the manifest is
# saved as "<namespace>__<pvc name>.pkl" (Kubernetes names cannot contain "_", so the
# separator is unambiguous). The plain "<pvc name>.pkl" file is still written so that
# anything reading that older name keeps working. Be aware that the plain file can be
# overwritten by a same-named PVC in another namespace.
#
# Batching only respects the pending-Snapshot limit if a batch finishes before the next
# one starts. So after each batch except the last, this waits for that batch's
# Snapshots to complete. The final batch is checked by the "Check Snapshot Status" cells.
for batch_idx, batch in enumerate(unenc_pv_batches):

    logger.info(f"Processing Batch #{batch_idx}")

    # Snapshot IDs triggered in this batch; used to wait before starting the next one.
    batch_snapshot_ids = []

    for idx, pv in enumerate(batch):

        logger.info(f"Iteration {idx}")

        claim_name = pv.spec.claim_ref.name
        claim_namespace = pv.spec.claim_ref.namespace

        # Get the PVC linked to the PV.
        pvc = get_pvc(
            claim_ref_name=claim_name,
            claim_ref_namespace=claim_namespace,
        )

        # Save the PV and the corresponding PVC so they can be recreated later.
        manifests_folder = f"{run_folder}/pv_pvc_manifests"
        for manifest_path in (
            f"{manifests_folder}/{claim_namespace}__{claim_name}.pkl",
            f"{manifests_folder}/{claim_name}.pkl",
        ):
            with open(manifest_path, "wb") as w:
                pickle.dump({"pv": pv, "pvc": pvc}, w)

        # Reduce the PV's `aws://.../vol-...` volume ID to its short form.
        volume_id = get_volume_id_short(pv.spec.aws_elastic_block_store.volume_id)

        logger.info(f"Triggering Snapshot creation for {volume_id}")

        # Create the EBS snapshot from the unencrypted EBS volume.
        snapshot_id = create_ebs_snapshot(
            volume_id=volume_id,
            extra_log_info=f"Created for run: {run_name} at {str(datetime.now())}",
            tags=[
                {
                    "Key": "RunName",
                    "Value": run_name,
                }
            ],
        )

        logger.info(f"In-Progress Snapshot ID: {snapshot_id}")

        # Appended unconditionally so the list stays index-aligned with
        # `limited_unenc_pv_list`.
        unenc_snapshot_id_ls.append(snapshot_id)
        batch_snapshot_ids.append(snapshot_id)

    # Falsy IDs (failed requests) cannot be polled, so leave them out of the wait.
    pending_ids = [sid for sid in batch_snapshot_ids if sid]
    is_last_batch = batch_idx == len(unenc_pv_batches) - 1
    if pending_ids and not is_last_batch:
        logger.info(f"Waiting for the Snapshots of Batch #{batch_idx} to complete")
        while get_snapshot_list_progress(pending_ids)["avg_progress"] != 100:
            time.sleep(20)

# State B
# Save list of unencrypted Snapshot IDs to disk.
with open(f"{run_folder}/unenc_snapshot_id_ls.pkl", "wb") as w:
    pickle.dump(unenc_snapshot_id_ls, w)

### Check Snapshot Status

The following is a blocking cell; it will block execution until the Snapshots have been fully created in AWS.

Run it only if you want to wait here. If you don't want to block execution, run the cell below it periodically to check for completion instead.

In any case, do NOT proceed with the rest of the notebook without this step being complete and it's confirmed that all snapshots are completed.

In [None]:
# (Optional)
# Blocking cell: polls every 20 seconds until every Snapshot in `unenc_snapshot_id_ls`
# reports an average progress of 100.
while True:
    response = get_snapshot_list_progress(unenc_snapshot_id_ls)
    if response["avg_progress"] == 100:
        break
    time.sleep(20)

In [None]:
# Non-blocking check that every Snapshot in `unenc_snapshot_id_ls` has completed.
# Fails with the current average progress while any are still pending.
response = get_snapshot_list_progress(unenc_snapshot_id_ls)
progress = response["avg_progress"]
assert progress == 100, f"Progress is still at {progress}"

## Resume State B

If your run was interrupted after Step B was completed and you haven't proceeded with the subsequent steps, run the following cell to resume your run from this point.

In [None]:
# Reload State A (cluster information) and State B (unencrypted Snapshot IDs).
# NOTE: selections derived from State A (e.g. `limited_unenc_pv_list`) are not saved;
# after a kernel restart, re-run those cells with the same indexes before continuing.
def _load_state(file_name):
    """Unpickle and return `<run_folder>/<file_name>`."""
    with open(f"{run_folder}/{file_name}", "rb") as state_file:
        return pickle.load(state_file)


pv_list, ebs_pv_list, unenc_pv_list, pod_list, valid_owners = _load_state("state_a.pkl")
unenc_snapshot_id_ls = _load_state("unenc_snapshot_id_ls.pkl")

# Create Encrypted Snapshots (C)

Create encrypted copies of the snapshots created in the previous step and stored in `unenc_snapshot_id_ls`.

### NOTE
For any AWS account, there can be a maximum of 20 Snapshot copies pending at any time in a region, so Snapshot copying is done in batches of 20.

In [None]:
# IDs of the encrypted Snapshots copied from the unencrypted ones, in the same order.
enc_snapshot_id_ls = list()

In [None]:
# Split the unencrypted Snapshot IDs (the Snapshots to be copied) into consecutive
# batches of at most 20: 0:20, 20:40, 40:60, ...
# The source must be `unenc_snapshot_id_ls`. `enc_snapshot_id_ls` is the output list
# that the copy loop fills, so batching it produced no work at all.
enc_snapshot_id_batches = [
    unenc_snapshot_id_ls[start : start + 20]
    for start in range(0, len(unenc_snapshot_id_ls), 20)
]

In [None]:
# Make an encrypted copy of every unencrypted Snapshot, batch by batch.
#
# `encrypt_ebs_snapshot` signals failure by returning False. False is still appended
# (also for source IDs that are missing) so that `enc_snapshot_id_ls` stays
# index-aligned with `unenc_snapshot_id_ls`.
# Batching only respects the concurrent-copy limit if a batch finishes before the next
# one starts. So after each batch except the last, this waits for its copies to
# complete. The final batch is checked by the "Check Snapshot Status" cells.
for batch_idx, batch in enumerate(enc_snapshot_id_batches):

    logger.info(f"Processing Batch #{batch_idx}")

    # Encrypted copies successfully triggered in this batch.
    batch_copy_ids = []

    # For each unencrypted Snapshot in the batch.
    for idx, unenc_snapshot_id in enumerate(batch):

        logger.info(f"Iteration {idx}")

        # A falsy entry means the Snapshot was never created; there is nothing to copy.
        if not unenc_snapshot_id:
            logger.error(f"Encrypted Snapshot not created. Error. Snapshot: {unenc_snapshot_id}")
            enc_snapshot_id_ls.append(False)
            continue

        logger.info(f"Triggering encrypted Snapshot copy for {unenc_snapshot_id}.")

        # Make an encrypted copy of the unencrypted EBS snapshot.
        snapshot_id = encrypt_ebs_snapshot(
            snapshot_id=unenc_snapshot_id,
            extra_log_info=f"Created for run: {run_name} at {str(datetime.now())}",
            tags=[
                {
                    "Key": "RunName",
                    "Value": run_name,
                }
            ],
        )

        if snapshot_id is not False:
            logger.info(f"In-Progress Snapshot ID: {snapshot_id}")
            batch_copy_ids.append(snapshot_id)
        else:
            logger.error(f"Encrypted Snapshot not created. Error. Snapshot: {unenc_snapshot_id}")

        enc_snapshot_id_ls.append(snapshot_id)

    is_last_batch = batch_idx == len(enc_snapshot_id_batches) - 1
    if batch_copy_ids and not is_last_batch:
        logger.info(f"Waiting for the Snapshot copies of Batch #{batch_idx} to complete")
        while get_snapshot_list_progress(batch_copy_ids)["avg_progress"] != 100:
            time.sleep(20)

# State C
# Save the list of encrypted snapshots to disk.
with open(f"{run_folder}/enc_snapshot_id_ls.pkl", "wb") as w:
    pickle.dump(enc_snapshot_id_ls, w)

### Check Snapshot Status

In [None]:
# (Optional)
# Blocking cell: polls every 20 seconds until every Snapshot in `enc_snapshot_id_ls`
# reports an average progress of 100.
while True:
    response = get_snapshot_list_progress(enc_snapshot_id_ls)
    if response["avg_progress"] == 100:
        break
    time.sleep(20)

In [None]:
# Non-blocking check that all encrypted Snapshot copies have completed before
# proceeding. Fails with the current average progress while any are still pending.
response = get_snapshot_list_progress(enc_snapshot_id_ls)
progress = response["avg_progress"]
assert progress == 100, f"Progress is still at {progress}"

## Resume State C

If your run was interrupted after Step C was completed and you haven't proceeded with the subsequent steps, run the following cell to resume your run from this point.

In [None]:
# Reload States A, B and C: cluster information, unencrypted Snapshot IDs and
# encrypted Snapshot IDs.
# NOTE: selections derived from State A (e.g. `limited_unenc_pv_list`) are not saved;
# after a kernel restart, re-run those cells with the same indexes before continuing.
def _load_state(file_name):
    """Unpickle and return `<run_folder>/<file_name>`."""
    with open(f"{run_folder}/{file_name}", "rb") as state_file:
        return pickle.load(state_file)


pv_list, ebs_pv_list, unenc_pv_list, pod_list, valid_owners = _load_state("state_a.pkl")
unenc_snapshot_id_ls = _load_state("unenc_snapshot_id_ls.pkl")
enc_snapshot_id_ls = _load_state("enc_snapshot_id_ls.pkl")

# Create Volumes from Encrypted Snapshots (D)

Using the encrypted snapshots created in the previous step, create encrypted EBS Volumes in the same AZ as the original volume.

There's a one-one relationship between: `limited_unenc_pv_list`, `unenc_snapshot_id_ls`, and `enc_snapshot_id_ls`. So we use this to find the AZ of the EBS volume from the labels of the PV.

NOTE: There's no delay to EBS Volume creation from a Snapshot. So there's no quota limiting us, meaning there's no need to batch our requests.

In [None]:
# IDs of the encrypted EBS Volumes, index-aligned with `enc_snapshot_id_ls`.
volume_id_ls = list()

In [None]:
# Create one encrypted EBS Volume per encrypted Snapshot, in the availability zone of
# the PV it replaces. `enc_snapshot_id_ls` is index-aligned with
# `limited_unenc_pv_list`, so the PV at the same index supplies the zone.
# Every failure is recorded as False so that `volume_id_ls` stays index-aligned too.
for idx, snapshot_id in enumerate(enc_snapshot_id_ls):

    source_pv = limited_unenc_pv_list[idx]

    # The copy step stores False for Snapshots it could not encrypt.
    if snapshot_id is False:
        logger.error(
            f"Volume not created. No encrypted Snapshot for PV {source_pv.metadata.name}."
        )
        volume_id_ls.append(False)
        continue

    # The new volume must be in the same availability zone (not just region) as the old
    # one. Try the current zone label, then the deprecated beta label; labels may be unset.
    labels = source_pv.metadata.labels or {}
    availability_zone = labels.get("topology.kubernetes.io/zone") or labels.get(
        "failure-domain.beta.kubernetes.io/zone"
    )
    if not availability_zone:
        logger.error(
            f"Volume not created. No zone label on PV {source_pv.metadata.name}. Snapshot: {snapshot_id}"
        )
        volume_id_ls.append(False)
        continue

    logger.info(f"Triggering Volume creation from {snapshot_id}")

    # Create EBS Volume from the encrypted Snapshot.
    volume_id = create_ebs_volume_from_snapshot(
        snapshot_id=snapshot_id,
        availability_zone=availability_zone,
        tags=[
            {
                "Key": "RunName",
                "Value": run_name,
            }
        ],
    )

    if volume_id is not False:
        logger.info(f"Created Volume ID: {volume_id}")
    else:
        logger.error(f"Volume not created. Error. Snapshot: {snapshot_id}")

    volume_id_ls.append(volume_id)

# Save the list of encrypted Volume IDs to disk.
with open(f"{run_folder}/volume_id_ls.pkl", "wb") as w:
    pickle.dump(volume_id_ls, w)

### Volume Status

Volume creation requests return immediately, so this step has no separate completion check. A new Volume is briefly in the `creating` state before it becomes `available`.

## Resume State D

If your run was interrupted, run the following cell to resume your run from this point.

In [None]:
# Reload States A-D: cluster information, unencrypted Snapshot IDs, encrypted Snapshot
# IDs and encrypted Volume IDs.
# NOTE: selections derived from State A (e.g. `limited_unenc_pv_list`) are not saved;
# after a kernel restart, re-run those cells with the same indexes before continuing.
def _load_state(file_name):
    """Unpickle and return `<run_folder>/<file_name>`."""
    with open(f"{run_folder}/{file_name}", "rb") as state_file:
        return pickle.load(state_file)


pv_list, ebs_pv_list, unenc_pv_list, pod_list, valid_owners = _load_state("state_a.pkl")
unenc_snapshot_id_ls = _load_state("unenc_snapshot_id_ls.pkl")
enc_snapshot_id_ls = _load_state("enc_snapshot_id_ls.pkl")
volume_id_ls = _load_state("volume_id_ls.pkl")

# Warning

Up to this point there have been no destructive operations. The next step deletes the PVC. If the bound PV's reclaim policy is `Delete`, Kubernetes will then delete the PV and, in turn, the underlying EBS volume.

Points to remember:

* For each EBS volume, there are:
    * Unencrypted snapshots.
    * Encrypted snapshots.
    * A new EBS volume created from the encrypted copy.

So, you're **definitely not** risking your data itself. It's safe in these three forms. You can further confirm this by checking the resources in AWS, observing the tags applied to each one, etc.

However, exercise caution beyond this point. I highly recommend that you try with a single PV/PVC before running it for everything.

# Review Saved PVs and PVCs

In [None]:
# Number of unencrypted PVs selected for this run (displayed as the cell output).
selected_pv_count = len(limited_unenc_pv_list)
selected_pv_count

In [None]:
# NOTE: Modify this to see information about different PV/PVC pairs.
idx = 0

pv = limited_unenc_pv_list[idx]

# Look for "<namespace>__<pvc name>.pkl" first, because PVC names are only unique per
# namespace. Fall back to the older "<pvc name>.pkl" naming.
manifest_folder = Path(run_folder) / "pv_pvc_manifests"
manifest_path = manifest_folder / f"{pv.spec.claim_ref.namespace}__{pv.spec.claim_ref.name}.pkl"
if not manifest_path.exists():
    manifest_path = manifest_folder / f"{pv.spec.claim_ref.name}.pkl"

with open(manifest_path, "rb") as r:
    pv_pvc_manifests = pickle.load(r)

# A different PV name means the file was overwritten by a same-named PVC in another
# namespace.
saved_pv_name = pv_pvc_manifests["pv"].metadata.name
if saved_pv_name != pv.metadata.name:
    logger.warning(
        f"Manifest {manifest_path} belongs to PV {saved_pv_name}, not {pv.metadata.name}."
    )

print("PVC Name:", pv_pvc_manifests["pvc"].metadata.name)
print("PV Name:", pv_pvc_manifests["pv"].metadata.name)
print("PV Spec Claim Reference:", pv_pvc_manifests["pv"].spec.claim_ref)
print("PV EBS Spec:", pv_pvc_manifests["pv"].spec.aws_elastic_block_store)

# Retrieve Cleaned PVCs

In [None]:
# Load the saved PV/PVC pair for every PV in `limited_unenc_pv_list` (same order) and
# strip the server-populated fields, so that both objects can be sent to create calls.

# Annotations the PV controller uses to track an existing binding. The recreated pair is
# bound by name (PVC `spec.volume_name` <-> PV `spec.claim_ref`), so stale copies are
# dropped.
_PV_BINDING_ANNOTATIONS = ("pv.kubernetes.io/bound-by-controller",)
_PVC_BINDING_ANNOTATIONS = (
    "pv.kubernetes.io/bind-completed",
    "pv.kubernetes.io/bound-by-controller",
)


def _saved_manifest_path(pv):
    """Return the manifest path for `pv`: the namespaced file if present, else the plain one."""
    folder = Path(run_folder) / "pv_pvc_manifests"
    namespaced = folder / f"{pv.spec.claim_ref.namespace}__{pv.spec.claim_ref.name}.pkl"
    return namespaced if namespaced.exists() else folder / f"{pv.spec.claim_ref.name}.pkl"


def _strip_for_create(obj, binding_annotations):
    """Clear server-populated metadata/status of a fetched object, in place.

    The API server rejects create requests that carry `metadata.resourceVersion`; the
    other fields are assigned by the server anyway.
    """
    meta = obj.metadata
    meta.resource_version = None
    meta.uid = None
    meta.creation_timestamp = None
    meta.self_link = None
    meta.managed_fields = None
    if meta.annotations:
        for key in binding_annotations:
            meta.annotations.pop(key, None)
    obj.status = None


cleaned_list = []
for pv in limited_unenc_pv_list:
    with open(_saved_manifest_path(pv), "rb") as r:
        pv_pvc_manifests = pickle.load(r)

    saved_pv, saved_pvc = pv_pvc_manifests["pv"], pv_pvc_manifests["pvc"]

    # A different PV name means the namespace-less manifest was overwritten by a
    # same-named PVC from another namespace. Continuing would pair the wrong objects.
    if saved_pv.metadata.name != pv.metadata.name:
        raise RuntimeError(
            f"Saved manifest for PVC {pv.spec.claim_ref.namespace}/{pv.spec.claim_ref.name} "
            f"belongs to PV {saved_pv.metadata.name}, expected {pv.metadata.name}."
        )

    _strip_for_create(saved_pv, _PV_BINDING_ANNOTATIONS)
    # The old claim_ref pins the UID/resourceVersion of the PVC that is about to be
    # deleted. Keeping only name/namespace pre-binds the PV to the recreated PVC.
    if saved_pv.spec.claim_ref is not None:
        saved_pv.spec.claim_ref.uid = None
        saved_pv.spec.claim_ref.resource_version = None

    _strip_for_create(saved_pvc, _PVC_BINDING_ANNOTATIONS)

    # List of tuples (pv, pvc), ready to be passed to the create calls.
    cleaned_list.append((saved_pv, saved_pvc))

# Delete PVC -> Create PV -> Create PVC

As mentioned earlier, deleting the original PVC will also delete the linked PV (when its reclaim policy is `Delete`), which will then delete the associated EBS volume.

In [None]:
# Log how many PV/PVC pairs are about to be recreated.
pair_count = len(cleaned_list)
logger.info(f"{pair_count} PVs/PVCs")

In [None]:
# For each saved PV/PVC pair:
#   1. delete the original PVC,
#   2. wait for the original PV to be removed,
#   3. recreate the PV pointing at the encrypted volume,
#   4. recreate the PVC.
#
# `volume_id_ls` is index-aligned with `cleaned_list`, since both follow
# `limited_unenc_pv_list`. Errors are logged with their traceback and the pair is
# skipped; the manifests saved in `pv_pvc_manifests/` allow a manual recovery.


def _wait_for_pv_deletion(pv_name, timeout_seconds=300, poll_seconds=3):
    """Poll until no PersistentVolume named `pv_name` exists.

    Returns True once it is gone, or False if it still exists after `timeout_seconds`.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        matching = v1.list_persistent_volume(field_selector=f"metadata.name={pv_name}")
        if not matching.items:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)


for idx, (pv, pvc) in enumerate(cleaned_list):

    logger.info(f"Iteration {idx}")

    # A falsy entry means the encrypted volume was never created. Deleting the PVC now
    # would leave nothing to replace it with, so leave this pair untouched.
    new_volume_id = volume_id_ls[idx]
    if not new_volume_id:
        logger.error(
            f"No encrypted volume for PV {pv.metadata.name}; leaving PVC {pvc.metadata.name} untouched."
        )
        continue

    # Delete PVC.

    logger.info(f"Deleting PVC {pvc.metadata.name}")

    try:
        v1.delete_namespaced_persistent_volume_claim(
            name=pvc.metadata.name,
            namespace=pvc.metadata.namespace,
        )
    except Exception:
        logger.exception("Error during PVC deletion. Continuing with the next iteration.")
        continue

    logger.info(f"Deleted PVC {pvc.metadata.name}")

    # A PV with the same name can only be created once the old one is gone. Its removal
    # follows the PVC deletion asynchronously, so poll instead of sleeping a fixed time.
    try:
        old_pv_gone = _wait_for_pv_deletion(pv.metadata.name)
    except Exception:
        logger.exception(
            "Error while waiting for the old PV to be deleted. Continuing with the next iteration."
        )
        continue

    if not old_pv_gone:
        logger.error(
            f"PV {pv.metadata.name} still exists after its PVC was deleted (its reclaim "
            "policy may not be `Delete`, or a Pod may still use the PVC). Recreate the "
            "PV/PVC manually from the saved manifests. Continuing with the next iteration."
        )
        continue

    # Create PV.

    logger.info(
        f"Creating PV {pv.metadata.name} with the newly encrypted volume: {new_volume_id}"
    )

    try:
        # Point the PV at the encrypted volume created earlier.
        pv.spec.aws_elastic_block_store.volume_id = new_volume_id
        v1.create_persistent_volume(body=pv)
    except Exception:
        logger.exception("Error during PV creation. Continuing with the next iteration.")
        logger.error(
            f"PVC {pvc.metadata.name} was deleted but PV {pv.metadata.name} was not recreated; "
            "recreate both manually from the saved manifests."
        )
        continue

    logger.info(f"Created Persistent Volume: {pv.metadata.name}")

    # Create PVC.

    logger.info(f"Creating PVC {pvc.metadata.name}.")

    # The PV was recreated under the same name, so the PVC's reference to it still holds.
    try:
        v1.create_namespaced_persistent_volume_claim(
            namespace=pvc.metadata.namespace,
            body=pvc,
        )
    except Exception:
        logger.exception("Error during PVC creation. Continuing with the next iteration.")
        continue

    logger.info(f"Created Persistent Volume Claim: {pvc.metadata.name}")

# Scale Up Deployments/StatefulSets

In [None]:
decision = input(
    "Are you sure you want to go ahead? This will scale up deployments and statefulsets linked to the PVs that were just encrypted. Enter 'YES'."
)

if decision == "YES":
    # Scale the selected Deployments back up, then the selected StatefulSets.
    # Each owner gets len(valid_owners[<type>][key]) replicas. That is presumably the
    # number of its Pods seen when State A was collected; verify it is the intended count.
    for owner_keys, owner_type, scale, kind in (
        (limited_deployment_ls, "Deployment", scale_deployment, "deployment"),
        (limited_stateful_set_ls, "StatefulSet", scale_stateful_set, "statefulset"),
    ):
        for key in owner_keys:
            # Each key is "<namespace>|<name>".
            parts = key.split("|")
            namespace, owner_name = parts[0], parts[1]

            replicas = len(valid_owners[owner_type][key])
            scale(name=owner_name, namespace=namespace, replicas=replicas)

            logger.info(
                f"Scaling up {kind} {owner_name} in namespace {namespace} to {replicas} replicas."
            )