In [1]:
from typing import List

from pathlib import Path

In [2]:
# Passed as `sandbox=` when the Zenodo client is created further down.
ZENODO_SANDBOX = False

# Deposition ID that receives the archive without model files.
ZENODO_NOMODELS_ID = "11072579"
# Deposition ID for the model-files archive; only referenced from commented-out cells.
ZENODO_MODELS_ID = "11116984"

In [3]:
# Directory that is scanned for files to upload; `~` is expanded to the user's home.
ray_results_path = Path("~/ray_results").expanduser()
# Fail early if the results directory does not exist.
assert ray_results_path.is_dir()

# Upload results

## Select files to upload

In [4]:
def get_files_nomodel():
    """Gather every regular file under `ray_results_path` that is not a model file.

    A file is treated as a model file when its name starts with ``model.data``.

    Returns:
        tuple: ``(total, all_paths)`` where ``total`` is the summed size in
        bytes of the kept files and ``all_paths`` is the list of their paths,
        in the order `rglob` yields them.
    """
    kept = [
        entry
        for entry in ray_results_path.rglob('*')
        if entry.is_file() and not entry.name.startswith('model.data')
    ]
    size_in_bytes = sum(entry.stat().st_size for entry in kept)
    return size_in_bytes, kept

# Collect every non-model file and report the total size (bytes divided by 2**30) and file count.
total, all_paths = get_files_nomodel()
print("Total GB:", total / 2**30, "files:", len(all_paths))

Total GB: 24.8716683909297 files: 32229


In [5]:
# Preview the first path relative to the home directory (the same expression is used for archive member names).
all_paths[0].relative_to(Path('~').expanduser())

PosixPath('ray_results/LightGCN_optim=map@10,dao=DEAD FoundationsDAO,freq=2d,normalize=True,cutoff_date=2021-11-28T00:00:00,fold=0_2024-03-21T22:11:14.136280/.validate_storage_marker')

## Create tar

In [6]:
import tarfile
from tqdm.autonotebook import tqdm

  from tqdm.autonotebook import tqdm


In [7]:
def compress_files(tar_fname, compress, flist, base_dir=None):
    """Pack `flist` into a new compressed tar archive at `tar_fname`.

    If `tar_fname` already exists and holds exactly ``len(flist)`` members it is
    kept and nothing is written. An existing archive with a different member
    count, or one that cannot be read, is deleted and rebuilt.

    Args:
        tar_fname (Path): Destination archive.
        compress (str): Compression name used in the tarfile mode; the archive
            is created with mode ``f"x:{compress}"`` (e.g. ``"xz"``).
        flist (list[Path]): Files to add. Each one must live under `base_dir`.
        base_dir (Path | None): Directory that member names are made relative
            to. Defaults to the current user's home directory.

    Raises:
        ValueError: If a file in `flist` is not located under `base_dir`.
    """
    # Resolve the base directory once instead of once per file.
    base_dir = Path('~').expanduser() if base_dir is None else Path(base_dir)

    if tar_fname.exists():
        try:
            # Only count the members here, so the file is closed again before
            # it is possibly deleted below.
            with tarfile.open(tar_fname, 'r:*') as tar:
                n_members = len(tar.getnames())
        except Exception as err:
            # Unreadable archive: rebuild it. Catching `Exception` rather than
            # using a bare `except` lets KeyboardInterrupt/SystemExit propagate.
            print(f"Could not read existing archive ({err!r}), removing it")
            n_members = None

        if n_members == len(flist):
            print("Same length, not removing", n_members, len(flist))
            return
        if n_members is not None:
            print("Removing old file")
        tar_fname.unlink()

    sizes = [f.stat().st_size for f in flist]
    original_total = sum(sizes)
    original_size = 0
    with tarfile.open(tar_fname, f'x:{compress}') as tar:
        with tqdm(total=len(flist)) as pbar:
            for i, (f, size) in enumerate(zip(flist, sizes)):
                original_size += size
                tar.add(f, recursive=False, arcname=str(f.relative_to(base_dir)))

                # `original_size > 0` guards the ratio against a run of empty files.
                if i % 5 == 0 and i > 1 and original_size > 0:
                    # Size on disk so far; only an estimate while the archive is open.
                    current_size = tar_fname.stat().st_size
                    ratio = current_size / original_size
                    # refresh=False: let tqdm throttle redraws instead of forcing
                    # one per call, which floods the notebook output channel.
                    pbar.set_postfix({
                        'ratio': f'{ratio*100:.2f}%',
                        'original': f'{original_size / 2**30:.2f}G',
                        'actual': f'{current_size / 2**30:.2f}G',
                        'est.': f'{ratio*original_total / 2**30:.2f}G',
                    }, refresh=False)

                pbar.update()

    # Measure after the archive is closed so the reported size is the real one,
    # and so the value exists even when the progress branch never ran.
    final_size = tar_fname.stat().st_size
    ratio_text = f"{final_size / original_size*100:.2f}%" if original_size else "n/a"
    print(f"Original size: {original_size/ 2**30}G, New size: {final_size / 2**30}G, ratio: {ratio_text}")


