In [25]:
import subprocess
import os 
from pathlib import Path
import csv

In [34]:

def get_all_txt_files(folder_dir: Path, dir_name: str):
    """Find every ``.txt`` file under ``folder_dir`` and pass them to ``rename_txt_files``.

    Args:
        folder_dir: Root directory that is searched recursively.
        dir_name: Label used only to build the CSV log name,
            ``"{dir_name}_renamed_txt_files.csv"`` (a relative path, so it is
            written to the current working directory).

    Returns:
        Whatever ``rename_txt_files`` returns for the discovered files.
    """
    txt_format = ["txt"]  # Extensions to collect; only plain-text files for now

    txt_files = []
    for file_format in txt_format:
        # rglob also matches directories whose name ends in the extension;
        # keep regular files only so a folder is never renamed by mistake.
        txt_files.extend(
            path for path in folder_dir.rglob(f"*.{file_format}") if path.is_file()
        )

    # Directory traversal order depends on the filesystem; sorting makes the
    # processing order (and therefore the CSV log) reproducible between runs.
    txt_files.sort()

    print(f"Found {len(txt_files)} txt files.")  # Debugging line
    csv_file_path = f"{dir_name}_renamed_txt_files.csv"
    return rename_txt_files(txt_files, csv_file_path)

    
def rename_txt_files(file_paths: [Path], csv_file_path: str):
    """Rename each file to ``{pecha_id}_{stem}.txt`` and log the renames to a CSV.

    The new name is a bare file name, so every renamed file ends up in the
    current working directory rather than next to its original location.

    Args:
        file_paths: Paths of the files to rename. The path component at
            index 3 is used as the pecha id (for a path shaped like
            ``../../pecha_data/<pecha_id>/...`` that is the first folder below
            the data root); paths with fewer components get an empty pecha id.
        csv_file_path: Where to write the log. It has the header
            ``Old Filename, New Filename`` followed by one row per rename.

    Returns:
        List of the new paths, one per file that was actually renamed.
    """
    pecha_id_index = 3
    renamed_files = []
    # Explicit UTF-8 so the log does not depend on the platform's default
    # encoding if file names contain non-ASCII characters.
    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(['Old Filename', 'New Filename'])

        for file_path in file_paths:
            parts = file_path.parts
            # Guard on the index that is actually read; the previous
            # `len(parts) > 1` check raised IndexError for 2-3 component paths.
            pecha_id = parts[pecha_id_index] if len(parts) > pecha_id_index else ""
            new_file_path = Path(f"{pecha_id}_{file_path.stem}.txt")

            # Two sources can map to the same target (same pecha id and stem in
            # different sub-folders). Renaming would silently replace the first
            # file on POSIX, so leave the later one where it is and report it.
            if new_file_path.exists():
                print(f"[WARN]: {new_file_path} already exists, skipping {file_path}")
                continue

            file_path.rename(new_file_path)
            renamed_files.append(new_file_path)

            # Log the old path as text (previously the repr of a tuple of path
            # components was written into the CSV cell).
            csvwriter.writerow([str(file_path), str(new_file_path)])

    print(f"Renamed {len(renamed_files)} files.")  # Debugging line
    return renamed_files


In [35]:
# Rename every .txt file found under the pecha data folder; the rename log is
# written to "pecha_data_renamed_txt_files.csv" in the current working directory.
folder_dir = Path("../../pecha_data")
get_all_txt_files(folder_dir, "pecha_data")

Found 0 txt files.
Renamed 0 files.


[]

In [None]:
import shutil
from pathlib import Path

def move_file(file_path: Path, destination_folder: Path):
    """Move ``file_path`` into ``destination_folder``, keeping its file name.

    The destination folder (and any missing parents) is created before the
    move, and a confirmation line is printed afterwards.
    """
    # Target keeps the original base name inside the destination folder.
    new_file_path = destination_folder.joinpath(file_path.name)
    destination_folder.mkdir(parents=True, exist_ok=True)

    source, target = str(file_path), str(new_file_path)
    shutil.move(source, target)
    print(f"Moved {file_path} to {new_file_path}")


In [None]:
# Gather the files into one flat output folder.
# NOTE(review): `file_paths` is not assigned in any cell visible here, so this
# fails on a fresh kernel. Presumably it should be the list returned by
# get_all_txt_files() — confirm and assign it explicitly.
output_txt = Path("../../pecha_txt")
for file_path in file_paths:
    move_file(file_path, output_txt)

In [30]:
# Append-only log that save_corrupted_pecha() writes to; create it up front so
# the file exists even before anything has been recorded.
PECHA_CORRUPTED_FILES = Path("../../pecha_corrupted_files.txt")
if not PECHA_CORRUPTED_FILES.exists():
    PECHA_CORRUPTED_FILES.touch()

def save_corrupted_pecha(file_path:str):
    """Append ``file_path`` plus a trailing newline to ``PECHA_CORRUPTED_FILES``."""
    with PECHA_CORRUPTED_FILES.open("a") as log_file:
        log_file.write(f"{file_path}\n")

"""check point system"""

# Checkpoint file: one entry per line, read by load_checkpoints() and appended
# to by save_checkpoint().
PECHA_CLONED_CHECKPOINT = Path("../../pecha_checkpoint.txt")

