# BackupScript
## Development Proof of Concept

---
### Path Management System

In [1]:
## Config Mockup
# Root directory of the backup tree; get_bkp_path() uses it as the first
# component of every backup path it builds.
bkp_root = "bkp"
# One entry per source directory. Keys read by the cells of this notebook:
#   "path"    - source directory, written with forward slashes
#   "subpath" - optional directory inserted between bkp_root and the mirrored path
# "backup_path_override", "mode" and "ignored_paths" are not read by any cell
# visible here — presumably placeholders for later features (TODO confirm).
files = [
    {
        "path": "C:/Documents/Files",
        "backup_path_override": None,
        "subpath": "backup1",
        "ignored_paths": []
    },
    {
        "path": "C:/Documents/Files4",
        "backup_path_override": None,
        "mode": "sync",
        "ignored_paths": []
    },
    {
        "path": "C:/Testing/Path/Test1/TestFiles",
        "backup_path_override": None,
        "mode": "snapshot",
        "ignored_paths": []
    }
]
# Number of leading path components (after the drive) that get_bkp_path() drops
# from a source path. get_path_reduction() lets an entry replace it through an
# "override_path_reduction" key.
path_reduction = 1

In [2]:
import os
from typing import Dict, Tuple, List

class File:
    """Minimal stand-in for a directory entry, used by the path simulation.

    Attributes:
        name: File name, stored as given.
        dir: Directory the entry is listed under, stored as given.
        mtime: Timestamp value, stored as given (presumably the modification
            time — nothing in this class interprets it).
    """

    def __init__(self, name, dir, mtime):
        self.name, self.dir, self.mtime = name, dir, mtime

    @property
    def absolute_path(self):
        """``dir`` joined with ``name``, with every backslash turned into ``/``."""
        joined = os.path.join(self.dir, self.name)
        return joined.replace("\\", "/")

    @property
    def path(self):
        """``name`` joined onto the *parent* of ``dir``, with backslashes turned into ``/``."""
        parent_dir = os.path.dirname(self.dir)
        joined = os.path.join(parent_dir, self.name)
        return joined.replace("\\", "/")

# Simulated directory listings: maps a directory path to the File objects that
# scandir() returns for it. The keys match the "path" values of the config
# entries, so every configured source directory has two simulated files.
scandir_sim = {
    "C:/Documents/Files": [
        File("text1.txt", "C:/Documents/Files", 100),
        File("text2.txt", "C:/Documents/Files", 100),
    ],
    "C:/Documents/Files4": [
        File("test1.txt", "C:/Documents/Files4", 150),
        File("test3.txt", "C:/Documents/Files4", 150)
    ],
    "C:/Testing/Path/Test1/TestFiles": [
        File("testing1.txt", "C:/Testing/Path/Test1/TestFiles", 200),
        File("testing2.txt", "C:/Testing/Path/Test1/TestFiles", 200)
    ]
}

def scandir(p: str):
    """Return the simulated listing registered in ``scandir_sim`` for ``p``.

    The lookup is an exact string match on the directory path; a path that is
    not registered yields an empty list.
    """
    try:
        return scandir_sim[p]
    except KeyError:
        return []


In [3]:
## Get Source Path / Get Backup Path
def get_bkp_path(source_path: str, subpath: str = "", *, override_path_reduction: int = None):
    """Map a source file path to its location inside the backup tree.

    The drive prefix is removed, the leading ``path_reduction`` components of
    what remains are dropped, and the rest is appended to
    ``bkp_root/subpath``.

    Args:
        source_path: Source file path written with forward slashes
            (e.g. ``C:/Documents/Files/text1.txt``).
        subpath: Optional directory inserted between ``bkp_root`` and the
            mirrored components. ``None`` is treated like ``""``.
        override_path_reduction: When not ``None``, used instead of the global
            ``path_reduction`` as the number of leading components to drop.

    Returns:
        The backup path, using forward slashes only.
    """
    # ntpath applies Windows drive-letter rules on every host OS, whereas
    # os.path.splitdrive() only strips "C:" when running on Windows. Using it
    # keeps the mapping identical on all platforms and consistent with
    # get_src_path(), which treats the first "/"-separated component as the drive.
    import ntpath

    _nodrive_path = ntpath.splitdrive(source_path)[1]
    _path_parts = [x for x in _nodrive_path.split("/") if len(x) > 0]
    reduction = path_reduction if override_path_reduction is None else override_path_reduction
    # "subpath or ''" tolerates a config entry that sets "subpath": None.
    return os.path.join(bkp_root, subpath or "", *_path_parts[reduction:]).replace("\\", "/")