# Build the xz-compressed archive of all non-model files (relative path, so it lands in the working directory).
compress_files(Path("ray_results_nomodels.tar.xz"), "xz", all_paths)

  0%|          | 0/32229 [00:00<?, ?it/s]

IOPub message rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_msg_rate_limit`.

Current values:
ServerApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
ServerApp.rate_limit_window=3.0 (secs)



Original size: 24.8716683909297G, New size: 2.085955971851945G, ratio: 8.39%


## Upload to Zenodo

In [8]:
import hashlib

def big_hash(fname):
    """Return the MD5 hex digest of the file at `fname`.

    The file is read in 8192-byte blocks, so it never has to fit in memory.
    """
    digest = hashlib.md5()
    with open(fname, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()

# MD5 hex digest of the archive; the upload step compares this value with the deposition's recorded checksum.
big_hash("ray_results_nomodels.tar.xz")

'3ba771d323dc8c5b9a285eeab8b3731c'

In [9]:
from typing import Iterable, Union

import io

import hashlib
import requests
import backoff
from zenodo_client import Zenodo

# https://stackoverflow.com/a/64423275/4505998
class UploadChunksIterator(Iterable):
    """
    Iterator that hands a readable file object to python requests chunk by chunk.

    The object is its own iterator: each step calls ``file.read(chunk_size)``
    and iteration stops at the first empty read. ``len()`` reports the
    ``total_size`` given by the caller instead of inspecting the file.
    """

    def __init__(
        self, file: io.BufferedReader, total_size: int, chunk_size: int = 16 * 1024
    ):  # default chunk size is 16 KiB
        self.total_size = total_size
        self.chunk_size = chunk_size
        self.file = file

    def __len__(self):
        # The length is supplied explicitly because the wrapped object may only
        # expose read() (e.g. tqdm's CallbackIOWrapper).
        return self.total_size

    def __iter__(self):
        return self

    def __next__(self):
        chunk = self.file.read(self.chunk_size)
        if chunk:
            return chunk
        raise StopIteration

class ZenodoPlus(Zenodo):
    """`Zenodo` client with helpers to stream large files into an existing deposition.

    The access token is sent in an ``Authorization`` header instead of a URL
    query parameter, so it does not become part of request URLs (and therefore
    does not show up in `requests` error messages or saved notebook output).
    """

    def _bearer_headers(self):
        """Return the HTTP headers that authenticate a request with `self.access_token`."""
        return {"Authorization": f"Bearer {self.access_token}"}

    def _upload_big_files(self, *, bucket: str, paths, done=None) -> List[requests.Response]:
        """Upload files to a deposition bucket, streaming each one from disk.

        Args:
            bucket: URL of the deposition's file bucket.
            paths: A single path (``str`` or ``Path``) or an iterable of paths.
            done: Entries of files already present in the deposition. Each entry
                is read through its ``'filename'``, ``'filesize'`` and
                ``'checksum'`` keys. ``None`` means no file is present yet.

        Returns:
            The responses of the uploads that were performed, in order. Files
            that were skipped because they are already uploaded are not included.

        Raises:
            RuntimeError: If a file with the same name is already in the
                deposition but its size or checksum differs from the local file.
            requests.HTTPError: If an upload request returns an error status.
        """
        _paths = [paths] if isinstance(paths, (str, Path)) else paths
        _paths = [Path(p) for p in _paths]
        _done_dict = {f['filename']: f for f in (done or [])}

        rv = []
        # see https://developers.zenodo.org/#quickstart-upload
        for path in _paths:
            total_size = path.stat().st_size

            _info = _done_dict.get(path.name)
            if _info is not None:
                # Compare sizes first: hashing a multi-GB file is pointless
                # when the sizes already disagree.
                if _info['filesize'] == total_size:
                    md5sum = big_hash(path)
                    if _info['checksum'] == md5sum:
                        print("Skipping already uploaded file", path, "with hash", md5sum)
                        continue

                raise RuntimeError(
                    f"{path.name} is already in the deposition with a different "
                    "size or checksum; refusing to upload over it"
                )

            # Wrap read() so tqdm shows the upload progress in bytes.
            with open(path, "rb") as raw, tqdm.wrapattr(
                raw,
                "read",
                miniters=1,
                total=total_size,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
            ) as f:
                res = requests.put(
                    f"{bucket}/{path.name}",
                    data=UploadChunksIterator(f, total_size),
                    headers=self._bearer_headers(),
                )

            res.raise_for_status()
            rv.append(res)
        return rv

    def upload_to_record(self, deposition_id: str, paths):
        """Upload `paths` to the existing deposition `deposition_id`.

        Fetches the deposition to learn its bucket URL and the files it already
        holds, then delegates to `_upload_big_files`.

        Returns:
            The list of upload responses returned by `_upload_big_files`.

        Raises:
            requests.HTTPError: If fetching the deposition or an upload fails.
            RuntimeError: See `_upload_big_files`.
        """
        url = f"{self.depositions_base}/{deposition_id}"
        res = requests.get(url, headers=self._bearer_headers())
        res.raise_for_status()

        deposition_data = res.json()

        bucket = deposition_data['links']['bucket']
        return self._upload_big_files(bucket=bucket, paths=paths, done=deposition_data['files'])

# Create the client (sandbox or not according to ZENODO_SANDBOX) and upload the no-models archive.
Z = ZenodoPlus(sandbox=ZENODO_SANDBOX)
Z.upload_to_record(ZENODO_NOMODELS_ID, ["ray_results_nomodels.tar.xz"])
# upload_to_record only fetches the deposition and uploads files; print a reminder to publish it.
print(f"Remember to publish {Z.depositions_base}/{ZENODO_NOMODELS_ID}")

Skipping already uploaded file ray_results_nomodels.tar.xz with hash 3ba771d323dc8c5b9a285eeab8b3731c
Remember to publish https://zenodo.org/api/deposit/depositions/11072579


# Upload models

In [10]:
def get_files_model():
    """Gather the model files found under `ray_results_path`.

    Only regular files whose name starts with ``model.data`` are kept.

    Returns:
        tuple: ``(total, all_paths)`` where ``total`` is the summed size in
        bytes of the model files and ``all_paths`` is the list of their paths,
        in the order `rglob` yields them.
    """
    selected = []
    n_bytes = 0
    for candidate in ray_results_path.rglob('*'):
        if candidate.is_file() and candidate.name.startswith('model.data'):
            n_bytes += candidate.stat().st_size
            selected.append(candidate)

    return n_bytes, selected

# Collect only the model files and report their size and count; note this rebinds `total`.
total, model_paths = get_files_model()
print("Total GB:", total / 2**30, "files:", len(model_paths))

Total GB: 62.57409965991974 files: 4000


### Not done: the models archive takes too much space (48 GiB even when compressed with xz)

In [11]:
# compress_files(Path("ray_results_models.tar.xz"), "xz", model_paths)

Same length, not removing 4000 4000


In [12]:
# Z.upload_to_record(ZENODO_MODELS_ID, ["ray_results_models.tar.xz"])
# print(f"Remember to publish {Z.depositions_base}/{ZENODO_MODELS_ID}")

  0%|          | 0.00/47.9G [00:00<?, ?B/s]

HTTPError: 400 Client Error: BAD REQUEST for url: https://zenodo.org/api/files/abe5a128-6c67-4ae7-95e1-33b457033480/ray_results_models.tar.xz?access_token=<REDACTED>