In [None]:
# pip install mysql-connector-python
# pip install PyMySQL

## Keep N of File

In [None]:
import os

def keep_last_n_files(folder_path, n):
    files = [os.path.join(folder_path, f) for f in os.listdir(folder_path)
             if os.path.isfile(os.path.join(folder_path, f))]
    files.sort(key=os.path.getmtime, reverse=True)
    for file_to_delete in files[n:]:
        try:
            os.remove(file_to_delete)
            print(f"Deleted: {file_to_delete}")
        except Exception as e:
            print(f"Failed to delete {file_to_delete}: {e}")

if __name__ == "__main__":
    folder = "path/to/your/folder"  
    max_files = 5                  

    keep_last_n_files(folder, max_files)


## Full Backup

In [None]:
import subprocess
import datetime
import re
import json
import os

def backup_database(host, user, password, db_name, output_file_prefix):
    # Remove trailing slash if any
    if output_file_prefix.endswith('/') or output_file_prefix.endswith('\\'):
        output_file_prefix = output_file_prefix.rstrip('/\\')
    
    # Create the full backup folder if it doesn't exist
    os.makedirs(output_file_prefix, exist_ok=True)

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    
    output_file = f"{output_file_prefix}/full_backup_{timestamp}.sql"
    binlog_info_file = "./config/last_full_backup.json"

    command = [
        "mysqldump",
        f"--host={host}",
        f"--user={user}",
        f"--password={password}",
        db_name,
        "--single-transaction",
        "--master-data=2",
        f"--result-file={output_file}"
    ]
    
    print("Running command:", " ".join(command))
    
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    
    if result.returncode == 0:
        print(f"Backup successful. File saved as {output_file}.")

        binlog_info = {}
        with open(output_file, 'r', encoding='utf-8') as f:
            for line in f:
                match = re.search(r"CHANGE MASTER TO MASTER_LOG_FILE='(.+)', MASTER_LOG_POS=(\d+);", line)
                if match:
                    binlog_info['binlog_file'] = match.group(1)
                    binlog_info['binlog_pos'] = int(match.group(2))
                    print(f"Extracted binlog info: {binlog_info}")
                    break
        
        if binlog_info:
            # Ensure config folder exists
            os.makedirs("./config", exist_ok=True)

            # Save to JSON file
            with open(binlog_info_file, 'w', encoding='utf-8') as jf:
                json.dump(binlog_info, jf, indent=4)
            print(f"Binlog info saved to JSON file: {binlog_info_file}")
        else:
            print("Warning: Could not find binlog info in backup file.")
    else:
        print(f"Error: {result.stderr.decode('utf-8')}")

# Example usage
backup_database("localhost", "root", "1234", "computer_shop", "../backup/full")


## Differential Backup

In [None]:
import subprocess
import json
import os
import datetime

def differential_backup_from_config(user, password, host, backup_dir='./backup/differential', config_path='./config/last_full_backup.json'):
    if not os.path.exists(config_path):
        print(f"Error: Config file not found at {config_path}. Please run full backup first.")
        return
    
    with open(config_path, 'r') as f:
        binlog_info = json.load(f)
    
    binlog_file = binlog_info.get('binlog_file')
    start_position = binlog_info.get('binlog_pos')
    
    if not binlog_file or not start_position:
        print("Error: Invalid binlog info in config file.")
        return

    # Ensure differential backup folder exists
    os.makedirs(backup_dir, exist_ok=True)

    # Generate timestamped output file name
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = os.path.join(backup_dir, f"diff_backup_{timestamp}.sql")

    command = [
        "mysqlbinlog",
        f"--start-position={start_position}",
        "--read-from-remote-server",
        f"--host={host}",
        f"--user={user}",
        f"--password={password}",
        binlog_file
    ]

    print("Running command:", " ".join(command))

    with open(output_file, "w") as out_file:
        result = subprocess.run(command, stdout=out_file, stderr=subprocess.PIPE)

    if result.returncode == 0:
        print(f"Differential backup successful. File saved as {output_file}.")
    else:
        print(f"Error: {result.stderr.decode('utf-8')}")

differential_backup_from_config(
    user="root",
    password="1234",
    host="localhost",
    backup_dir='../backup/differential'
)


## Restore full backup

In [None]:
import subprocess

def restore_database(host, user, password, db_name, input_file):
    command = [
        "mysql",
        f"--host={host}",
        f"--user={user}",
        f"--password={password}",
        db_name
    ]
    print(command)
    
    with open(input_file, "r") as infile:
        result = subprocess.run(command, stdin=infile, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(result)
    
    if result.returncode == 0:
        print(f"Restore successful from {input_file}.")
    else:
        print(f"Error: {result.stderr.decode('utf-8')}")

# Example usage
restore_database("localhost", "root", "1234", "computer_shop", "../backup/full/full_backup_2025-07-05_23-02-10.sql")


## Restore Full backup with differential backup

In [3]:
import subprocess

def restore_database(host, user, password, db_name, input_fullbackup_file,input_diff_backup_file):
    command = [
        "mysql",
        f"--host={host}",
        f"--user={user}",
        f"--password={password}",
        db_name
    ]
    print(command)
    # restore fullbackup
    with open(input_fullbackup_file, "r") as infile:
        result = subprocess.run(command, stdin=infile, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(result)
    
    if result.returncode == 0:
        print(f"Restore successful from {input_fullbackup_file}.")
    else:
        print(f"Error: {result.stderr.decode('utf-8')}")
    # restore last differential file
    with open(input_diff_backup_file, "r") as infile:
        result = subprocess.run(command, stdin=infile, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(result)
    
    if result.returncode == 0:
        print(f"Restore successful from {input_diff_backup_file}.")
    else:
        print(f"Error: {result.stderr.decode('utf-8')}")

# Example usage
restore_database("localhost", "root", "1234", "computer_shop", 
                input_fullbackup_file="../backup/full/full_backup_2025-07-05_23-02-10.sql",
                input_diff_backup_file="../backup/differential/diff_backup_2025-07-05_23-35-27.sql")


['mysql', '--host=localhost', '--user=root', '--password=1234', 'computer_shop']
ERROR 1305 (42000) at line 1413: FUNCTION computer_shop.count_feedback_by_product does not exist

Restore successful from ../backup/differential/diff_backup_2025-07-05_23-35-27.sql.