def get_path_reduction(source_data: Dict[str, str]):
    """Resolve the path reduction that applies to one config entry.

    An ``"override_path_reduction"`` key in ``source_data`` wins over the
    global ``path_reduction``.

    Returns:
        The resolved value when it is greater than zero, otherwise ``None``.
    """
    reduction = source_data.get("override_path_reduction", None)
    if reduction is None:
        reduction = path_reduction
    if reduction > 0:
        return reduction
    return None

def get_src_path(bkpp: str, source_data: Dict[str, str]):
    """Reconstruct the original source path of a backed-up file.

    This is the inverse of ``get_bkp_path`` for one config entry.

    Args:
        bkpp: Backup path of the file. It is assumed to start with
            ``bkp_root``, optionally followed by the entry's ``"subpath"``.
        source_data: Config entry holding ``"path"`` and optionally
            ``"subpath"`` / ``"override_path_reduction"``.

    Returns:
        The reconstructed source path, using forward slashes only.
    """
    path: str = source_data.get("path")
    # A missing or None "subpath" contributes nothing to the prefix length.
    subpath = source_data.get("subpath") or ""
    # Drop "<bkp_root>/<subpath>" from the front; what is left is the mirrored
    # part of the source path (possibly with one leading slash).
    _subp = bkpp.replace("\\", "/")[len(subpath) + len(bkp_root) + 1:]
    if _subp.startswith("/"):
        _subp = _subp[1:]

    # get_path_reduction() returns None for "no reduction"; treat that as 0 so
    # the "+ 1" (which skips the drive component) does not raise TypeError.
    reduction = get_path_reduction(source_data) or 0
    tail_parts = path.split("/")[reduction + 1:]
    # os.path.join() needs at least one argument, so an empty tail (path shorter
    # than the reduction) is handled explicitly.
    _bkpdir = os.path.join(*tail_parts).replace("\\", "/") if tail_parts else ""

    # The mirrored tail is the END of `path`; search from the right so an
    # earlier directory with the same name (e.g. C:/Files/Files) is left alone.
    bkpdir_ind = path.rfind(_bkpdir) if _bkpdir else -1
    if bkpdir_ind != -1:
        _fixed_sourcedir = path[:bkpdir_ind] + path[bkpdir_ind + len(_bkpdir):]
    else:
        _fixed_sourcedir = path
    return os.path.join(_fixed_sourcedir, _subp).replace("\\", "/")

In [4]:
# Round-trip demo: map every simulated source file to its backup path, then
# reconstruct the source path from the backup path and compare the two.
# Each record is [backup path, source dir, original file path, config entry].
backed_up_files = []

print(f"{'-'*10} Build {'-'*10}")
print(f"{'Source'.ljust(45)} -> {'Backup'.ljust(40)}")
for d in files:
    path = d.get("path")
    for f in scandir(path):
        # Pass the entry's "override_path_reduction" along: the restore side
        # reads it through get_path_reduction(), so ignoring it here would make
        # the round trip disagree for any entry that sets it.
        _bkppath = get_bkp_path(
            f.absolute_path,
            d.get("subpath", ""),
            override_path_reduction=d.get("override_path_reduction"),
        )
        backed_up_files.append([_bkppath, path, f.absolute_path, d])
        print(f"{f.absolute_path.ljust(45)} -> {_bkppath.ljust(40)}")

print(f"{'-'*10} Restore {'-'*10}")
print(f"{'⚫'}| {'Source'.ljust(45)} -> {'Backup'.ljust(40)} -> {'Reconstructed'.ljust(45)}")
for bkp_path, _source_dir, original, source_data in backed_up_files:
    reconstructed = get_src_path(bkp_path, source_data)
    print(f"{'✔️' if original == reconstructed else '❌'}| {original.ljust(45)} -> {bkp_path.ljust(40)} -> {reconstructed.ljust(45)}")


