In [1]:
"""
GitHub Utilities for Google Colab
--------------------------------
This module provides secure Git/GitHub operations in Google Colab notebooks.
It handles authentication securely and provides common Git operations.

Usage:
    from github_utils import GitHubManager
    
    # Initialize with credentials (multiple ways)
    git_manager = GitHubManager()
    
    # Clone a repository
    git_manager.clone_repo("your_repo_name")
    
    # Perform Git operations
    git_manager.pull()
    git_manager.add_all()
    git_manager.commit("Update from Colab")
    git_manager.push()
"""

'\nGitHub Utilities for Google Colab\n--------------------------------\nThis module provides secure Git/GitHub operations in Google Colab notebooks.\nIt handles authentication securely and provides common Git operations.\n\nUsage:\n    from github_utils import GitHubManager\n    \n    # Initialize with credentials (multiple ways)\n    git_manager = GitHubManager()\n    \n    # Clone a repository\n    git_manager.clone_repo("your_repo_name")\n    \n    # Perform Git operations\n    git_manager.pull()\n    git_manager.add_all()\n    git_manager.commit("Update from Colab")\n    git_manager.push()\n'

In [None]:
import os
import sys
import getpass
import subprocess
from pathlib import Path
from typing import List, Optional, Dict, Union, Tuple


In [None]:
# Detect Google Colab by attempting to import its secrets helper
try:
    from google.colab import userdata
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True