def load_checkpoints():
    """Return the checkpoint entries as a list of lines.

    If the checkpoint file does not exist yet, it is created empty and an
    empty list is returned.
    """
    if not PECHA_CLONED_CHECKPOINT.exists():
        PECHA_CLONED_CHECKPOINT.touch()
        return []

    recorded = PECHA_CLONED_CHECKPOINT.read_text()
    return recorded.splitlines()

def save_checkpoint(file_checkpoint:str):
    """Append ``file_checkpoint`` plus a trailing newline to ``PECHA_CLONED_CHECKPOINT``."""
    with PECHA_CLONED_CHECKPOINT.open("a") as checkpoint_file:
        checkpoint_file.write(f"{file_checkpoint}\n")

In [3]:
def clone_github_repo(repo_name: str, destination_folder: Path):
    """Clone ``{GITHUB_ORG}/{repo_name}`` from GitHub into ``destination_folder``.

    Credentials are read from the ``GITHUB_TOKEN`` and ``GITHUB_ORG``
    environment variables. On success the repository name is recorded with
    ``save_checkpoint``; when ``git clone`` fails, the name and git's stderr
    (with the token redacted) are recorded with ``save_corrupted_pecha``.

    Args:
        repo_name: Name of the repository inside the organization.
        destination_folder: Directory to clone into.

    Returns:
        None. The outcome is reported through printed messages and the
        checkpoint / corrupted-files logs.
    """
    # Retrieve GitHub token and organization name from environment variables
    token = os.getenv('GITHUB_TOKEN')
    org_name = os.getenv('GITHUB_ORG')

    if not token or not org_name:
        print("[ERROR]: GitHub token or organization name not found in environment variables.")
        return

    # git refuses to clone into a non-empty directory, so stop here instead of
    # running a clone that is guaranteed to fail and be logged as corrupted.
    # any(iterdir()) stops at the first entry rather than walking the whole tree.
    if destination_folder.is_dir() and any(destination_folder.iterdir()):
        print(
            f"[INFO]: Destination folder {destination_folder} already exists and is not empty."
        )
        return

    # The token is embedded in the URL for authentication, so this string must
    # never be printed or logged as-is.
    repo_url = f"https://{token}@github.com/{org_name}/{repo_name}.git"

    try:
        subprocess.run(
            ["git", "clone", repo_url, str(destination_folder)],
            check=True,
            capture_output=True,
            text=True,  # Ensure output is in text format, not bytes
        )
    except subprocess.CalledProcessError as e:
        # git may echo the clone URL in its error output; scrub the token
        # before the message is printed or persisted to the log file.
        error_message = (e.stderr or "").replace(token, "***")
        print(f"[ERROR]: Error cloning {repo_name} repository: {error_message}")
        save_corrupted_pecha(f"{repo_name}-{error_message}")
        return

    print(f"[SUCCESS]: Repository {repo_name} cloned successfully to {destination_folder}.")
    save_checkpoint(str(repo_name))


In [4]:
from multiprocessing import Pool
from typing import List
from tqdm import tqdm

In [5]:
def worker_task(args):
    """Clone one repository unless its id is already listed in the checkpoints.

    Args:
        args: A ``(pecha_id, output_dir, checkpoints)`` tuple, packed into a
            single argument so the function can be mapped over a task list.
    """
    pecha_id, output_dir, checkpoints = args
    already_cloned = str(pecha_id) in checkpoints
    if not already_cloned:
        clone_github_repo(pecha_id, output_dir)

def clone_all_git_repo(all_pecha_ids: List[str], output_dir: Path, num_processes: int = 5):
    """Clone every repository in ``all_pecha_ids`` using a pool of worker processes.

    Each repository is cloned into ``{output_dir}/{pecha_id}``. Ids that are
    already present in the checkpoint file are skipped by ``worker_task``.

    Args:
        all_pecha_ids: Repository names to clone.
        output_dir: Parent directory for the clones.
        num_processes: Size of the worker pool. Defaults to 5, the value that
            was previously hard-coded.
    """
    # A set gives constant-time membership checks in the workers.
    checkpoints = set(load_checkpoints())
    tasks = [
        (pecha_id, Path(f"{output_dir}/{pecha_id}"), checkpoints)
        for pecha_id in all_pecha_ids
    ]

    with Pool(processes=num_processes) as pool:
        # list() drains the lazy imap iterator so every task runs and the
        # progress bar advances as results arrive.
        list(tqdm(pool.imap(worker_task, tasks), total=len(tasks), desc="Cloning git repo"))

In [6]:
# Repository names to clone, and the parent folder that receives one
# sub-folder per repository.
all_pecha_ids = ["OD1DF94FB",
                "IB6A68EB5",
                "O073CD7B3",
                "I32148D7C",
                "OCECD22A6",
                "I0E29A756",
                "OAA57DE4B"]
output_dir = Path("../../pecha_data")

In [7]:
# Clone every listed repository into output_dir; ids already recorded in the
# checkpoint file are skipped by worker_task.
clone_all_git_repo(all_pecha_ids, output_dir)

Cloning git repo: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████| 7/7 [00:00<00:00, 56461.78it/s]