---------- Build ----------
Source                                        -> Backup                                  
C:/Documents/Files/text1.txt                  -> bkp/backup1/Files/text1.txt             
C:/Documents/Files/text2.txt                  -> bkp/backup1/Files/text2.txt             
C:/Documents/Files4/test1.txt                 -> bkp/Files4/test1.txt                    
C:/Documents/Files4/test3.txt                 -> bkp/Files4/test3.txt                    
C:/Testing/Path/Test1/TestFiles/testing1.txt  -> bkp/Path/Test1/TestFiles/testing1.txt   
C:/Testing/Path/Test1/TestFiles/testing2.txt  -> bkp/Path/Test1/TestFiles/testing2.txt   
---------- Restore ----------
⚫| Source                                        -> Backup                                   -> Reconstructed                                
✔️| C:/Documents/Files/text1.txt                  -> bkp/backup1/Files/text1.txt              -> C:/Documents/Files/text1.txt                 
✔️| C:/Documents/Files/text

### Sync System: update / delete / create logic

In [5]:
## Simulation Objects
class SourceSim:
    """
    Test class that simulates file changes on one side of a sync.

    ``files`` maps a file name to its timestamp. The constructor stores a copy,
    so the mapping passed in is never modified by this object.
    """

    def __init__(self, files: Dict[str, float]):
        self.files = files.copy()

    def update(self, file: str, time: float):
        """Add ``file`` or overwrite its timestamp with ``time``."""
        self.files[file] = time

    def remove(self, file: str):
        """Drop ``file`` from the listing; unknown names are ignored."""
        if file in self.files:
            del self.files[file]

    def __repr__(self):
        entries = [f"{name}: {stamp}" for name, stamp in self.files.items()]
        return ", ".join(entries)

    def copy(self):
        """Return a new ``SourceSim`` holding its own copy of the listing."""
        return SourceSim(self.files)

    def print(self, header: str):
        """Print ``header`` followed by one ``name: timestamp`` line per file."""
        lines = [f"{header}"]
        lines.extend(f"{name}: {stamp}" for name, stamp in self.files.items())
        print("\n".join(lines))

# Baseline listing (file name -> timestamp) that both simulated sides start from.
source_files = {
    "file1": 250,
    "file2": 150,
    "file3": 150
}

# Snapshot consulted by compare(): "timestamp" is the value file timestamps are
# measured against (presumably the time of the last sync — TODO confirm);
# "contents" is an independent copy of the baseline listing.
sync_snapshot = {
    "timestamp": 300,
    "contents": source_files.copy()
}


In [6]:
## Simulation Setup
# Both sides start out as independent copies of the baseline listing.
sfiles = SourceSim(source_files)
bfiles = SourceSim(source_files)

# Source side: file1 gets a newer timestamp and file4 appears.
sfiles.update("file1", 400)
sfiles.update("file4", 350)

# Backup side: file2 disappears.
bfiles.remove("file2")

sfiles.print("Source Files:")
print()
bfiles.print("Backup Files:")


Source Files:
file1: 400
file2: 150
file3: 150
file4: 350

Backup Files:
file1: 250
file3: 150


In [7]:
## Comparison PoC
class Action:
    """One step of a sync plan.

    Attributes:
        action: Name of the step (compare() produces "Create", "Remove",
            "UpdateFromSource" and "UpdateFromBackup").
        file: Name of the file the step applies to.
        filedata: The per-file record compare() built for it:
            (exists on source, exists on backup, source timestamp, backup timestamp).
    """

    def __init__(self, action: str, file: str, filedata: Tuple[bool, bool, float, float]):
        self.action, self.file, self.filedata = action, file, filedata

    def __repr__(self):
        action, file = self.action, self.file
        return f"{action} >> {file}"

