<a href="https://colab.research.google.com/github/fireicewolf/HFtoMS/blob/main/HFtoMS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Install dependencies
# Install/upgrade the Python clients imported by the "HF to MS" cell
# (huggingface-hub, modelscope, requests) without writing to pip's cache.
!pip install --no-cache-dir -U huggingface-hub modelscope requests
# Drop whatever is already stored in pip's cache.
!pip cache purge
# aria2 provides the `aria2c` downloader invoked by civitai_download().
!apt update && apt install aria2 -y

In [None]:
# @title Remove useless python package for more storage space.
# Optional cell: uninstalls large packages that this notebook does not import
# (TensorFlow/TensorBoard, PyTorch, OpenCV, Sphinx). `-y` skips the prompt.
# TensorFlow / TensorBoard family.
!pip uninstall tensorboard tensorboard-data-server tensorflow tensorflow-datasets tensorflow-estimator tensorflow-gcs-config tensorflow-hub tensorflow-io-gcs-filesystem tensorflow-metadata tensorflow-probability tensorstore -y
# PyTorch family.
!pip uninstall torch torchaudio torchsummary torchtext torchvision triton -y
# OpenCV builds.
!pip uninstall opencv-python-headless opencv-python opencv-contrib-python -y
# Sphinx and its helper packages.
!pip uninstall Sphinx sphinxcontrib-applehelp sphinxcontrib-devhelp sphinxcontrib-htmlhelp sphinxcontrib-jsmath sphinxcontrib-qthelp sphinxcontrib-serializinghtml -y

