<a href="https://colab.research.google.com/github/SankarTG/DevOps/blob/master/KVM_VM_Management_Web_Interface.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Web interface for managing KVM Ubuntu machines and their VMs.
"""

import libvirt  # For interacting with KVM
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify  # Web framework
import subprocess  # For executing commands (e.g., SSH)
import os
from urllib.parse import urlparse

app = Flask(__name__)
# Flask signs session cookies (and therefore flash messages) with this key.
# A key generated at import time changes on every restart and differs between
# worker processes, which silently invalidates sessions. Prefer a stable key
# supplied via the FLASK_SECRET_KEY environment variable and fall back to a
# random per-process key only when none is configured.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(24)

# Configuration (Move to a separate config.py file for better organization)
class Config:
    """Static settings for the web interface, handed to ``app.config.from_object``.

    NOTE: Flask's ``from_object`` copies only UPPERCASE attributes; the
    lowercase option lists at the bottom of this class are not picked up by it.
    """

    # Debug flag; also passed to ``app.run`` by the script entry point.
    DEBUG = True

    # Known hypervisors: a short display name plus the libvirt connection URI.
    KVM_HOSTS = [
        dict(name="kvm1", uri="qemu+ssh://user@kvm1.example.com/system"),
        dict(name="kvm2", uri="qemu+ssh://user@kvm2.example.com/system"),
        # Add more KVM hosts as needed
    ]

    # Directory holding VM template images.
    TEMPLATE_DIR = "/var/lib/libvirt/images"
    # Example location for backups.
    BACKUP_STORAGE_PATH = "/var/lib/libvirt/images_backup"

    # Choices offered by the VM creation form (extend as required).
    os_types = [
        "ubuntu",
        "centos",
        "debian",
    ]
    # Allowed memory sizes.
    mem_sizes = [
        "512",
        "1024",
        "2048",
        "4096",
    ]
    # Allowed CPU counts.
    cpus_options = [
        "1",
        "2",
        "4",
        "8",
    ]

# Load the configuration
app.config.from_object(Config)
# ``from_object`` only copies UPPERCASE attributes, so the lowercase option
# lists that the routes read through ``app.config[...]`` would otherwise be
# missing and raise KeyError. Register them explicitly under the same keys.
app.config.update(
    os_types=Config.os_types,
    mem_sizes=Config.mem_sizes,
    cpus_options=Config.cpus_options,
)
# Utility Functions

def get_kvm_connection(uri):
    """
    Opens a libvirt connection to a KVM hypervisor.

    Failures are printed and flashed to the user rather than propagated.

    Args:
        uri (str): The libvirt URI of the KVM host.

    Returns:
        libvirt.virConnect: The open connection, or None if connecting failed.
    """
    try:
        connection = libvirt.open(uri)
        if connection is not None:
            return connection
        # Route a None result through the same error path as a libvirt failure.
        raise libvirt.libvirtError(f"Failed to connect to KVM at {uri}")
    except libvirt.libvirtError as err:
        print(f"Error connecting to KVM: {err}")  # Log the error
        flash(f"Error connecting to KVM: {err}", 'danger')
    return None

def execute_command(command, host_uri=None):
    """
    Executes a shell command, optionally on a remote host via SSH.

    The command string is interpreted by a shell (the local one, or the remote
    login shell when ``host_uri`` is given), so callers are responsible for
    quoting any untrusted values they interpolate into it.

    Args:
        command (str): The command line to execute.
        host_uri (str, optional): A libvirt-style URI. When provided, the
            command is run on that URI's host through ``ssh``. Any ``user@``
            prefix is dropped, so SSH uses its own default user.

    Returns:
        tuple: (stdout, stderr, return_code)
    """
    # Local import, in keeping with the function-scope imports used in this file.
    import shlex

    if host_uri:
        # Parse the URI to get hostname.  Cleans the user@ off
        hostname = urlparse(host_uri).netloc.split('@')[-1]
        # Quote the whole remote command so the local shell hands it to ssh as
        # a single argument even when it contains quotes or metacharacters.
        shell_line = f"ssh {shlex.quote(hostname)} {shlex.quote(command)}"
        print(f"Executing remote command: {shell_line}")  # for debug
    else:
        shell_line = command
        print(f"Executing local command: {command}")

    # subprocess.run (unlike Popen) accepts capture_output and waits for
    # completion, returning the captured text streams and the exit status.
    completed = subprocess.run(shell_line, shell=True, capture_output=True, text=True)
    return completed.stdout, completed.stderr, completed.returncode

def list_vms(conn):
    """
    Lists all VMs (running and stopped) on a given KVM connection.

    Args:
        conn (libvirt.virConnect): The KVM connection object.

    Returns:
        list: A list of VM dictionaries, or None on error.  Each dict contains
        'name', 'status' ('running' or 'stopped') and 'id' (the value of
        ``dom.ID()``; libvirt is expected to report -1 for domains that are
        not running).
    """
    try:
        # listAllDomains() returns defined-but-inactive domains as well;
        # listDomainsID() only covers running ones, which would make the
        # "stopped" status unreachable.
        return [
            {
                "name": dom.name(),
                "status": "running" if dom.isActive() else "stopped",
                "id": dom.ID(),
            }
            for dom in conn.listAllDomains()
            if dom  # Make sure dom is not None
        ]
    except libvirt.libvirtError as e:
        print(f"Error listing VMs: {e}")
        flash(f"Error listing VMs: {e}", 'danger')
        return None
    except AttributeError as e:
        print(f"Error: {e}")
        flash(f"Error: {e}", 'danger')
        return None

def get_vm_status(conn, vm_name):
    """
    Reports whether a named VM is currently running.

    Lookup errors are printed and flashed to the user rather than propagated.

    Args:
        conn (libvirt.virConnect):  The KVM connection
        vm_name (str): The name of the VM.

    Returns:
        str: 'running' or 'stopped', or None on error.
    """
    try:
        domain = conn.lookupByName(vm_name)
        return "running" if domain.isActive() else "stopped"
    except libvirt.libvirtError as err:
        print(f"Error getting VM status for {vm_name}: {err}")
        flash(f"Error getting VM status: {err}", 'danger')
    except AttributeError as err:
        print(f"Error: {err}")
        flash(f"Error: {err}", 'danger')
    # Reached only after one of the handlers above ran.
    return None

def create_vm(conn, vm_name, template_name, memory, cpus, host_name):
    """
    Creates a new VM from a template, then defines and starts it.

    A qcow2 image backed by the template is created with ``qemu-img`` on the
    machine running this application, a domain XML referencing that image is
    defined on ``conn``, and the domain is started. Errors are printed and
    flashed to the user rather than propagated.

    Args:
        conn (libvirt.virConnect): The KVM connection object.
        vm_name (str): The name of the new VM. Must be a plain file name
            (no path separators) because it is used to build the image path.
        template_name (str): The file name of the template image inside
            ``TEMPLATE_DIR``. Must not contain path separators.
        memory (str): Amount of memory in MiB; must parse as a positive integer.
        cpus (str): Number of virtual CPUs; must parse as a positive integer.
        host_name (str): The host name (currently unused by this function).
    Returns:
        bool: True on success, False on error.
    """
    # Function-scope imports, matching how this file imports XML helpers elsewhere.
    import shlex
    from xml.sax.saxutils import escape, quoteattr

    try:
        # Both names become part of filesystem paths and of a shell command,
        # and they originate from a web form: reject anything that could step
        # outside the configured directories.
        for label, value in (("VM name", vm_name), ("template name", template_name)):
            if not value or value in (".", "..") or os.path.basename(value) != value:
                raise ValueError(f"Invalid {label}: {value!r}")

        # Coercing to int both validates the input and guarantees nothing but
        # digits is interpolated into the domain XML.
        memory_mib = int(memory)
        vcpu_count = int(cpus)
        if memory_mib <= 0 or vcpu_count <= 0:
            raise ValueError("Memory and CPU count must be positive integers")

        # Construct the full path to the template.
        template_path = f"{app.config['TEMPLATE_DIR']}/{template_name}"
        if not os.path.exists(template_path):
            raise FileNotFoundError(f"Template image not found: {template_path}")

        # Construct the VM image path (create a new image for the VM).
        vm_image_path = f"{app.config['BACKUP_STORAGE_PATH']}/{vm_name}.qcow2"
        # Options precede the file name (the canonical qemu-img ordering) and
        # every path is shell-quoted because execute_command runs via a shell.
        # NOTE(review): recent qemu-img releases refuse a backing file without
        # an explicit backing format (-F); the template's format is not known
        # here, so confirm and add -F if the deployment needs it.
        command = (
            f"qemu-img create -f qcow2 -b {shlex.quote(template_path)} "
            f"{shlex.quote(vm_image_path)}"
        )
        stdout, stderr, return_code = execute_command(command, host_uri=None) #local
        if return_code != 0:
            raise RuntimeError(f"Failed to create VM image: {stderr}")

        # Define the VM XML configuration (basic example - adapt as needed).
        # Text and attribute values are XML-escaped; the console target type
        # is 'serial' (the previous value 'scl' is not a valid target type).
        vm_xml = f"""
        <domain type='kvm'>
          <name>{escape(vm_name)}</name>
          <memory unit='MiB'>{memory_mib}</memory>
          <vcpu>{vcpu_count}</vcpu>
          <os>
            <type arch='x86_64' machine='pc-i440fx-2.0'>hvm</type>
            <boot dev='hd'/>
          </os>
          <devices>
            <disk type='file' device='disk'>
              <driver name='qemu' type='qcow2'/>
              <source file={quoteattr(vm_image_path)}/>
              <target dev='vda' bus='virtio'/>
            </disk>
            <interface type='network'>
              <source network='default'/>
              <model type='virtio'/>
            </interface>
            <console type='pty'>
              <target type='serial' port='0'/>
            </console>
            <channel type='spicevmc'>
              <target type='virtio' name='com.redhat.spice.0'/>
            </channel>
            <graphics type='spice' autoport='yes'/>
          </devices>
        </domain>
        """
        # Define the VM using the XML
        domain = conn.defineXML(vm_xml)
        domain.create() #start
        return True
    except Exception as e:
        print(f"Error creating VM {vm_name}: {e}")
        flash(f"Error creating VM: {e}", 'danger')
        return False

def clone_vm(conn, original_vm_name, new_vm_name):
    """
    Clones an existing VM as a qcow2 overlay on top of the original's disk.

    The original's persistent domain XML is copied with a new name and a new
    disk path. Its UUID and interface MAC addresses are dropped so libvirt
    generates fresh ones instead of rejecting the definition as a duplicate.
    The new disk is created with ``qemu-img`` using the original image as its
    backing file, then the clone is defined and started. Errors are printed
    and flashed to the user rather than propagated.

    NOTE(review): the clone keeps depending on the original image as its
    backing file; writing to the original afterwards (e.g. while it is
    running) can invalidate the clone. Confirm this is acceptable.

    Args:
        conn (libvirt.virConnect): The KVM connection object.
        original_vm_name (str): The name of the VM to clone.
        new_vm_name (str): The name of the new VM. Must be a plain file name
            (no path separators) because it is used to build the image path.

    Returns:
        bool: True on success, False on error.
    """
    import shlex
    import xml.etree.ElementTree as ET

    try:
        # The new name ends up in a filesystem path and a shell command.
        if (not new_vm_name or new_vm_name in (".", "..")
                or os.path.basename(new_vm_name) != new_vm_name):
            raise ValueError(f"Invalid VM name: {new_vm_name!r}")

        original_dom = conn.lookupByName(original_vm_name)
        if not original_dom:
            raise libvirt.libvirtError(f"Original VM not found: {original_vm_name}")

        # Work from the persistent definition so runtime-only details of a
        # running guest are not copied into the clone.
        root = ET.fromstring(original_dom.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE))

        # Locate the first file-backed hard disk (skip CD-ROM/floppy media).
        disk = None
        for candidate in root.findall('devices/disk'):
            if candidate.get('device', 'disk') != 'disk':
                continue
            candidate_source = candidate.find('source')
            if candidate_source is not None and candidate_source.get('file'):
                disk = candidate
                break
        if disk is None:
            raise libvirt.libvirtError(f"Original VM has no disk source file defined.")
        source = disk.find('source')
        original_image_path = source.get('file')

        # Derive the clone's image path in the same directory. Only the file
        # name is rewritten; if it does not contain the original VM name, fall
        # back to "<new name>.qcow2" so the two paths can never coincide.
        image_dir, image_file = os.path.split(original_image_path)
        if original_vm_name in image_file:
            new_file = image_file.replace(original_vm_name, new_vm_name)
        else:
            new_file = f"{new_vm_name}.qcow2"
        new_image_path = os.path.join(image_dir, new_file)
        # qemu-img create overwrites its target, so never aim it at the source
        # image or at a file that already exists.
        if new_image_path == original_image_path or os.path.exists(new_image_path):
            raise FileExistsError(f"Refusing to overwrite existing image: {new_image_path}")

        # The original disk's format becomes the overlay's backing format.
        driver = disk.find('driver')
        backing_format = driver.get('type') if driver is not None else None

        # Rewrite the definition for the clone.
        name_element = root.find('name')
        if name_element is None:
            raise libvirt.libvirtError("Original VM definition has no <name> element.")
        name_element.text = new_vm_name
        for uuid_element in root.findall('uuid'):
            root.remove(uuid_element)  # let libvirt assign a new UUID
        for interface in root.findall('devices/interface'):
            for mac in interface.findall('mac'):
                interface.remove(mac)  # let libvirt assign a new MAC address
        for stale_chain in disk.findall('backingStore'):
            disk.remove(stale_chain)  # describes the original's chain, not the clone's
        source.set('file', new_image_path)
        # The overlay itself is always qcow2, whatever the original was.
        if driver is None:
            driver = ET.SubElement(disk, 'driver', {'name': 'qemu'})
        driver.set('type', 'qcow2')
        new_xml = ET.tostring(root, encoding='unicode')

        # Create a new disk image for the clone (using qemu-img). Paths are
        # shell-quoted because execute_command runs the string via a shell.
        format_option = f"-F {shlex.quote(backing_format)} " if backing_format else ""
        command = (
            f"qemu-img create -f qcow2 {format_option}"
            f"-b {shlex.quote(original_image_path)} {shlex.quote(new_image_path)}"
        )
        stdout, stderr, return_code = execute_command(command, host_uri=None) # local
        if return_code != 0:
            raise RuntimeError(f"Failed to create new disk image: {stderr}")

        # Define the new VM.
        new_dom = conn.defineXML(new_xml)
        new_dom.create()
        return True
    except Exception as e:
        # libvirt errors and all other failures are reported identically.
        print(f"Error cloning VM {original_vm_name} to {new_vm_name}: {e}")
        flash(f"Error cloning VM: {e}", 'danger')
        return False

def delete_vm(conn, vm_name):
    """
    Deletes a VM and its associated disk image.

    The domain XML is read first (it is no longer available once the domain
    has been undefined), then the VM is stopped if running, undefined, and the
    image file of its first file-backed hard disk is removed with ``rm -f`` on
    the machine running this application. Errors are printed and flashed to
    the user rather than propagated.

    Args:
        conn (libvirt.virConnect): The KVM connection object.
        vm_name (str): The name of the VM to delete.

    Returns:
        bool: True on success, False on error.
    """
    import shlex
    import xml.etree.ElementTree as ET

    try:
        dom = conn.lookupByName(vm_name)
        if not dom:
            raise libvirt.libvirtError(f"VM not found: {vm_name}")

        # 1. Capture the disk path BEFORE undefining: after undefine() the
        #    domain no longer exists and its XML cannot be queried.
        root = ET.fromstring(dom.XMLDesc())
        image_path = ""
        for disk in root.findall('devices/disk'):
            # Only hard disks are removed; CD-ROM/floppy media are left alone.
            if disk.get('device', 'disk') != 'disk':
                continue
            source = disk.find('source')
            if source is not None and source.get('file'):
                image_path = source.get('file')
                break

        # 2. Destroy the VM (if it's running).
        if dom.isActive():
            dom.destroy()

        # 3. Undefine the VM (remove its definition from libvirt).
        dom.undefine()

        # 4. Delete the VM's disk image file. The path is shell-quoted and
        #    preceded by "--" so it can never be read as options or split.
        if image_path:
            command = f"rm -f -- {shlex.quote(image_path)}"
            stdout, stderr, return_code = execute_command(command, host_uri=None) # local
            if return_code != 0:
                raise RuntimeError(f"Failed to delete VM image: {stderr}")
        return True
    except Exception as e:
        # libvirt errors and all other failures are reported identically.
        print(f"Error deleting VM {vm_name}: {e}")
        flash(f"Error deleting VM: {e}", 'danger')
        return False

def backup_vm(conn, vm_name):
    """
    Backs up a VM's disk image.  This is a simplified example.

    The image file of the VM's first file-backed hard disk is copied with
    ``cp`` to ``<BACKUP_STORAGE_PATH>/<vm_name>.qcow2.bak`` on the machine
    running this application. Errors are printed and flashed to the user
    rather than propagated.

    NOTE(review): the copy is taken without pausing or stopping the guest, so
    a backup of a running VM may not be consistent — confirm whether that is
    acceptable.

    Args:
        conn (libvirt.virConnect): The KVM connection object.
        vm_name (str): The name of the VM to back up.

    Returns:
        bool: True on success, False on error.
    """
    import shlex
    import xml.etree.ElementTree as ET

    try:
        dom = conn.lookupByName(vm_name)
        if not dom:
            raise libvirt.libvirtError(f"VM not found: {vm_name}")

        root = ET.fromstring(dom.XMLDesc())
        image_path = ""
        for disk in root.findall('devices/disk'):
            # Only hard disks are backed up; CD-ROM/floppy media are skipped.
            if disk.get('device', 'disk') != 'disk':
                continue
            source = disk.find('source')
            if source is not None and source.get('file'):
                image_path = source.get('file')
                break

        if not image_path:
            raise libvirt.libvirtError(f"VM has no disk image defined.")
        backup_path = f"{app.config['BACKUP_STORAGE_PATH']}/{vm_name}.qcow2.bak"
        # Plain file copy of the disk image. Both paths are shell-quoted and
        # preceded by "--" because execute_command runs the string via a shell.
        command = f"cp -- {shlex.quote(image_path)} {shlex.quote(backup_path)}"
        stdout, stderr, return_code = execute_command(command, host_uri=None)  # Local
        if return_code != 0:
            raise RuntimeError(f"Failed to backup VM image: {stderr}")
        return True
    except Exception as e:
        # libvirt errors and all other failures are reported identically.
        print(f"Error backing up VM {vm_name}: {e}")
        flash(f"Error backing up VM: {e}", 'danger')
        return False

def restore_vm(conn, vm_name):
    """
    Restores a VM from a backup.  This is a simplified example.

    The backup at ``<BACKUP_STORAGE_PATH>/<vm_name>.qcow2.bak`` is copied over
    the image file of the VM's first file-backed hard disk. The VM is stopped
    BEFORE the image is replaced (so a running guest cannot write to the file
    while it is being overwritten) and is always started afterwards, even if
    it was not running beforehand. If the copy fails, the VM is left stopped.
    Errors are printed and flashed to the user rather than propagated.

    Args:
        conn (libvirt.virConnect): The KVM connection object.
        vm_name (str): The name of the VM to restore.

    Returns:
        bool: True on success, False on error.
    """
    import shlex
    import xml.etree.ElementTree as ET

    try:
        dom = conn.lookupByName(vm_name)
        if not dom:
             raise libvirt.libvirtError(f"VM not found: {vm_name}")

        root = ET.fromstring(dom.XMLDesc())
        image_path = ""
        for disk in root.findall('devices/disk'):
            # Only hard disks are restored; CD-ROM/floppy media are skipped.
            if disk.get('device', 'disk') != 'disk':
                continue
            source = disk.find('source')
            if source is not None and source.get('file'):
                image_path = source.get('file')
                break
        if not image_path:
            raise libvirt.libvirtError(f"VM has no disk image defined.")

        backup_path = f"{app.config['BACKUP_STORAGE_PATH']}/{vm_name}.qcow2.bak"
        if not os.path.exists(backup_path):
            raise FileNotFoundError(f"Backup not found for VM {vm_name}")

        # Stop the guest first: replacing the disk underneath a running VM
        # lets it keep writing to (and reading stale data from) the image.
        if dom.isActive():
            dom.destroy()

        # Plain file copy of the backup over the live image. Both paths are
        # shell-quoted because execute_command runs the string via a shell.
        command = f"cp -- {shlex.quote(backup_path)} {shlex.quote(image_path)}"
        stdout, stderr, return_code = execute_command(command, host_uri=None) #local
        if return_code != 0:
            raise RuntimeError(f"Failed to restore VM image: {stderr}")

        # Boot the VM from the restored image.
        dom.create()
        return True
    except Exception as e:
        # libvirt errors and all other failures are reported identically.
        print(f"Error restoring VM {vm_name}: {e}")
        flash(f"Error restoring VM: {e}", 'danger')
        return False

# Flask Routes

@app.route("/")
def index():
    """
    Renders the landing page with every configured KVM host.
    """
    configured_hosts = app.config['KVM_HOSTS']
    return render_template("index.html", kvm_hosts=configured_hosts)

@app.route("/kvm/<host_name>")
def kvm_host(host_name):
    """
    Renders the VM list for one KVM host.

    Unknown host names are flashed and redirected to the index. If the
    connection cannot be opened, the page is rendered with ``vms=None``.

    Args:
        host_name (str): The name of the KVM host.
    """
    # Find the first configured host with a matching name.
    host = None
    for candidate in app.config['KVM_HOSTS']:
        if candidate['name'] == host_name:
            host = candidate
            break
    if not host:
        flash(f"KVM host '{host_name}' not found.", 'danger')
        return redirect(url_for('index'))

    vms = None
    conn = get_kvm_connection(host['uri'])
    if conn:
        vms = list_vms(conn)
        conn.close()
    return render_template("kvm_host.html", host=host, vms=vms)

@app.route("/vm/create/<host_name>", methods=['GET', 'POST'])
def create_vm_route(host_name):
    """
    Handles VM creation requests.

    GET renders the creation form. POST validates the submitted fields,
    creates the VM and redirects to the host page on success, or re-renders
    the form on failure. The libvirt connection opened for the request is
    closed on every exit path.

     Args:
        host_name (str): The name of the KVM host.
    """
    host = next((h for h in app.config['KVM_HOSTS'] if h['name'] == host_name), None) #find the host
    if not host:
        flash(f"KVM host '{host_name}' not found.", 'danger')
        return redirect(url_for('index'))
    conn = get_kvm_connection(host['uri'])
    if not conn:
        return redirect(url_for('kvm_host', host_name=host_name))

    def render_form():
        # Single place that knows which option lists the form template needs.
        return render_template(
            "create_vm.html",
            host=host,
            os_types=app.config['os_types'],
            mem_sizes=app.config['mem_sizes'],
            cpus_options=app.config['cpus_options'],
        )

    try:
        if request.method != 'POST':
            return render_form()

        # .get() so a missing field reaches the "required" messages below
        # instead of aborting the request with a 400.
        vm_name = request.form.get('vm_name', '')
        template_name = request.form.get('template_name', '')
        memory = request.form.get('memory', '')
        cpus = request.form.get('cpus', '')

        required_fields = (
            (vm_name, "VM name is required"),
            (template_name, "Template is required"),
            (memory, "Memory is required"),
            (cpus, "CPUs is required"),
        )
        for value, message in required_fields:
            if not value:
                flash(message, 'danger')
                return render_form()

        # Only the configured sizes are accepted; this also keeps arbitrary
        # text out of the domain definition built from these values.
        if memory not in app.config['mem_sizes']:
            flash("Invalid memory size", 'danger')
            return render_form()
        if cpus not in app.config['cpus_options']:
            flash("Invalid number of CPUs", 'danger')
            return render_form()

        if create_vm(conn, vm_name, template_name, memory, cpus, host_name):
            flash(f"VM '{vm_name}' created successfully.", 'success')
            return redirect(url_for('kvm_host', host_name=host_name))
        return render_form()
    finally:
        # Runs for GET, validation failures, success and errors alike, so the
        # connection is never leaked.
        conn.close()

@app.route("/vm/clone/<host_name>/<vm_name>", methods=['GET', 'POST'])
def clone_vm_route(host_name, vm_name):
    """
    Handles VM cloning requests.

    GET renders the clone form. POST clones the VM and redirects to the host
    page on success, or re-renders the form on failure. The libvirt connection
    opened for the request is closed on every exit path.

    Args:
        host_name (str): The name of the KVM host.
        vm_name (str): The name of the VM to clone.
    """
    host = next((h for h in app.config['KVM_HOSTS'] if h['name'] == host_name), None) #find the host
    if not host:
        flash(f"KVM host '{host_name}' not found.", 'danger')
        return redirect(url_for('index'))

    conn = get_kvm_connection(host['uri'])
    if not conn:
        return redirect(url_for('kvm_host', host_name=host_name))

    try:
        if request.method == 'POST':
            # .get() so a missing field reaches the "required" message below
            # instead of aborting the request with a 400.
            new_vm_name = request.form.get('new_vm_name', '')
            if not new_vm_name:
                flash("New VM name is required.", 'danger')
            elif clone_vm(conn, vm_name, new_vm_name):
                flash(f"VM '{vm_name}' cloned to '{new_vm_name}' successfully.", 'success')
                return redirect(url_for('kvm_host', host_name=host_name))
        # GET, validation failure, or failed clone: show the form (again).
        return render_template("clone_vm.html", host=host, vm_name=vm_name)
    finally:
        # Runs on every path, so the connection is never leaked.
        conn.close()

@app.route("/vm/delete/<host_name>/<vm_name>", methods=['POST'])
def delete_vm_route(host_name, vm_name):
    """
    Deletes a VM on the given host and returns to that host's page.

    The outcome is reported through a flash message.

    Args:
        host_name (str): The name of the KVM host.
        vm_name (str): The name of the VM to delete.
    """
    # Find the first configured host with a matching name.
    matching_hosts = [h for h in app.config['KVM_HOSTS'] if h['name'] == host_name]
    if not matching_hosts:
        flash(f"KVM host '{host_name}' not found.", 'danger')
        return redirect(url_for('index'))
    host = matching_hosts[0]

    conn = get_kvm_connection(host['uri'])
    if conn:
        deleted = delete_vm(conn, vm_name)
        if deleted:
            flash(f"VM '{vm_name}' deleted successfully.", 'success')
        else:
            flash(f"Failed to delete VM '{vm_name}'.", 'danger')
        conn.close()
    # Same destination whether or not the connection could be opened.
    return redirect(url_for('kvm_host', host_name=host_name))

@app.route("/vm/backup/<host_name>/<vm_name>", methods=['POST'])
def backup_vm_route(host_name, vm_name):
    """
    Backs up a VM on the given host and returns to that host's page.

    The outcome is reported through a flash message.

    Args:
        host_name (str): The name of the KVM host.
        vm_name (str): The name of the VM to back up.
    """
    # Find the first configured host with a matching name.
    host = None
    for entry in app.config['KVM_HOSTS']:
        if entry['name'] == host_name:
            host = entry
            break
    if not host:
        flash(f"KVM host '{host_name}' not found.", 'danger')
        return redirect(url_for('index'))

    conn = get_kvm_connection(host['uri'])
    if conn:
        if backup_vm(conn, vm_name):
            message, category = f"VM '{vm_name}' backed up successfully.", 'success'
        else:
            message, category = f"Failed to back up VM '{vm_name}'.", 'danger'
        flash(message, category)
        conn.close()
    # Same destination whether or not the connection could be opened.
    return redirect(url_for('kvm_host', host_name=host_name))

@app.route("/vm/restore/<host_name>/<vm_name>", methods=['POST'])
def restore_vm_route(host_name, vm_name):
    """
    Restores a VM on the given host and returns to that host's page.

    The outcome is reported through a flash message.

    Args:
        host_name (str): The name of the KVM host.
        vm_name (str): The name of the VM to restore.
    """
    # Find the first configured host with a matching name.
    candidates = (h for h in app.config['KVM_HOSTS'] if h['name'] == host_name)
    host = next(candidates, None)
    if not host:
        flash(f"KVM host '{host_name}' not found.", 'danger')
        return redirect(url_for('index'))

    conn = get_kvm_connection(host['uri'])
    if conn:
        restored = restore_vm(conn, vm_name)
        if not restored:
            flash(f"Failed to restore VM '{vm_name}'.", 'danger')
        else:
            flash(f"VM '{vm_name}' restored successfully.", 'success')
        conn.close()
    # Same destination whether or not the connection could be opened.
    return redirect(url_for('kvm_host', host_name=host_name))

if __name__ == "__main__":
    # Script entry point: start Flask's built-in server, with debug mode taken
    # from the Config class.
    debug_enabled = Config.DEBUG
    app.run(debug=debug_enabled)