# File Extractor - Web Scraping Version
## Extract relevant source code files directly from GitHub URLs

This notebook web scrapes source code directly from GitHub repositories using the URLs generated by the scraper, without cloning repositories.

In [1]:
import requests
import json
import time
import os
from bs4 import BeautifulSoup

In [2]:
# CONFIGURATION
REPOS_FILE = "../../data/repo_details/repository_list_scrap_list.json"
OUTPUT_DIR = "../../data/extracted_files"
MAX_FILES_PER_REPO = 50
os.makedirs(OUTPUT_DIR, exist_ok=True)

session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
})

request_count = 0
last_request_time = 0

In [3]:
def make_request(url):
    global request_count, last_request_time
    
    current_time = time.time()
    if current_time - last_request_time < 1.0:
        time.sleep(1.0 - (current_time - last_request_time))
        
    try:
        response = session.get(url, timeout=10)
        request_count += 1
        last_request_time = time.time()
        
        if response.status_code == 200:
            return response
        elif response.status_code == 429:
            print("Rate limited. Waiting 30 seconds...")
            time.sleep(30)
            return make_request(url)
        else:
            return None 
        
    except Exception as e:
        print(f"Request error: {e}")
        return None

In [4]:
def detect_language_from_path(file_path):
    file_path_lower = file_path.lower()
    
    source_extensions = {
        '.py': 'python',
        '.java': 'java', 
        '.cpp': 'cpp', '.cc': 'cpp', '.cxx': 'cpp', '.hpp': 'cpp',
    }
    
    for ext, lang in source_extensions.items():
        if file_path_lower.endswith(ext):
            return lang
    return None

In [5]:
def get_default_branch(repo_url):
    response = make_request(repo_url)
    
    if response:
        soup = BeautifulSoup(response.content, 'html.parser')
        branch_select = soup.find('details', {'data-test-selector': 'branch-select-menu'})
        if branch_select:
            branch_name = branch_select.find('span', class_='css-truncate-target')
            if branch_name:
                return branch_name.text.strip()
    
    for branch in ['main', 'master']:
        test_url = f"{repo_url}/tree/{branch}"
        if make_request(test_url):
            return branch
    
    return 'main'

In [6]:
def get_repository_tree(repo_url, branch, path_in_repo="", all_files=None, depth=0, max_depth=10):
    if all_files is None:
        all_files = []
    
    if depth > max_depth:
        return all_files
    
    if path_in_repo:
        url = f"{repo_url}/tree/{branch}/{path_in_repo}"
    else:
        url = f"{repo_url}/tree/{branch}"
        
    response = make_request(url)
    if not response:
        return all_files
    
    soup = BeautifulSoup(response.content, 'html.parser')
    
    file_containers = [
        soup.find('div', {'role': 'grid'}),
        soup.find('table'),
        soup.find('div', class_='Box'),
        soup.find('div', class_='js-navigation-container'),
    ]
    
    file_container = next((container for container in file_containers if container is not None), None)
    
    if not file_container:
        links = soup.find_all('a', href=True)
    else:
        links = file_container.find_all('a', href=True)
    
    directories_to_scan = []
    
    for link in links:
        href = link['href']
        text = link.text.strip()
        
        if not text or text == '..':
            continue
        
        if any(skip in href for skip in ['/commits/', '/blame/', '/graphs/']):
            continue
        
        # Directory detection
        if f'/tree/{branch}' in href and not href.endswith(f'/tree/{branch}'):
            path_parts = href.split(f'/tree/{branch}/')
            if len(path_parts) > 1:
                dir_path = path_parts[1]
                if dir_path and dir_path not in directories_to_scan:
                    directories_to_scan.append(dir_path)
        
        # File detection
        elif f'/blob/{branch}' in href:
            path_parts = href.split(f'/blob/{branch}/')
            if len(path_parts) > 1:
                file_path = path_parts[1]
                if file_path and file_path not in all_files:
                    all_files.append(file_path)
    
    # Recursively scan directories
    for dir_path in directories_to_scan:
        skip_dirs = ['.git', 'node_modules', '__pycache__', 'build', 'dist', 'target', 'bin', 'obj', 'vendor']
        if any(skip_dir in dir_path.split('/') for skip_dir in skip_dirs):
            continue
            
        get_repository_tree(repo_url, branch, dir_path, all_files, depth + 1, max_depth)
        time.sleep(0.5)
    
    return all_files

In [7]:
def download_file_content(repo_url, file_path, branch):
    # Raw URL - primary method
    raw_url = repo_url.replace('https://github.com/', 'https://raw.githubusercontent.com/')
    raw_url = f"{raw_url}/{branch}/{file_path}"
    
    response = make_request(raw_url)
    if response and response.status_code == 200:
        content = response.text
        if content and content.strip():
            return content, True
    
    return "", False