def compare(source_files_in, bkp_files_in, snapshot=None):
    """Work out which sync actions bring source and backup back in line.

    Args:
        source_files_in: Mapping of file name -> timestamp on the source side.
        bkp_files_in: Mapping of file name -> timestamp on the backup side.
        snapshot: State of the last sync, a dict with "timestamp" and
            "contents" (file name -> timestamp). Defaults to the notebook-level
            ``sync_snapshot``.

    Returns:
        A list of ``Action`` objects, in first-seen file order (source listing
        first, then files that only exist on the backup). Files whose
        timestamps match on both sides produce no action.

    Side effects:
        Prints which side saw the most recent change, and an underline.
    """
    if snapshot is None:
        snapshot = sync_snapshot
    # Both lookups are the same for every file, so do them once up front.
    sync_timestamp = snapshot.get("timestamp", 0)
    synced_files = snapshot.get("contents", {})

    actions = []
    # file name -> [exists on source, exists on backup, source ts, backup ts]
    file_data = {}
    # newest timestamp seen on [source, backup]
    latest_actions = [0, 0]
    for side, listing in enumerate((source_files_in, bkp_files_in)):
        for f, mt in listing.items():
            fd = file_data.setdefault(f, [False, False, 0, 0])
            fd[side] = True
            fd[side + 2] = mt
            latest_actions[side] = max(latest_actions[side], mt)

    for f, fd in file_data.items():
        if fd[0] != fd[1]:
            # Present on exactly one side. It is only safe to treat that as a
            # deletion when the file was part of the last sync AND has not been
            # touched since. A file the last sync never saw is new, even if its
            # timestamp is old (e.g. copied in with its original mtime).
            known_at_last_sync = f in synced_files
            changed_since_sync = max(fd[2], fd[3]) > sync_timestamp
            if changed_since_sync or not known_at_last_sync:
                actions.append(Action("Create", f, fd))
            else:
                actions.append(Action("Remove", f, fd))
        else:
            # Every record comes from one of the two listings, so equal flags
            # here means the file exists on both sides.
            if fd[2] > fd[3]:
                actions.append(Action("UpdateFromSource", f, fd))
            elif fd[2] < fd[3]:
                actions.append(Action("UpdateFromBackup", f, fd))

    if latest_actions[0] > latest_actions[1]:
        summary = 'Latest Action on source'
    elif latest_actions[0] < latest_actions[1]:
        summary = 'Latest Action on backup'
    else:
        summary = 'Directories in sync'
    s = f"{summary} @{max(latest_actions)}"
    print(s)
    print('-'*len(s))
    return actions

In [9]:
# Work on copies so the simulated listings from the setup cell stay untouched
# and this cell can be re-run.
sfilesc = sfiles.copy()
bfilesc = bfiles.copy()
print("First Sync Attempt:")
fileactions: List[Action] = compare(sfilesc.files, bfilesc.files)
print("\n".join(map(str, fileactions)))

# Apply the plan: "Create" writes to both sides, the two update actions write to
# one side each, and anything else is treated as a removal on both sides.
for f in fileactions:
    f: Action
    writes_backup = f.action in ("UpdateFromSource", "Create")
    writes_source = f.action in ("UpdateFromBackup", "Create")
    if not (writes_backup or writes_source):
        sfilesc.remove(f.file)
        bfilesc.remove(f.file)
        continue
    if writes_backup:
        bfilesc.update(f.file, max(f.filedata[2], f.filedata[3]))
    if writes_source:
        sfilesc.update(f.file, max(f.filedata[2], f.filedata[3]))

print("\nNew Files:")
sfilesc.print("    Source:")
bfilesc.print("    Backup:")

# A second comparison on the synced copies should yield no further actions.
print("\nSecond Sync Attempt:")
fileactions2: List[Action] = compare(sfilesc.files, bfilesc.files)
print("\n".join(map(str, fileactions2)))

First Sync Attempt:
Latest Action on source @400
----------------------------
UpdateFromSource >> file1
Remove >> file2
Create >> file4

New Files:
    Source:
file1: 400
file3: 150
file4: 350
    Backup:
file1: 400
file3: 150
file4: 350

Second Sync Attempt:
Directories in sync @400
------------------------