In [None]:
class GitHubManager:
    """
    A class to manage Git/GitHub operations securely in Google Colab.
    """
    
    def __init__(self, 
                username: Optional[str] = None, 
                email: Optional[str] = None,
                token: Optional[str] = None,
                repo_name: Optional[str] = None,
                use_colab_secrets: bool = True):
        """
        Initialize GitHub manager with optional credentials.
        
        Args:
            username: GitHub username (optional if using Colab secrets)
            email: GitHub email (optional if using Colab secrets)
            token: GitHub personal access token (optional if using Colab secrets)
            repo_name: Repository name (optional, can be set later)
            use_colab_secrets: Whether to attempt retrieving credentials from Colab secrets
        """
        self.username = username
        self.email = email
        self.token = token
        self.repo_name = repo_name
        self.repo_path = None
        self.original_dir = os.getcwd()
        
        # Try to get credentials from Colab secrets if requested
        if use_colab_secrets and IN_COLAB:
            self._get_credentials_from_colab_secrets()
        
        # If credentials are still missing, prompt for them
        if not all([self.username, self.token]):
            self._prompt_for_credentials()
        
        # Configure Git with the credentials
        self._configure_git()
    
    def _get_credentials_from_colab_secrets(self) -> None:
        """
        Attempt to retrieve credentials from Colab secrets.
        """
        try:
            if not self.username:
                self.username = userdata.get('GITHUB_USERNAME')
            if not self.email:
                self.email = userdata.get('GITHUB_EMAIL', None)
            if not self.token:
                self.token = userdata.get('GITHUB_TOKEN')
            if not self.repo_name:
                self.repo_name = userdata.get('GITHUB_REPO', None)
            
            print("Successfully retrieved credentials from Colab secrets")
        except Exception as e:
            print(f"Could not retrieve all credentials from Colab secrets: {e}")
    
    def _prompt_for_credentials(self) -> None:
        """
        Prompt user for missing credentials using getpass for security.
        """
        if not self.username:
            self.username = getpass.getpass("Enter your GitHub username: ")
        if not self.email:
            self.email = getpass.getpass("Enter your GitHub email: ")
        if not self.token:
            self.token = getpass.getpass("Enter your GitHub personal access token: ")
        if not self.repo_name:
            self.repo_name = input("Enter repository name (or press Enter to skip): ")
            if not self.repo_name:
                self.repo_name = None
    
    def _configure_git(self) -> None:
        """
        Configure Git with the provided credentials.
        """
        # Set username
        self._run_command(['git', 'config', '--global', 'user.name', self.username])
        
        # Set email if provided
        if self.email:
            self._run_command(['git', 'config', '--global', 'user.email', self.email])
        
        # Set credentials helper to cache credentials temporarily
        self._run_command(['git', 'config', '--global', 'credential.helper', 'cache'])
        
        print("Git configured successfully")
    
    def _run_command(self, command: List[str], check: bool = True) -> Tuple[int, str, str]:
        """
        Run a shell command and return its output.
        
        Args:
            command: List of command and arguments
            check: Whether to raise an exception if the command fails
            
        Returns:
            Tuple of (return_code, stdout, stderr)
        """
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        stdout, stderr = process.communicate()
        
        if check and process.returncode != 0:
            print(f"Command failed: {' '.join(command)}")
            print(f"Error: {stderr.strip()}")
            raise subprocess.CalledProcessError(process.returncode, command, stdout, stderr)
        
        return process.returncode, stdout, stderr
    
    def _enter_repo_directory(self) -> None:
        """
        Change to the repository directory if it exists.
        """
        if not self.repo_path:
            raise ValueError("Repository path not set. Clone or set a repository first.")
        
        if not os.path.exists(self.repo_path):
            raise FileNotFoundError(f"Repository directory '{self.repo_path}' not found.")
        
        os.chdir(self.repo_path)
    
    def _exit_repo_directory(self) -> None:
        """
        Return to the original directory.
        """
        os.chdir(self.original_dir)
    
    def set_repo(self, repo_name: str, repo_path: Optional[str] = None) -> None:
        """
        Set the repository name and path.
        
        Args:
            repo_name: Name of the repository
            repo_path: Path to the repository (defaults to repo_name)
        """
        self.repo_name = repo_name
        self.repo_path = repo_path or os.path.join(os.getcwd(), repo_name)
        print(f"Repository set to: {self.repo_name} at {self.repo_path}")
    
    def clone_repo(self, repo_name: Optional[str] = None, path: Optional[str] = None) -> None:
        """
        Clone a GitHub repository.
        
        Args:
            repo_name: Name of the repository to clone (if different from initialized)
            path: Path where to clone the repository (defaults to current directory)
        """
        if repo_name:
            self.repo_name = repo_name
        
        if not self.repo_name:
            raise ValueError("Repository name not provided")
        
        # Set repository path
        self.repo_path = path or os.path.join(os.getcwd(), self.repo_name)
        
        # Clone repository using token
        repo_url = f"https://{self.token}@github.com/{self.username}/{self.repo_name}.git"
        
        # Check if the directory already exists
        if os.path.exists(self.repo_path):
            print(f"Repository directory '{self.repo_path}' already exists.")
            choice = input("Do you want to pull the latest changes instead? (y/n): ")
            if choice.lower() == 'y':
                self._enter_repo_directory()
                self.pull()
                self._exit_repo_directory()
                return
            else:
                raise FileExistsError(f"Repository directory '{self.repo_path}' already exists.")
        
        print(f"Cloning repository '{self.repo_name}' to '{self.repo_path}'...")
        self._run_command(['git', 'clone', repo_url, self.repo_path])
        print(f"Repository cloned successfully")
    
    def status(self) -> str:
        """
        Get the status of the Git repository.
        
        Returns:
            Git status output
        """
        try:
            self._enter_repo_directory()
            _, stdout, _ = self._run_command(['git', 'status'])
            return stdout
        finally:
            self._exit_repo_directory()
    
    def add_all(self) -> None:
        """
        Add all changes and untracked files to the Git staging area.
        """
        try:
            self._enter_repo_directory()
            
            # First get status to see if there are untracked files
            _, status_output, _ = self._run_command(['git', 'status', '--porcelain'], check=False)
            
            # Check if there are any untracked files (lines starting with '??')
            untracked_files = [line[3:] for line in status_output.split('\n') if line.startswith('?? ')]
            
            if untracked_files:
                print(f"Found {len(untracked_files)} untracked files.")
                
                # Add all untracked files explicitly
                for file in untracked_files:
                    self._run_command(['git', 'add', file])
                    print(f"Added untracked file: {file}")
            
            # Also add all other changes (modified files)
            self._run_command(['git', 'add', '.'])
            
            # Double-check what's staged now
            _, staged_diff, _ = self._run_command(['git', 'diff', '--name-only', '--staged'], check=False)
            staged_files = [f for f in staged_diff.strip().split('\n') if f]
            
            if staged_files:
                print(f"Successfully staged {len(staged_files)} files for commit:")
                for file in staged_files:
                    print(f"  - {file}")
            else:
                print("No changes were staged. Make sure files exist and have changes.")
            
        finally:
            self._exit_repo_directory()
    
    def add_all(self) -> None:
        """
        Add all changes to the Git staging area.
        """
        try:
            self._enter_repo_directory()
            self._run_command(['git', 'add', '.'])
            print("Added all changes to staging area")
        finally:
            self._exit_repo_directory()
    
    def commit(self, message: str) -> None:
        """
        Commit staged changes.
        
        Args:
            message: Commit message
        """
        try:
            self._enter_repo_directory()
            self._run_command(['git', 'commit', '-m', message])
            print(f"Changes committed with message: '{message}'")
        finally:
            self._exit_repo_directory()
    
    def pull(self, branch: str = 'main') -> str:
        """
        Pull the latest changes from the remote repository.
        
        Args:
            branch: Branch to pull from (default: main)
            
        Returns:
            Pull output
        """
        try:
            self._enter_repo_directory()
            _, stdout, _ = self._run_command(['git', 'pull', 'origin', branch])
            print(f"Pulled latest changes from {branch}")
            return stdout
        finally:
            self._exit_repo_directory()
    
    def push(self, branch: str = 'main', force: bool = False) -> str:
        """
        Push local commits to the remote repository.
        
        Args:
            branch: Branch to push to (default: main)
            force: Whether to force push (use with caution)
            
        Returns:
            Push output
        """
        try:
            self._enter_repo_directory()
            command = ['git', 'push', 'origin', branch]
            if force:
                command.append('--force')
            
            _, stdout, _ = self._run_command(command)
            print(f"Pushed changes to {branch}")
            return stdout
        finally:
            self._exit_repo_directory()
    
    def create_branch(self, branch_name: str) -> None:
        """
        Create a new branch and switch to it.
        
        Args:
            branch_name: Name of the new branch
        """
        try:
            self._enter_repo_directory()
            self._run_command(['git', 'checkout', '-b', branch_name])
            print(f"Created and switched to new branch: {branch_name}")
        finally:
            self._exit_repo_directory()
    
    def checkout(self, branch_name: str) -> None:
        """
        Switch to an existing branch.
        
        Args:
            branch_name: Name of the branch to switch to
        """
        try:
            self._enter_repo_directory()
            self._run_command(['git', 'checkout', branch_name])
            print(f"Switched to branch: {branch_name}")
        finally:
            self._exit_repo_directory()
    
    def list_branches(self) -> List[str]:
        """
        List all branches in the repository.
        
        Returns:
            List of branch names
        """
        try:
            self._enter_repo_directory()
            _, stdout, _ = self._run_command(['git', 'branch'])
            branches = [b.strip() for b in stdout.strip().split('\n')]
            # Remove the asterisk from the current branch
            branches = [b[2:] if b.startswith('* ') else b for b in branches]
            return branches
        finally:
            self._exit_repo_directory()
    
    def get_current_branch(self) -> str:
        """
        Get the name of the current branch.
        
        Returns:
            Current branch name
        """
        try:
            self._enter_repo_directory()
            _, stdout, _ = self._run_command(
                ['git', 'rev-parse', '--abbrev-ref', 'HEAD']
            )
            return stdout.strip()
        finally:
            self._exit_repo_directory()
    
    def discard_changes(self, files: Optional[Union[str, List[str]]] = None) -> None:
        """
        Discard all local changes.
        
        Args:
            files: Specific file(s) to discard changes for (None for all)
        """
        try:
            self._enter_repo_directory()
            if files:
                if isinstance(files, str):
                    files = [files]
                for file in files:
                    self._run_command(['git', 'checkout', '--', file])
                print(f"Discarded changes in {len(files)} file(s)")
            else:
                self._run_command(['git', 'reset', '--hard'])
                print("Discarded all local changes")
        finally:
            self._exit_repo_directory()
    
    def stash(self, message: Optional[str] = None) -> None:
        """
        Stash local changes.
        
        Args:
            message: Optional message for the stash
        """
        try:
            self._enter_repo_directory()
            if message:
                self._run_command(['git', 'stash', 'save', message])
                print(f"Stashed changes with message: '{message}'")
            else:
                self._run_command(['git', 'stash'])
                print("Stashed changes")
        finally:
            self._exit_repo_directory()
    
    def stash_pop(self) -> None:
        """
        Apply and remove the latest stash.
        """
        try:
            self._enter_repo_directory()
            self._run_command(['git', 'stash', 'pop'])
            print("Applied and removed the latest stash")
        finally:
            self._exit_repo_directory()
    
    def list_stashes(self) -> List[str]:
        """
        List all stashes.
        
        Returns:
            List of stash descriptions
        """
        try:
            self._enter_repo_directory()
            _, stdout, _ = self._run_command(['git', 'stash', 'list'])
            return [s.strip() for s in stdout.strip().split('\n') if s]
        finally:
            self._exit_repo_directory()
    
    def show_diff(self, staged: bool = False) -> str:
        """
        Show the diff of unstaged or staged changes.
        
        Args:
            staged: Whether to show staged changes (default: False)
            
        Returns:
            Diff output
        """
        try:
            self._enter_repo_directory()
            command = ['git', 'diff']
            if staged:
                command.append('--staged')
            
            _, stdout, _ = self._run_command(command)
            return stdout
        finally:
            self._exit_repo_directory()
    
    def log(self, n: int = 10) -> str:
        """
        Show the commit log.
        
        Args:
            n: Number of commits to show (default: 10)
            
        Returns:
            Log output
        """
        try:
            self._enter_repo_directory()
            _, stdout, _ = self._run_command(['git', 'log', f'-{n}', '--oneline'])
            return stdout
        finally:
            self._exit_repo_directory()