In [None]:
!pip install requests

In [10]:
import json
import requests
import os
import re
from urllib.parse import urlparse, unquote
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def is_valid_url(url):
    """Check whether a value parses as a URL with a scheme and a host.

    Args:
        url: Candidate URL, normally a string.

    Returns:
        bool: True when ``urlparse`` yields both a non-empty scheme and a
        non-empty network location; False otherwise, including when the
        value cannot be parsed at all.
    """
    try:
        result = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed input such as an unbalanced IPv6 bracket.
        # TypeError / AttributeError: non-string input. Only these are
        # caught so KeyboardInterrupt and SystemExit still propagate.
        return False
    return bool(result.scheme and result.netloc)

def extract_urls_from_text(text):
    """Extract the set of http(s) URLs that appear in a piece of text.

    Args:
        text: String to scan.

    Returns:
        set: Unique URL strings that also pass ``is_valid_url``.

    Matching rules:
        * The character class is spelled out explicitly. Characters that
          delimit URLs in surrounding text or markup (``<``, ``>``, quotes,
          backslash, ``^``, whitespace) end a match, and ``~`` is accepted.
        * ``#`` ends a match, so fragments are not part of the result.
        * Trailing ``. , ; : ! ? *`` are stripped, as are trailing ``)`` or
          ``]`` that have no matching opener inside the URL.
    """
    url_pattern = r"https?://[A-Za-z0-9\-._~:/?\[\]@!$&()*+,;=%]+"
    openers = {')': '(', ']': '['}

    found = set()
    for candidate in re.findall(url_pattern, text):
        # Peel off punctuation that belongs to the surrounding sentence or
        # markup (e.g. "see http://x/a.png." or "[alt](http://x/a.png)").
        while candidate:
            last = candidate[-1]
            if last in '.,;:!?*':
                candidate = candidate[:-1]
            elif last in openers and candidate.count(last) > candidate.count(openers[last]):
                # Unbalanced closer: keep balanced pairs such as "Foo_(bar)".
                candidate = candidate[:-1]
            else:
                break
        if is_valid_url(candidate):
            found.add(candidate)
    return found

def extract_urls(content, file_type):
    """Collect every URL found in ``content``.

    Args:
        content: Parsed JSON data when ``file_type`` is ``'json'``;
            otherwise the text to scan.
        file_type: ``'json'`` to walk nested dicts/lists and scan their
            string values; any other value scans ``content`` directly.

    Returns:
        set: The URLs reported by ``extract_urls_from_text``.
    """
    # Anything that is not JSON is scanned in one pass as plain text.
    if file_type != 'json':
        return set(extract_urls_from_text(content))

    found = set()
    # Explicit stack instead of recursion; visiting order is irrelevant
    # because the result is a set.
    pending = [content]
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            pending.extend(node.values())
        elif isinstance(node, list):
            pending.extend(node)
        elif isinstance(node, str):
            found.update(extract_urls_from_text(node))
        # Numbers, booleans and None cannot hold URLs and are skipped.
    return found