In [8]:
def process_repository(repo_data, max_files=MAX_FILES_PER_REPO):
    full_name = repo_data['full_name']
    repo_url = repo_data['clone_url'].replace('.git', '')
    primary_language = repo_data['language']
    
    print(f"\nProcessing: {full_name} ({primary_language})")
    
    branch = get_default_branch(repo_url)
    print(f"   Branch: {branch}")
    
    all_files = get_repository_tree(repo_url, branch)
    print(f"   Found {len(all_files)} total files")
    
    if not all_files:
        return []
    
    # Filter for source code files
    source_files = []
    for file_path in all_files:
        if any(skip_dir in file_path for skip_dir in [
            '.git/', 'node_modules/', '__pycache__/', 'build/', 'dist/', 
            'target/', 'bin/', 'obj/', 'vendor/', '.github/'
        ]):
            continue
        
        language = detect_language_from_path(file_path)
        if language:
            source_files.append({
                'path': file_path,
                'language': language
            })
    
    print(f"   Identified {len(source_files)} source code files")
    
    if not source_files:
        return []
    
    # Prioritize and limit files if needed
    if len(source_files) > max_files:
        print(f"   Limiting to {max_files} files")
        
        def priority_score(file_path):
            path_lower = file_path.lower()
            score = 0
            if any(src_dir in path_lower for src_dir in ['src/', 'lib/', 'app/', 'source/', 'main/']):
                score += 3
            elif '/' not in file_path or file_path.count('/') == 0:
                score += 1
            if 'test' in path_lower:
                score -= 1
            return score
        
        source_files.sort(key=lambda x: (priority_score(x['path']), -len(x['path'])), reverse=True)
        source_files = source_files[:max_files]
    
    # Download file contents
    extracted_files = []
    print(f"   Downloading content...")
    
    for i, file_info in enumerate(source_files, 1):
        file_path = file_info['path']
        
        if i % 5 == 0:
            print(f"   Progress: {i}/{len(source_files)}")
        
        content, success = download_file_content(repo_url, file_path, branch)
        
        if success and content and 10 <= len(content) <= 100000:
            extracted_files.append({
                'repo_name': full_name,
                'repo_url': repo_url,
                'repo_language': primary_language,
                'file_path': file_path,
                'file_language': file_info['language'],
                'content': content,
                'size': len(content),
                'lines': len(content.splitlines())
            })
            print(f"   ✅ {file_path}")
        else:
            print(f"   ❌ {file_path}")
        
        time.sleep(1.0)
    
    print(f"   Successfully extracted {len(extracted_files)} files")
    return extracted_files

In [9]:
def save_results(all_files, output_dir):
    output_file = os.path.join(output_dir, "all_extracted_files.json")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_files, f, indent=2, ensure_ascii=False)
    
    # Group files by repository
    files_by_repo = {}
    for file_data in all_files:
        repo_name = file_data['repo_name']
        files_by_repo.setdefault(repo_name, []).append(file_data)
    
    # Save files grouped by repository
    repo_files_dir = os.path.join(output_dir, "by_repo")
    os.makedirs(repo_files_dir, exist_ok=True)
    
    for repo_name, files in files_by_repo.items():
        safe_name = repo_name.replace('/', '_')
        repo_file = os.path.join(repo_files_dir, f"{safe_name}_files.json")
        with open(repo_file, 'w', encoding='utf-8') as f:
            json.dump(files, f, indent=2, ensure_ascii=False)
    
    # Create summary
    summary = {
        'total_files': len(all_files),
        'total_repos': len(files_by_repo),
        'total_requests': request_count,
        'files_per_repo': {repo: len(files) for repo, files in files_by_repo.items()},
        'file_languages': {}
    }
    
    for file_data in all_files:
        summary['file_languages'][file_data['file_language']] = summary['file_languages'].get(file_data['file_language'], 0) + 1
    
    summary_file = os.path.join(output_dir, "extraction_summary.json")
    with open(summary_file, 'w', encoding='utf-8') as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)
    
    print(f"All files saved to {output_dir}")
    return summary

In [10]:
# Load repository data
if not os.path.exists(REPOS_FILE):
    print(f"Repository file not found: {REPOS_FILE}")
    exit()

with open(REPOS_FILE, 'r', encoding='utf-8') as f:
    repos_data = json.load(f)

print(f"Loaded {len(repos_data)} repositories")
print(f"Starting extraction...")

# Process all repositories
all_extracted_files = []
successful_repos = 0

for i, repo in enumerate(repos_data, 1):
    print(f"\n{'='*50}")
    print(f"[{i}/{len(repos_data)}] {repo['full_name']}")
    
    files = process_repository(repo)
    
    if files:
        all_extracted_files.extend(files)
        successful_repos += 1
        print(f"✅ Successfully processed {repo['full_name']} - {len(files)} files")
    else:
        print(f"❌ Failed to process {repo['full_name']}")
    
    if i < len(repos_data):
        time.sleep(3)

# Save and display results
if all_extracted_files:
    summary = save_results(all_extracted_files, OUTPUT_DIR)
    
    print(f"\nEXTRACTION COMPLETE!")
    print(f"Successfully processed {successful_repos}/{len(repos_data)} repositories")
    print(f"Extracted {len(all_extracted_files)} total files")
    print(f"Made {request_count} web requests")
    
    print(f"\nFile Language Distribution:")
    for lang, count in sorted(summary['file_languages'].items(), key=lambda x: x[1], reverse=True)[:10]:
        percentage = (count / summary['total_files']) * 100
        print(f"   {lang}: {count} files ({percentage:.1f}%)")
        
else:
    print("No files were extracted")

Loaded 999 repositories
Starting extraction...

[1/999] Cangol/AndroidStackBlur

Processing: Cangol/AndroidStackBlur (Java)
   Branch: master


KeyboardInterrupt: 