In [None]:
# @title Cleanup workspace
# WARNING: irreversibly deletes every entry matched by /content/* (the same
# directory the "HF to MS" cell uses as WORKSPACE).
%cd /content
!rm -rf /content/*

In [None]:
#@title HF to MS
from typing import Literal

import json
import math
import os
import requests

from huggingface_hub import HfApi,login, list_repo_tree, hf_hub_download
from huggingface_hub.hf_api import RepoFile, RepoFolder

from modelscope.hub.api import HubApi
from modelscope.hub.constants import ModelVisibility

# Notebook configuration. The trailing `# @param` / `#@markdown` comments are
# Colab form annotations and must be kept on the same line as their variable.
# WORKSPACE is the directory into which the ModelScope repo is cloned;
# WORK_MODE selects which branch of main() runs.
WORKSPACE = "/content"
WORK_MODE = "Fork single file from Huggingface to Modelscope" # @param {"type":"string"} ["Fork repo from Huggingface to Modelscope","Fork single file from Huggingface to Modelscope","Download single file from CivitAI to Modelscope"]
#@markdown Fork repo from HuggingFace
HF_TOKEN = "" # @param {"type":"string","placeholder":"Your HF_TOKEN"}
HF_REPO_ID = "" # @param {type:"string","placeholder":"HF Repo ID you want to fork"}
# NOTE(review): HF_RECURSIVE is not read anywhere in this cell;
# get_non_git_lfs_files() always lists the repo with hf_recursive=True.
HF_RECURSIVE = True # @param {type:"boolean"}
HF_REPO_TYPE = "model" # @param ["model","space","dataset"]
HF_REVISION = "main" # @param {type:"string","placeholder":"HF Repo REVISION"}
#@markdown Download Single file from HuggingFace
HF_SUBFOLDER = "" # @param {"type":"string","placeholder":"HF subfolder name"}
HF_FILENAME = "" # @param {type:"string","placeholder":"HF single file name"}
#@markdown CIVITAI download
CIVITAI_TOKEN = "" # @param {"type":"string","placeholder":"CivitAI_TOKEN"}
DOWNLOAD_LINK = "" # @param {"type":"string","placeholder":"CivitAI Link"}
DOWNLOAD_FILENAME = "" # @param {"type":"string","placeholder":"Download file name"}
#@markdown Modelscope settings
MS_TOKEN = "" # @param {type:"string","placeholder":"Your MS_TOKEN"}
MS_REPO_ID = "" # @param {type:"string","placeholder":"MS Repo ID you want to create/upload"}
MS_REPO_PRIVATE = True # @param {type:"boolean","placeholder":"MS Repo visibility"}
MS_REVISION = "master" # @param {type:"string","placeholder":"MS Repo REVISION"}
MS_COMMIT = "Upload to ModelScope" # @param {type:"string","placeholder":"MS Repo commit message"}
#@markdown Modelscope AIGC repo settings
# The AIGC settings are only used by main() when MS_UPLOAD_AIGC_MODEL is true
# and WORK_MODE is not the whole-repo fork mode.
MS_UPLOAD_AIGC_MODEL = False # @param {type:"boolean"}
# NOTE(review): the name is misspelled ("FOUDNATION") but main() references it
# under exactly this spelling; rename both places together if it is corrected.
MS_AIGC_FOUDNATION = "FLUX_1" # @param {"type":"string"} ["SD_1_5","SD_XL","SD_3","FLUX_1"]
MS_AIGC_MODEL_TYPE = "Checkpoint" # @param {"type":"string"} ["Checkpoint","LoRA","VAE"]
MS_TAG = "v1.0" # @param {type:"string","placeholder":"MS Repo tag"}

def login_hf(hf_token: str) -> None:
  """Log in to the Hugging Face Hub with `hf_token`.

  Thin wrapper that forwards the token to `huggingface_hub.login`.
  """
  login(token=hf_token)

def list_hf_repo_tree(
        hf_repo_id: str,
        hf_recursive: bool = True,
        hf_repo_type: str = "model",
        hf_revision: str = "main"
    ):
  """Return the tree entries of a Hugging Face repo as a list.

  Args:
    hf_repo_id: ID of the Hugging Face repo to list.
    hf_recursive: Forwarded as `recursive` to `list_repo_tree`.
    hf_repo_type: Forwarded as `repo_type` to `list_repo_tree`.
    hf_revision: Forwarded as `revision` to `list_repo_tree`.

  Returns:
    A list holding every entry yielded by `list_repo_tree`.
  """
  entries = list_repo_tree(
      hf_repo_id,
      recursive=hf_recursive,
      repo_type=hf_repo_type,
      revision=hf_revision,
  )
  # Materialise the iterable so callers can take len() and iterate repeatedly.
  return [*entries]

def hf_file_download(hf_repo_id: str, hf_path: str, hf_repo_type: str, hf_revision: str,local_dir: str):
  """Download one file of a Hugging Face repo into `local_dir`.

  Args:
    hf_repo_id: ID of the Hugging Face repo.
    hf_path: Path of the file inside the repo; it is split into a
      directory part (passed as `subfolder`) and a file name.
    hf_repo_type: Forwarded as `repo_type` to `hf_hub_download`.
    hf_revision: Forwarded as `revision` to `hf_hub_download`.
    local_dir: Forwarded as `local_dir` to `hf_hub_download`.
  """
  parent_dir, file_name = os.path.split(hf_path)
  hf_hub_download(
      hf_repo_id,
      file_name,
      subfolder=parent_dir,
      repo_type=hf_repo_type,
      revision=hf_revision,
      local_dir=local_dir,
  )

def ms_create_repo(
        ms_token: str,
        ms_repo_id: str,
        ms_repo_private: bool,
    ):
  """Log in to ModelScope and make sure the model repo `ms_repo_id` exists.

  Args:
    ms_token: ModelScope token passed to `HubApi.login`.
    ms_repo_id: ID of the ModelScope repo to look up or create.
    ms_repo_private: Create the repo as private when true, public otherwise.

  Returns:
    The first element of the value returned by `HubApi.login`, which the
    rest of this cell uses as the git token for clone/push URLs.
  """
  hub = HubApi()
  login_result = hub.login(ms_token)
  git_token = login_result[0]
  print(f'Creating repo "{ms_repo_id}" on ModelScope...')
  try:
    hub.get_model(ms_repo_id)
  except Exception:
    # Any failure of the lookup is treated as "repo does not exist yet".
    if ms_repo_private:
      visibility = ModelVisibility.PRIVATE
    else:
      visibility = ModelVisibility.PUBLIC
    hub.create_model(ms_repo_id, license="Other", visibility=visibility)
    print(f'"{ms_repo_id}" created on ModelScope')
  else:
    print(f'"{ms_repo_id}" already exist on ModelScope!')

  return git_token

def ms_git_clone_repo(ms_git_token, ms_repo_id, ms_revision):
  """Clone branch `ms_revision` of a ModelScope repo into WORKSPACE.

  The clone is placed at WORKSPACE/<last path component of ms_repo_id>; an
  existing directory at that path is deleted first. On return the process
  working directory is the fresh clone.

  Args:
    ms_git_token: Token embedded in the clone URL as the password.
    ms_repo_id: ModelScope repo ID used to build the clone URL.
    ms_revision: Branch to clone and switch to.

  Returns:
    Local path of the clone.
  """
  os.chdir(WORKSPACE)
  repo_on_local_path = os.path.join(WORKSPACE, os.path.basename(ms_repo_id))
  # Git LFS is switched off while cloning -- presumably so the clone holds only
  # LFS pointer files instead of downloading the large objects (TODO confirm).
  print('Disabling Git LFS...')
  os.environ["GIT_LFS_SKIP_SMUDGE"] = "1"
  !git lfs uninstall
  if os.path.exists(repo_on_local_path):
    print(f'"{repo_on_local_path}" already exist, will delete it!!!')
    !rm -rf "{repo_on_local_path}"

  # NOTE(review): the user name in the URL is spelled `outh2` (not `oauth2`)
  # and the scheme is plain `http://` -- confirm both are intended.
  !git clone -b {ms_revision} http://outh2:{ms_git_token}@www.modelscope.cn/{ms_repo_id}.git {repo_on_local_path}
  os.chdir(repo_on_local_path)
  !git switch {ms_revision}
  # Re-enable Git LFS so files added later go through the LFS filters again.
  print('Enabling Git LFS...')
  os.environ["GIT_LFS_SKIP_SMUDGE"] = "0"
  !git lfs install
  return repo_on_local_path

def format_file_size(size_bytes):
  """Format a byte count as a human-readable string using 1024-based units.

  The unit is found by repeated division instead of a floating-point
  logarithm, so exact powers of 1024 always land on the right unit, values
  between 0 and 1 stay in bytes, and sizes of 1 PiB or more no longer run
  past the end of the unit table.

  Args:
    size_bytes: Non-negative size in bytes.

  Returns:
    "0B" for zero, otherwise "<value rounded to 2 decimals> <unit>",
    e.g. "1.5 KB". Values beyond the largest unit are expressed in it.

  Raises:
    ValueError: If `size_bytes` is negative.
  """
  if size_bytes < 0:
    raise ValueError(f"size_bytes must be non-negative, got {size_bytes}")
  if size_bytes == 0:
    return "0B"
  size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
  value = float(size_bytes)
  unit_index = 0
  # Dividing by 1024 is exact for floats, so no rounding drift accumulates.
  while value >= 1024 and unit_index < len(size_name) - 1:
    value /= 1024
    unit_index += 1
  return f"{round(value, 2)} {size_name[unit_index]}"

def get_non_git_lfs_files(repo_on_local):
  hf_repo_tree = list_hf_repo_tree(hf_repo_id=HF_REPO_ID,hf_recursive=True,hf_repo_type=HF_REPO_TYPE,hf_revision=HF_REVISION)
  file_use_git_lfs = []
  total = len(hf_repo_tree)
  totol_non_lfs = 0
  for file in hf_repo_tree:
    if type(file) == RepoFolder:
      path = file.path
      path_on_local = os.path.join(repo_on_local, path)
      os.makedirs(path_on_local, exist_ok=True)
    elif type(file) == RepoFile:
      path = file.path
      size = file.size
      file_on_local = os.path.join(repo_on_local, path)
      if size < 25 * 1024 * 1024:
        totol_non_lfs+=1
        print(f'Processing {totol_non_lfs}/{total}: {path}...')
        if os.path.exists(file_on_local):
          print(f'"{file_on_local}" already exist, will delete it!!!')
          !rm -rf "{file_on_local}"
        hf_file_download(HF_REPO_ID, path, HF_REPO_TYPE, HF_REVISION, repo_on_local)
        if os.path.exists(os.path.join(repo_on_local, ".huggingface")):
          !rm -rf {os.path.join(repo_on_local, ".huggingface")}
        if os.path.exists(os.path.join(repo_on_local, ".cache")):
          !rm -rf {os.path.join(repo_on_local, ".cache")}

      else:
        print(f'"{path}" is {format_file_size(size)},  will add it to Git LFS list.')
        file_use_git_lfs.append(path)
        print(file_use_git_lfs)
  return file_use_git_lfs, total, totol_non_lfs

def upload_git_lfs_files(ms_git_token, file_use_git_lfs, total, totol_non_lfs):
  print(f'Total non Git-LFS files: {totol_non_lfs}.')
  i = totol_non_lfs
  for file in file_use_git_lfs:
    i+=1
    repo_on_local = ms_git_clone_repo(ms_git_token, MS_REPO_ID, MS_REVISION)
    file_on_local = os.path.join(repo_on_local, file)

    if os.path.exists(file_on_local):
      print(f'"{file_on_local}" already exist, will delete it!!!')
      !rm -rf "{file_on_local}"
    print(f'Processing {i}/{total}: {file}...')
    os.chdir(repo_on_local)
    hf_file_download(HF_REPO_ID, file, HF_REPO_TYPE, HF_REVISION, repo_on_local)
    if os.path.exists(os.path.join(repo_on_local, ".huggingface")):
      !rm -rf {os.path.join(repo_on_local, ".huggingface")}
    if os.path.exists(os.path.join(repo_on_local, ".cache")):
      !rm -rf {os.path.join(repo_on_local, ".cache")}
    !git lfs track {file_on_local}
    !git add {file_on_local}
    git_commit_cmd = f'git commit -m "{MS_COMMIT}"'
    !{git_commit_cmd}
    !git push --set-upstream origin {MS_REVISION}
    os.chdir(WORKSPACE)

  total_lfs = len(file_use_git_lfs)
  print(f'Total Git-LFS files: {total_lfs}.')
  print(f'Total files: {total}.')

def civitai_download(repo_on_local):
  def request_get(
    url:str,
    headers:dict | None=None
    ) -> tuple[Literal[True], requests.Response] | tuple[Literal[False], str]:
    """
    Performs a GET request

    returns: tuple(success:bool, response:Response or failure message:str)
    """
    try:
        response = requests.get(
            url,
            stream=True,
            verify=False,
            headers=headers,
        )

    except TimeoutError:
        output = f"GET Request timed out for {url}"
        print(output)
        return False, output

    if not response.ok:
        status_code = response.status_code
        reason = response.reason
        print(
            f"""
            GET Request failed with error code:
            {status_code}: {reason}
            """
        )
        return False, reason

    return True, response

  if "https://civitai.com" in DOWNLOAD_LINK:
    headers = {"Authorization": f"Bearer {CIVITAI_TOKEN}"}
    success, response = request_get(DOWNLOAD_LINK, headers=headers)
    if not success:
      print(f'Download failed: {response}')
      return None
    else:
      if DOWNLOAD_FILENAME:
        filename = DOWNLOAD_FILENAME
      else:
        filename = response.headers.get("Content-Disposition").split('"')[1]
      print(f'Downloading {filename} from CIVITAI...')
      !aria2c --console-log-level=error --summary-interval=10 -c -x 8 -k 2M -s 8 -d {repo_on_local} -o {filename} "{DOWNLOAD_LINK}&token={CIVITAI_TOKEN}"
      print(f'{filename} from CIVITAI downloaded.')
    return filename
  elif "liblib" in DOWNLOAD_LINK:
    success, response = request_get(DOWNLOAD_LINK)
    if not success:
      print(f'Download failed: {response}')
      return None
    else:
      if DOWNLOAD_FILENAME:
        filename = DOWNLOAD_FILENAME
      else:
        filename = response.headers.get("Content-Disposition").split('=')[1]
      print(f'Downloading {filename} from LibLib...')
      !aria2c --console-log-level=error --summary-interval=10 -c -x 8 -k 2M -s 8 -d {repo_on_local} -o {filename} "{DOWNLOAD_LINK}"
      print(f'{filename} from LibLib downloaded.')
      return filename
  else:
    if DOWNLOAD_FILENAME:
      filename = DOWNLOAD_FILENAME
    else:
      filename = os.path.basename(DOWNLOAD_LINK)
    !aria2c --console-log-level=error --summary-interval=10 -c -x 8 -k 2M -s 8 -d {repo_on_local} -o {filename} "{DOWNLOAD_LINK}"
    return filename

def get_base_model(model_type):
  """Map an AIGC foundation key to the repo ID of its base model.

  Args:
    model_type: One of "SD_1_5", "SD_XL", "SD_3" or "FLUX_1".

  Returns:
    The base-model repo ID, or None when `model_type` is not a known key.
  """
  known_models = (
      ("SD_1_5", "AI-ModelScope/stable-diffusion-v1-5"),
      ("SD_XL", "stabilityai/stable-diffusion-xl-base-1.0"),
      ("SD_3", "stabilityai/stable-diffusion-3.5-medium"),
      ("FLUX_1", "black-forest-labs/FLUX.1-dev"),
  )
  for key, repo_id in known_models:
    if model_type == key:
      return repo_id
  return None

def main():
  os.chdir(WORKSPACE)
  !git config --global user.email "you@example.com"
  !git config --global user.name "Your Name"
  ms_git_token = ms_create_repo(MS_TOKEN, MS_REPO_ID, MS_REPO_PRIVATE)
  repo_on_local = ms_git_clone_repo(ms_git_token, MS_REPO_ID, MS_REVISION)

  if WORK_MODE != "Download single file from CivitAI to Modelscope":
    login_hf(HF_TOKEN) if HF_TOKEN != "" and len(HF_TOKEN) >= 30 else print("HF_TOKEN Invalid, trying with out it")
    if WORK_MODE == "Fork repo from Huggingface to Modelscope":
      file_use_git_lfs, total, totol_non_lfs = get_non_git_lfs_files(repo_on_local)
    elif WORK_MODE == "Fork single file from Huggingface to Modelscope":
      hf_hub_download(HF_REPO_ID, HF_FILENAME, subfolder=HF_SUBFOLDER if HF_SUBFOLDER else None, repo_type=HF_REPO_TYPE, revision=HF_REVISION, local_dir=repo_on_local)
      model_name=HF_FILENAME
  else:
    os.chdir(repo_on_local)
    !ls -lha | grep safetensors
    !rm *.safetensors
    !ls -lha | grep safetensors
    model_name = civitai_download(repo_on_local)

  os.chdir(repo_on_local)
  !git add -A .
  git_commit_cmd = f'git commit -m "{MS_COMMIT}"'
  !{git_commit_cmd}
  if MS_UPLOAD_AIGC_MODEL and WORK_MODE != "Fork repo from Huggingface to Modelscope":
    aigc_model_config = {"aigc_model":True,"framework":"Pytorch","model_file_location":f"{model_name}"}
    with open(os.path.join(repo_on_local,"configuration.json"), "w") as j:
      json.dump(aigc_model_config, j)
    readme_content = f"""
    ---
    base_model: {get_base_model(MS_AIGC_FOUDNATION)}
    frameworks:
    - Pytorch
    tasks:
    - text-to-image-synthesis
    license: other
    tags:
    - {MS_AIGC_MODEL_TYPE}
    - text-to-image
    vision_foundation: {MS_AIGC_FOUDNATION}
    ---
    """
    with open(os.path.join(repo_on_local,"README.md"), "w") as f:
      f.write(readme_content)
    !git add -A .
    !{git_commit_cmd}
    !git push --set-upstream origin {MS_REVISION}
    print(f'Repo "{MS_REPO_ID}" pushed to remote server.')
    print(f'Creating tag "{MS_TAG}"...')
    !git tag {MS_TAG}
    print(f'Pushing tag "{MS_TAG}"...')
    !git push --set-upstream origin {MS_TAG}
    print(f'Tag "{MS_TAG}" pushed to remote server.')
  !git push --set-upstream origin {MS_REVISION}
  print(f'Repo "{MS_REPO_ID}" pushed to remote server.')

  if WORK_MODE == "Fork repo from Huggingface to Modelscope":
    upload_git_lfs_files(ms_git_token, file_use_git_lfs, total, totol_non_lfs)

# Run the selected workflow as soon as this cell is executed.
main()