<a href="https://colab.research.google.com/github/BKG10/90-Days-Of-Cyber-Security/blob/main/Data%20collection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Basic URL features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_submit_button(self):
        if not self.soup:
            return 0
        buttons = self.soup.find_all('input', {'type': 'submit'}) + self.soup.find_all('button')
        return 1 if buttons else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def has_social_net(self):
        if not self.soup:
            return 0
        social_keywords = ['facebook', 'twitter', 'linkedin', 'instagram', 'youtube', 'pinterest']
        for keyword in social_keywords:
            if self.soup.find_all(href=re.compile(keyword, re.I)):
                return 1
        return -1

    def has_favicon(self):
        if not self.soup:
            return 0
        favicon = self.soup.find('link', rel=re.compile('icon', re.I))
        return 1 if favicon else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def has_on_mouseover(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'onmouseover\s*=', self.page_content, re.I) else -1

    def extract_all_features(self):
        features = {
            'URL': self.url,
            'URLLength': self.get_url_length(),
            'DomainLength': self.get_domain_length(),
            'TLDLength': self.get_tld_length(),
            'LetterRatioInURL': self.get_letter_ratio_in_url(),
            'DigitRatioInURL': self.get_digit_ratio_in_url(),
            'SpacialCharRatioInURL': self.get_special_char_ratio_in_url(),
            'LargestLineLength': self.get_largest_line_length(),
            'NoOfImage': self.get_no_of_images(),
            'NoOfJS': self.get_no_of_js(),
            'NoOfCSS': self.get_no_of_css(),
            'NoOfSelfRef': self.get_no_of_self_ref(),
            'NoOfExternalRef': self.get_no_of_external_ref(),
            'IsHTTPS': self.is_https(),
            'HasObfuscation': self.has_obfuscation(),
            'HasTitle': self.has_title(),
            'HasDescription': self.has_description(),
            'HasSubmitButton': self.has_submit_button(),
            'HasPasswordField': self.has_password_field(),
            'HasSocialNet': self.has_social_net(),
            'HasFavicon': self.has_favicon(),
            'IsDomainIP': self.is_domain_ip(),
            'HasCopyrightInfo': self.has_copyright_info(),
            'RightClick': self.has_right_click_disabled(),
            'popUpWindow': self.has_popup_window(),
            'Iframe': self.has_iframe(),
            'Abnormal_URL': self.is_abnormal_url(),
            'Redirect': self.has_redirect(),
            'on_mouseover': self.has_on_mouseover(),
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        return features
    except Exception as e:
        return {'URL': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with all requested features"""
    try:
        df = pd.read_excel(input_file)
        if 'URL' not in df.columns:
            raise ValueError("Input file must contain 'URL' column")

        print("Colab Resource Info:")
        !nvidia-smi
        !free -h

        urls = df['URL'].tolist()
        results = []

        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                remaining_urls = [url for url in urls if url not in [r.get('URL') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:
                                results.append({'URL': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)

        output_df = pd.merge(df, pd.DataFrame(results), on='URL', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage:
# process_urls_colab("input.xlsx", "output.xlsx", max_workers=20)

In [5]:
process_urls_colab("TEST.xlsx", "output.xlsx", max_workers=20)

Colab Resource Info:
/bin/bash: line 1: nvidia-smi: command not found
               total        used        free      shared  buff/cache   available
Mem:            12Gi       890Mi       8.5Gi       2.0Mi       3.3Gi        11Gi
Swap:             0B          0B          0B


Attempt 1:   0%|          | 0/78743 [00:00<?, ?it/s]


Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  self.soup = BeautifulSoup(self.page_content, 'html.parser')
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/urllib3/connection.py", line 519, in getresponse
    assert_header_parsing(httplib_response.msg)
  File "/usr/local/lib/python3.11/dist-packages/urllib3/util/response.py", line 88, in assert_header_parsing
    raise HeaderParsingError(defects=defects, unparsed_data=unparsed_data)
urllib3.exceptions.HeaderParsingError: [MissingHeaderBodySeparatorDefect()], unparsed data: 'Referrer-Policy : origin\r\nX-XSS-Protection: 0\r\nX-Frame-Options: SAMEORIGIN\r\nDate: Mon, 07 Apr 2025 03:48:00 GMT\r\n\r\n'

If you meant to use Beautiful Soup to p

Attempt 2: 0it [00:00, ?it/s]

Attempt 3: 0it [00:00, ?it/s]


Successfully processed 70499/78743 URLs
Results saved to output.xlsx


In [None]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Basic URL features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_submit_button(self):
        if not self.soup:
            return 0
        buttons = self.soup.find_all('input', {'type': 'submit'}) + self.soup.find_all('button')
        return 1 if buttons else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def has_social_net(self):
        if not self.soup:
            return 0
        social_keywords = ['facebook', 'twitter', 'linkedin', 'instagram', 'youtube', 'pinterest']
        for keyword in social_keywords:
            if self.soup.find_all(href=re.compile(keyword, re.I)):
                return 1
        return -1

    def has_favicon(self):
        if not self.soup:
            return 0
        favicon = self.soup.find('link', rel=re.compile('icon', re.I))
        return 1 if favicon else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def has_on_mouseover(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'onmouseover\s*=', self.page_content, re.I) else -1

    def extract_all_features(self):
        features = {
            'URL': self.url,
            'URLLength': self.get_url_length(),
            'DomainLength': self.get_domain_length(),
            'TLDLength': self.get_tld_length(),
            'LetterRatioInURL': self.get_letter_ratio_in_url(),
            'DigitRatioInURL': self.get_digit_ratio_in_url(),
            'SpacialCharRatioInURL': self.get_special_char_ratio_in_url(),
            'LargestLineLength': self.get_largest_line_length(),
            'NoOfImage': self.get_no_of_images(),
            'NoOfJS': self.get_no_of_js(),
            'NoOfCSS': self.get_no_of_css(),
            'NoOfSelfRef': self.get_no_of_self_ref(),
            'NoOfExternalRef': self.get_no_of_external_ref(),
            'IsHTTPS': self.is_https(),
            'HasObfuscation': self.has_obfuscation(),
            'HasTitle': self.has_title(),
            'HasDescription': self.has_description(),
            'HasSubmitButton': self.has_submit_button(),
            'HasPasswordField': self.has_password_field(),
            'HasSocialNet': self.has_social_net(),
            'HasFavicon': self.has_favicon(),
            'IsDomainIP': self.is_domain_ip(),
            'HasCopyrightInfo': self.has_copyright_info(),
            'RightClick': self.has_right_click_disabled(),
            'popUpWindow': self.has_popup_window(),
            'Iframe': self.has_iframe(),
            'Abnormal_URL': self.is_abnormal_url(),
            'Redirect': self.has_redirect(),
            'on_mouseover': self.has_on_mouseover(),
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        return features
    except Exception as e:
        return {'URL': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with all requested features"""
    try:
        df = pd.read_excel(input_file)
        if 'URL' not in df.columns:
            raise ValueError("Input file must contain 'URL' column")

        print("Colab Resource Info:")
        !nvidia-smi
        !free -h

        urls = df['URL'].tolist()
        results = []

        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                remaining_urls = [url for url in urls if url not in [r.get('URL') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:
                                results.append({'URL': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)

        output_df = pd.merge(df, pd.DataFrame(results), on='URL', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage:
# process_urls_colab("input.xlsx", "output.xlsx", max_workers=20)

In [None]:
process_urls_colab("TEST.xlsx", "output.xlsx", max_workers=20)

Colab Resource Info:
/bin/bash: line 1: nvidia-smi: command not found
               total        used        free      shared  buff/cache   available
Mem:            12Gi       868Mi        10Gi       2.0Mi       1.0Gi        11Gi
Swap:             0B          0B          0B



Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  self.soup = BeautifulSoup(self.page_content, 'html.parser')


Attempt 1:   0%|          | 0/49998 [00:00<?, ?it/s]

In [2]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Basic URL features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_submit_button(self):
        if not self.soup:
            return 0
        buttons = self.soup.find_all('input', {'type': 'submit'}) + self.soup.find_all('button')
        return 1 if buttons else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def has_social_net(self):
        if not self.soup:
            return 0
        social_keywords = ['facebook', 'twitter', 'linkedin', 'instagram', 'youtube', 'pinterest']
        for keyword in social_keywords:
            if self.soup.find_all(href=re.compile(keyword, re.I)):
                return 1
        return -1

    def has_favicon(self):
        if not self.soup:
            return 0
        favicon = self.soup.find('link', rel=re.compile('icon', re.I))
        return 1 if favicon else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def has_on_mouseover(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'onmouseover\s*=', self.page_content, re.I) else -1

    def extract_all_features(self):
        features = {
            'URL': self.url,
            'URLLength': self.get_url_length(),
            'DomainLength': self.get_domain_length(),
            'TLDLength': self.get_tld_length(),
            'LetterRatioInURL': self.get_letter_ratio_in_url(),
            'DigitRatioInURL': self.get_digit_ratio_in_url(),
            'SpacialCharRatioInURL': self.get_special_char_ratio_in_url(),
            'LargestLineLength': self.get_largest_line_length(),
            'NoOfImage': self.get_no_of_images(),
            'NoOfJS': self.get_no_of_js(),
            'NoOfCSS': self.get_no_of_css(),
            'NoOfSelfRef': self.get_no_of_self_ref(),
            'NoOfExternalRef': self.get_no_of_external_ref(),
            'IsHTTPS': self.is_https(),
            'HasObfuscation': self.has_obfuscation(),
            'HasTitle': self.has_title(),
            'HasDescription': self.has_description(),
            'HasSubmitButton': self.has_submit_button(),
            'HasPasswordField': self.has_password_field(),
            'HasSocialNet': self.has_social_net(),
            'HasFavicon': self.has_favicon(),
            'IsDomainIP': self.is_domain_ip(),
            'HasCopyrightInfo': self.has_copyright_info(),
            'RightClick': self.has_right_click_disabled(),
            'popUpWindow': self.has_popup_window(),
            'Iframe': self.has_iframe(),
            'Abnormal_URL': self.is_abnormal_url(),
            'Redirect': self.has_redirect(),
            'on_mouseover': self.has_on_mouseover(),
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        return features
    except Exception as e:
        return {'URL': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with all requested features"""
    try:
        df = pd.read_excel(input_file)
        if 'URL' not in df.columns:
            raise ValueError("Input file must contain 'URL' column")

        print("Colab Resource Info:")
        !nvidia-smi
        !free -h

        urls = df['URL'].tolist()
        results = []

        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                remaining_urls = [url for url in urls if url not in [r.get('URL') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:
                                results.append({'URL': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)

        output_df = pd.merge(df, pd.DataFrame(results), on='URL', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage:
# process_urls_colab("input.xlsx", "output.xlsx", max_workers=20)

In [3]:
process_urls_colab("TEST.xlsx", "output.xlsx", max_workers=20)

Colab Resource Info:
/bin/bash: line 1: nvidia-smi: command not found
               total        used        free      shared  buff/cache   available
Mem:            12Gi       881Mi        11Gi       2.0Mi       807Mi        11Gi
Swap:             0B          0B          0B



Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  self.soup = BeautifulSoup(self.page_content, 'html.parser')


Attempt 1:   0%|          | 0/49999 [00:00<?, ?it/s]


If you meant to use Beautiful Soup to parse the contents of a file on disk, then something has gone wrong. You should open the file first, using code like this:

    filehandle = open(your filename)

You can then feed the open filehandle into Beautiful Soup instead of using the filename.



    
  self.soup = BeautifulSoup(self.page_content, 'html.parser')


Attempt 2: 0it [00:00, ?it/s]

Attempt 3: 0it [00:00, ?it/s]


Successfully processed 25164/49999 URLs
Results saved to output.xlsx


In [4]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Basic URL features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_submit_button(self):
        if not self.soup:
            return 0
        buttons = self.soup.find_all('input', {'type': 'submit'}) + self.soup.find_all('button')
        return 1 if buttons else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def has_social_net(self):
        if not self.soup:
            return 0
        social_keywords = ['facebook', 'twitter', 'linkedin', 'instagram', 'youtube', 'pinterest']
        for keyword in social_keywords:
            if self.soup.find_all(href=re.compile(keyword, re.I)):
                return 1
        return -1

    def has_favicon(self):
        if not self.soup:
            return 0
        favicon = self.soup.find('link', rel=re.compile('icon', re.I))
        return 1 if favicon else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def has_on_mouseover(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'onmouseover\s*=', self.page_content, re.I) else -1

    def extract_all_features(self):
        features = {
            'URL': self.url,
            'URLLength': self.get_url_length(),
            'DomainLength': self.get_domain_length(),
            'TLDLength': self.get_tld_length(),
            'LetterRatioInURL': self.get_letter_ratio_in_url(),
            'DigitRatioInURL': self.get_digit_ratio_in_url(),
            'SpacialCharRatioInURL': self.get_special_char_ratio_in_url(),
            'LargestLineLength': self.get_largest_line_length(),
            'NoOfImage': self.get_no_of_images(),
            'NoOfJS': self.get_no_of_js(),
            'NoOfCSS': self.get_no_of_css(),
            'NoOfSelfRef': self.get_no_of_self_ref(),
            'NoOfExternalRef': self.get_no_of_external_ref(),
            'IsHTTPS': self.is_https(),
            'HasObfuscation': self.has_obfuscation(),
            'HasTitle': self.has_title(),
            'HasDescription': self.has_description(),
            'HasSubmitButton': self.has_submit_button(),
            'HasPasswordField': self.has_password_field(),
            'HasSocialNet': self.has_social_net(),
            'HasFavicon': self.has_favicon(),
            'IsDomainIP': self.is_domain_ip(),
            'HasCopyrightInfo': self.has_copyright_info(),
            'RightClick': self.has_right_click_disabled(),
            'popUpWindow': self.has_popup_window(),
            'Iframe': self.has_iframe(),
            'Abnormal_URL': self.is_abnormal_url(),
            'Redirect': self.has_redirect(),
            'on_mouseover': self.has_on_mouseover(),
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        return features
    except Exception as e:
        return {'URL': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with all requested features"""
    try:
        df = pd.read_excel(input_file)
        if 'URL' not in df.columns:
            raise ValueError("Input file must contain 'URL' column")

        print("Colab Resource Info:")
        !nvidia-smi
        !free -h

        urls = df['URL'].tolist()
        results = []

        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                remaining_urls = [url for url in urls if url not in [r.get('URL') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:
                                results.append({'URL': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)

        output_df = pd.merge(df, pd.DataFrame(results), on='URL', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage:
# process_urls_colab("input.xlsx", "output.xlsx", max_workers=20)

In [None]:
process_urls_colab("TEST.xlsx", "output.xlsx", max_workers=20)

Colab Resource Info:
/bin/bash: line 1: nvidia-smi: command not found
               total        used        free      shared  buff/cache   available
Mem:            12Gi       1.7Gi        10Gi       2.0Mi       928Mi        10Gi
Swap:             0B          0B          0B


Attempt 1:   0%|          | 0/33282 [00:00<?, ?it/s]


Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  self.soup = BeautifulSoup(self.page_content, 'html.parser')