def download_file(url, output_dir):
    """Download a single file from ``url`` into ``output_dir``.

    The target filename is the last component of the URL path after
    percent-decoding. If the path yields no usable name, a name derived
    from the host is used instead. An extension chosen from the response
    Content-Type is appended when the name has none or was derived from
    the host.

    Args:
        url: The http(s) URL to fetch.
        output_dir: Directory to write into; created if missing.

    Returns:
        bool: True if the file was written, False if any error occurred
        (the error is printed, never raised). A file that was partially
        written before the error is not removed.
    """
    try:
        # The context manager closes the streamed response, releasing the
        # connection even when an error interrupts the transfer.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()

            parsed = urlparse(url)
            # Decode BEFORE taking the basename so that encoded separators
            # (%2F, %5C) cannot smuggle "../" components into the filename.
            decoded_path = unquote(parsed.path).replace('\\', '/')
            filename = os.path.basename(decoded_path)
            # Replace characters that are not allowed in Windows filenames.
            filename = re.sub(r'[<>:"|?*\x00-\x1f]', '_', filename)

            derived_from_host = filename in ('', '.', '..')
            if derived_from_host:
                # Generate filename from the host if the path gives none
                filename = re.sub(r'[^a-zA-Z0-9.]', '_', parsed.netloc)

            # A host-derived name contains dots that are not a real file
            # extension, so it always gets one from the Content-Type.
            if derived_from_host or '.' not in filename:
                content_type = (
                    response.headers.get('content-type', '')
                    .split(';')[0]
                    .strip()
                    .lower()
                )
                common_extensions = {
                    'image/jpeg': '.jpg',
                    'image/png': '.png',
                    'video/mp4': '.mp4',
                    'application/json': '.json',
                    'text/plain': '.txt'
                }
                filename += common_extensions.get(content_type, '.dat')

            # Create directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)
            full_path = os.path.join(output_dir, filename)

            # Always stream in chunks so a missing Content-Length header
            # does not force the whole body into memory.
            downloaded = 0
            with open(full_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)

        # Report the bytes actually written rather than the header value,
        # which is 0 when the server omits Content-Length.
        print(f"Downloaded: {filename} ({downloaded} bytes)")
        return True
    except Exception as e:
        # Boundary for worker threads: report the failure and carry on.
        print(f"Error downloading {url}: {str(e)}")
        return False

def process_files(input_path, output_dir, max_workers=5):
    """Scan a file or a directory for URLs and download each one.

    Args:
        input_path: Path to a single file, or to a directory whose direct
            children are scanned (no recursion into subdirectories).
        output_dir: Directory passed to ``download_file`` for every URL.
        max_workers: Size of the thread pool used for downloading.

    Returns:
        None. Progress and a final summary are printed.
    """
    input_path = Path(input_path)
    supported_extensions = {'.json', '.txt', '.md', '.csv'}

    def urls_in(file_path):
        """Return the URLs found in one file, or an empty set on error."""
        try:
            with open(file_path, 'r', encoding='utf-8') as handle:
                raw = handle.read()
            if file_path.suffix.lower() != '.json':
                return extract_urls(raw, 'text')
            try:
                parsed = json.loads(raw)
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON file {file_path}: {str(e)}")
                return set()
            return extract_urls(parsed, 'json')
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
            return set()

    # Decide which files to scan: one file, or a directory's children.
    if input_path.is_file():
        if input_path.suffix.lower() not in supported_extensions:
            print(f"Unsupported file type: {input_path}")
            return
        candidates = (input_path,)
    elif input_path.is_dir():
        candidates = (
            entry for entry in input_path.iterdir()
            if entry.suffix.lower() in supported_extensions
        )
    else:
        print(f"Input path does not exist: {input_path}")
        return

    all_urls = set()
    for candidate in candidates:
        all_urls.update(urls_in(candidate))

    if not all_urls:
        print("No URLs found in the input file(s).")
        return

    total = len(all_urls)
    print(f"\nFound {total} unique URLs")

    # Fan the downloads out over a thread pool and gather the outcomes.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(download_file, url, output_dir)
            for url in all_urls
        ]
        results = [future.result() for future in futures]

    successful = len([outcome for outcome in results if outcome])
    print("\nDownload summary:")
    print(f"Total URLs found: {total}")
    print(f"Successfully downloaded: {successful}")
    print(f"Failed: {total - successful}")

In [13]:
if __name__ == "__main__":
    input_path = r"C:\Users\Sakib\Desktop\postman_collection.json"
    output_directory = "downloads"

    # Force process as text file regardless of extension: the raw file
    # content is scanned with the text regex instead of being parsed as JSON.
    with open(input_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # The input file is closed at this point, so it is not kept open
    # (and locked on Windows) for the duration of the downloads.
    urls = extract_urls_from_text(content)
    if urls:
        print(f"Found {len(urls)} URLs")
        for url in urls:
            download_file(url, output_directory)
    else:
        print("No valid URLs found in the file")

Found 1 URLs
Downloaded: googlelogo_color_272x92dp.png (5969 bytes)
