In [None]:
# Colab only: mount Google Drive at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# TODO
# - the code crashes when a folder already exists (because of makedirs)
# - create folders dynamically so runs with different depths need no manual intervention
# - save metrics to a file

# Placeholder: replace with the initial URL of the crawl. Domain comparison
# relies on urlparse(...).netloc, so include the scheme (http:// or https://).
ROOT_URL = '<url inicial aqui. Ex. www.teste.com.br>'
# ROOT_DIR = './'
# Placeholder: replace with the directory where downloads and logs are saved.
# A trailing '/' (as in './') is the safest form.
ROOT_DIR = 'diretório/onde/você/vai/salvar'
# RequestsQueue.add ignores URLs whose depth is >= MAX_DEPTH.
MAX_DEPTH = 8

## UrlManager


In [None]:
from urllib.parse import urlparse
import re

class UrlManager:
  """URL helpers used by the crawler.

  Every helper is a static method; the instance only stores the root URL and
  its parsed form.
  """

  # Matches the scheme plus an optional "www." prefix so both can be stripped.
  _NAME_REGEX = re.compile(r"https?://(www\.)?")
  # File extensions that mark the last path segment as an HTML document.
  _HTML_EXTENSIONS = ('html', 'htm')

  def __init__(self, root_url):
    self.__root_url = root_url
    self.__url_parsed = urlparse(root_url)

  @staticmethod
  def is_same_domain(root_url, url):
    """Return True when both URLs have exactly the same network location."""
    return urlparse(root_url).netloc == urlparse(url).netloc

  @staticmethod
  def get_name_from_url(url):
    """Return `url` without scheme / "www." and without surrounding slashes."""
    return UrlManager._NAME_REGEX.sub('', url).strip().strip('/')

  @staticmethod
  def get_domain(url):
    """Return the network location (host[:port]) of `url`."""
    return urlparse(url).netloc

  @staticmethod
  def get_html_info(url):
    """Describe where the HTML document behind `url` should be stored.

    Returns a dict with:
      "root":     site name (scheme and "www." stripped),
      "path":     directory part of the URL path, without trailing slash,
      "filename": the document name, or "index.html" when the URL does not
                  end in a *.html / *.htm file.
    """
    parsed = urlparse(url)
    root = UrlManager.get_name_from_url(parsed.scheme+'://'+parsed.netloc)
    path = parsed.path
    name = path.split('/')[-1]

    # Require a real extension: a segment literally named "html" is a page
    # (or directory), not an HTML file called "html".
    extension = name.rpartition('.')[2].lower() if '.' in name else ''
    if extension in UrlManager._HTML_EXTENSIONS:
      return {
        "root": root,
        "path": path.rpartition('/')[0],
        "filename": name,
      }

    # Directory-style URL. Keep its own path (minus the trailing slash) so
    # that "/a/" and "/b/" do not both end up in the site-root index.html.
    return {
      "root": root,
      "path": path.rstrip('/'),
      "filename": "index.html",
    }

  @staticmethod
  def get_name_of_file(url):
    """Return the last segment of the URL path (may be an empty string)."""
    return urlparse(url).path.split('/')[-1]

  @staticmethod
  def has_query_strings(url):
    """Return True when `url` carries a query string."""
    return urlparse(url).query != ''

  @staticmethod
  def sanitize_links(root_url, links):
    """Turn raw href/src values into absolute URLs, dropping unusable ones.

    `links` may be a single string or an iterable of strings.
      * absolute http(s) links are kept as they are;
      * protocol-relative links ("//host/x") inherit the scheme of root_url;
      * root-relative links ("/x") are prefixed with root_url;
      * everything else (empty, "#anchor", "mailto:", relative paths, ...)
        is discarded.
    """
    if isinstance(links, str):
      # A lone string must not be iterated character by character.
      links = [links]

    base = root_url.rstrip('/')  # avoid "http://site//path"
    scheme = urlparse(root_url).scheme

    sanitized_links = []
    for link in links:
      if not link:
        continue
      if link.lower().startswith(('http://', 'https://')):
        sanitized_links.append(link)
      elif link.startswith('//'):
        # Protocol-relative URL; it can only be resolved with a known scheme.
        if scheme:
          sanitized_links.append(scheme + ':' + link)
      elif link.startswith('/'):
        sanitized_links.append(base + link)

    return sanitized_links


## Logger

In [None]:
import os

RELATIVE_PATH = 'logs'

class Logger:
  """Minimal file logger writing to <ROOT_DIR>/logs/<log_filename>.txt.

  The file is opened in 'w+' mode, so any previous log with the same name is
  truncated. Every message is prefixed with "[<class_name>]: ".
  """

  def __init__(self, class_name, log_filename):
    # os.path.join keeps this working whether or not ROOT_DIR ends with a
    # separator; exist_ok makes re-running the notebook safe.
    log_dir = os.path.join(ROOT_DIR, RELATIVE_PATH)
    os.makedirs(log_dir, exist_ok=True)
    self.__file = open(os.path.join(log_dir, log_filename + '.txt'), 'w+', encoding='utf-8')
    self.__class_name = class_name
    self.__prefix = '[{}]: '.format(class_name)

  def log(self, message):
    """Append one prefixed line to the log file."""
    self.__file.write(self.__prefix + message + '\n')
    # Flush right away: the file stays open for the whole crawl, and an
    # interrupted run would otherwise lose the buffered lines.
    self.__file.flush()

  def close(self):
    """Close the underlying file; calling it more than once is harmless."""
    if not self.__file.closed:
      self.__file.close()

## Metrics


In [None]:
import time
from enum import Enum

# Names of the counters kept by Metrics.increment_counter.
COUNTER_IMAGES_DOWNLOADED = 'COUNTER_IMAGES_DOWNLOADED'
COUNTER_HTML_DOWNLOADED = 'COUNTER_HTML_DOWNLOADED'
COUNTER_JS_DOWNLOADED = 'COUNTER_JS_DOWNLOADED'
COUNTER_CSS_DOWNLOADED = 'COUNTER_CSS_DOWNLOADED'
COUNTER_DIFFERENT_DOMAINS_AVOIDED = 'COUNTER_DIFFERENT_DOMAINS_AVOIDED'
COUNTER_QUERY_STRINGS_AVOIDED = 'COUNTER_QUERY_STRINGS_AVOIDED'
COUNTER_URLS_VISITED = 'COUNTER_URLS_VISITED'
COUNTER_FAILED_REQUESTS = 'COUNTER_FAILED_REQUESTS'

# Name of the gauge updated through Metrics.set_gauge with the queue length.
GAUGE_QUEUE_LENGTH = 'GAUGE_QUEUE_LENGTH'

class Metrics:
  """Run statistics: a wall-clock timer, named counters and named gauges.

  `Metrics.instance()` hands out one lazily created shared object; calling
  the constructor directly still yields an independent object.
  """

  _instance = None

  def __init__(self):
    self.__start = None
    self.__end = None
    self.counters = {}
    self.gauges = {}

  @classmethod
  def instance(cls):
    """Return the shared Metrics object, creating it on first use."""
    shared = cls._instance
    if shared is None:
      shared = cls()
      cls._instance = shared
    return shared

  def start_timer(self):
    """Record the start time; starting again resets it (with a notice)."""
    if self.__start:
      print('Timer was started before. Reseting timer with the current time...')
    self.__start = time.time()

  def stop_timer(self):
    """Record the end time; does nothing but warn if the timer never started."""
    if self.__start:
      self.__end = time.time()
      return
    print('Cannot stop timer before it was started')

  def increment_counter(self, counter_name):
    """Add one to `counter_name`, starting from zero when it is new."""
    previous = self.counters.get(counter_name, 0)
    self.counters[counter_name] = previous + 1

  def set_gauge(self, gauge_name, value):
    """Store `value` as the gauge's current reading and update its min / max."""
    gauge = self.gauges.get(gauge_name)
    if gauge is None:
      gauge = {
          "maximum": value,
          "current": value,
          "minimum": value,
      }
      self.gauges[gauge_name] = gauge

    gauge["current"] = value
    gauge["minimum"] = min(gauge["minimum"], value)
    gauge["maximum"] = max(gauge["maximum"], value)

  def show_statistics(self):
    """Print the elapsed time followed by every counter and every gauge."""
    timer_used = self.__start and self.__end
    elapsed = self.__end - self.__start if timer_used else 'Timer not used'
    print('Running time: ', elapsed)

    for name, count in self.counters.items():
      print('{}: {}'.format(name, count))

    for name, gauge in self.gauges.items():
      print('{}: min({}) max({}) curr({})'.format(name, gauge["minimum"], gauge["maximum"], gauge["current"]))


## FileManager

In [None]:
import os
from enum import Enum
# from UrlManager import UrlManager
import shutil
# from Logger import Logger
# from Metrics import *

'''
Class responsible for saving the files properly
'''

# Directory templates: FileManager replaces REPLACE_LABEL with the website
# name before creating each folder.
REPLACE_LABEL = 'root_directory'
DIR_IMAGES = 'root_directory/assets/images'
DIR_CSS = 'root_directory/assets/css'
DIR_JS = 'root_directory/assets/js'
DIR_HTML = 'root_directory/html'

class FileType(Enum):
  """Kinds of downloaded resources; FileManager maps each member to a saver."""
  HTML = 0
  CSS = 1
  JS = 2
  IMAGE = 3

class FileManager:
  """Writes downloaded resources into a per-website folder tree.

  Layout, relative to `root_path` (the website name replaces the
  REPLACE_LABEL placeholder of the DIR_* templates):
    <site>/html, <site>/assets/css, <site>/assets/js, <site>/assets/images

  Each URL is written at most once per FileManager. A URL whose save failed
  is still remembered, so it is not attempted again.

  NOTE(review): JS / CSS / image files are stored by their base name only, so
  two different URLs with the same file name overwrite each other. HTML
  directories are derived from the (untrusted) URL path as-is.
  """

  def __init__(self, root_url, root_path):
    # Collaborators first, so every other method can rely on them.
    self.logger = Logger('FileManager', 'filemanager')
    self.metrics = Metrics.instance()
    self.__root_path = root_path
    self.__website_name = UrlManager.get_name_from_url(root_url)
    self.__html_folder = self.__create_folder(DIR_HTML)
    self.__css_folder = self.__create_folder(DIR_CSS)
    self.__js_folder = self.__create_folder(DIR_JS)
    self.__images_folder = self.__create_folder(DIR_IMAGES)
    self.__html_files_saved = set()
    self.__css_files_saved = set()
    self.__js_files_saved = set()
    self.__images_files_saved = set()

  def save_file(self, url, content, file_type):
    """Save `content` (a requests response) for `url` according to `file_type`."""
    file_saver = self.__get_correct_saver(file_type)
    file_saver(url, content)

  def open(self, path, mode, encoding=None):
    """Open `path` (relative to the root path) and return the file object."""
    return open(self.__resolve(path), mode, encoding=encoding)

  def makedirs(self, path):
    """Create `path` (relative to the root path); existing folders are fine."""
    try:
      os.makedirs(self.__resolve(path), exist_ok=True)
    except OSError:
      # e.g. a regular file is in the way, or permission was denied
      self.logger.log("[ERRO] cannot create a directory with path {}".format(path))

  def __resolve(self, path):
    # Join instead of concatenating so the root path works with or without a
    # trailing separator; a leading '/' is stripped because os.path.join
    # would otherwise discard the root path entirely.
    return os.path.join(self.__root_path, path.lstrip('/'))

  def __create_folder(self, directory):
    folder_path = directory.replace(REPLACE_LABEL, self.__website_name)
    self.makedirs(folder_path)
    return folder_path

  def __get_correct_saver(self, file_type):
    savers = {
      FileType.HTML: self.__html_saver,
      FileType.CSS: self.__css_saver,
      FileType.JS: self.__js_saver,
      FileType.IMAGE: self.__image_saver,
    }
    return savers[file_type]

  def __save_text(self, url, content, directory, filename, counter_name, label):
    # Shared body of the html / js / css savers: write content.text and count it.
    try:
      # Read the text before opening the file so that a failed request
      # (content is None) does not leave an empty file behind.
      text = content.text
      with self.open(directory+'/'+filename, 'w+', encoding='utf-8') as file:
        file.write(text)
      self.metrics.increment_counter(counter_name)
    except Exception:
      # `Exception`, not a bare except: Ctrl+C must still stop the crawl.
      self.logger.log("[ERROR] cannot save {} {} with name {}".format(label, url, filename))

  def __html_saver(self, url, content):
    if url in self.__html_files_saved:
      return
    html_info = UrlManager.get_html_info(url)
    directory = self.__html_folder + '/' + html_info["root"] + html_info["path"]
    self.makedirs(directory)
    self.__save_text(url, content, directory, html_info["filename"], COUNTER_HTML_DOWNLOADED, 'html')
    self.__html_files_saved.add(url)

  def __js_saver(self, url, content):
    if url in self.__js_files_saved:
      return
    filename = UrlManager.get_name_of_file(url)
    self.__save_text(url, content, self.__js_folder, filename, COUNTER_JS_DOWNLOADED, 'js')
    self.__js_files_saved.add(url)

  def __css_saver(self, url, content):
    if url in self.__css_files_saved:
      return
    filename = UrlManager.get_name_of_file(url)
    self.__save_text(url, content, self.__css_folder, filename, COUNTER_CSS_DOWNLOADED, 'css')
    self.__css_files_saved.add(url)

  def __image_saver(self, url, content):
    if url in self.__images_files_saved:
      return
    filename = UrlManager.get_name_of_file(url)
    try:
      # Have urllib3 undo gzip/deflate while the raw stream is copied.
      content.raw.decode_content = True
      with self.open(self.__images_folder+'/'+filename, 'wb') as file:
        shutil.copyfileobj(content.raw, file)
      self.metrics.increment_counter(COUNTER_IMAGES_DOWNLOADED)
    except Exception:
      self.logger.log("[ERROR] cannot save image {} with name {}".format(url, filename))
    self.__images_files_saved.add(url)
    del content

# if __name__ == '__main__':
#   a = FileManager('www.amazon.com.br')

        

## ParseManager

In [None]:
from bs4 import BeautifulSoup

class ParseManager:
  """Extracts link / resource URLs from the HTML of a requests response.

  Every method accepts None (a failed request) and then returns an empty
  list, so callers need no special handling.
  """

  @staticmethod
  def _collect(requests_response, tag_name, attribute, keep=None):
    # Return `attribute` of every <tag_name> that has it, optionally
    # filtered by the `keep(tag)` predicate.
    if requests_response is None:
      return []
    soup = BeautifulSoup(requests_response.text, features="html.parser")
    tags = soup.find_all(tag_name, **{attribute: True})
    return [tag[attribute] for tag in tags if keep is None or keep(tag)]

  @staticmethod
  def get_links_to_navigate(requests_response):
    """Return the href of every <a> tag that has one."""
    return ParseManager._collect(requests_response, 'a', 'href')

  @staticmethod
  def get_images_urls(requests_response):
    """Return the src of every <img> tag that has one."""
    return ParseManager._collect(requests_response, 'img', 'src')

  @staticmethod
  def get_js_urls(requests_response):
    """Return the src of every <script> tag that has one."""
    return ParseManager._collect(requests_response, 'script', 'src')

  # [TODO] there are other rel types besides stylesheet
  @staticmethod
  def get_css_urls(requests_response):
    """Return the href of every <link> tag whose rel contains "stylesheet"."""
    def is_stylesheet(tag):
      # <link href=...> without a rel attribute is legal; tag['rel'] would
      # raise KeyError for it.
      return 'stylesheet' in (tag.get('rel') or [])

    return ParseManager._collect(requests_response, 'link', 'href', is_stylesheet)

## RequestsManager

In [None]:
import requests
# from Logger import Logger

headers = {'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'}

class RequestsManager:
  """Thin wrapper around requests.get that logs and counts failures.

  Both request methods return the response object, or None when the request
  could not be completed.
  """

  def __init__(self, root_url, timeout=30):
    """
    root_url: root URL of the crawl (stored only).
    timeout:  seconds passed to requests.get; without a timeout a single
              unresponsive server would block the crawl forever.
    """
    self.root_url = root_url
    self.timeout = timeout
    self.logger = Logger('RequestsManager', 'failed_requests')
    self.metrics = Metrics.instance()

  def __get(self, url, failure_message, **request_options):
    # Shared GET: counts a visit on success; logs and counts a failure otherwise.
    try:
      response = requests.get(url, headers=headers, timeout=self.timeout, **request_options)
    except Exception:
      # `Exception` rather than a bare except, so Ctrl+C still interrupts.
      self.logger.log(failure_message.format(url))
      self.metrics.increment_counter(COUNTER_FAILED_REQUESTS)
      return None
    self.metrics.increment_counter(COUNTER_URLS_VISITED)
    return response

  def simple_request(self, url):
    """GET `url`; return the response, or None on failure."""
    return self.__get(url, "[ERROR] cannot request {}")

  def request_image(self, url):
    """GET `url` as a stream (body not pre-loaded); return None on failure."""
    return self.__get(url, "[ERROR] cannot request image {}", stream=True)


## RequestsQueue

In [None]:
from collections import deque
# from UrlManager import UrlManager
# from Logger import Logger
# from Metrics import *

# MAX_DEPTH = 1

class RequestsQueue:
  """FIFO queue of (url, depth) pairs waiting to be crawled.

  `add` filters out other domains, URLs with query strings, URLs at or
  beyond MAX_DEPTH and URLs that were already queued once.
  """

  def __init__(self, root_url):
    self.__deque = deque()
    self.__urls_visited = set()
    self.__root_url = root_url
    self.logger_qs = Logger('RequestsQueue', 'query_strings')
    self.logger_domain = Logger('RequestsQueue', 'different_domain')
    self.metrics = Metrics.instance()
    self.current_depth = 0
    self.current_depth_counter = 0

  def add(self, url, depth):
    """Enqueue `url` at `depth` unless one of the filters rejects it."""
    same_domain = UrlManager.is_same_domain(self.__root_url, url)
    if not same_domain:
      self.metrics.increment_counter(COUNTER_DIFFERENT_DOMAINS_AVOIDED)
      ignored_domain = UrlManager.get_domain(url)
      self.logger_domain.log("[INFO] url with domain {} was ignored".format(ignored_domain))
      return

    if UrlManager.has_query_strings(url):
      self.metrics.increment_counter(COUNTER_QUERY_STRINGS_AVOIDED)
      self.logger_qs.log("[INFO] url with query strings was ignored: {}".format(url))
      return

    if depth >= MAX_DEPTH:
      return

    already_queued = url in self.__urls_visited
    if not already_queued:
      self.__deque.append((url, depth))
      self.__urls_visited.add(url)
    # The gauge is refreshed even when the URL turned out to be a duplicate.
    self.metrics.set_gauge(GAUGE_QUEUE_LENGTH, len(self.__deque))

  def get(self):
    """Pop and return the oldest (url, depth) pair, or None when empty.

    When the popped item starts a new depth, a summary line for the depth
    that just ended is printed and the per-depth counter restarts at 1.
    """
    if not len(self.__deque):
      return None

    entry = self.__deque.popleft()
    self.metrics.set_gauge(GAUGE_QUEUE_LENGTH, len(self.__deque))

    entry_depth = entry[1]
    if entry_depth == self.current_depth:
      self.current_depth_counter += 1
    else:
      print("depth {} finished: {} urls".format(self.current_depth, self.current_depth_counter))
      self.current_depth = entry_depth
      self.current_depth_counter = 1
    return entry

  @property
  def is_queue_empty(self):
    """True when no URL is waiting in the queue."""
    return not len(self.__deque)

## main


In [None]:
# import requests
# import FileManager
# from FileManager import FileType
# from RequestsQueue import RequestsQueue
# from RequestsManager import RequestsManager
# import ParseManager as parser
# from UrlManager import UrlManager
# from Metrics import *

# Module-level collaborators used by main() and the handle_imported_file_* helpers.
req = RequestsManager(ROOT_URL)
a = FileManager(ROOT_URL, ROOT_DIR)  # file manager: saves downloads under ROOT_DIR
queue = RequestsQueue(ROOT_URL)
queue.add(ROOT_URL, 0)  # seed the crawl with the root URL at depth 0

metrics = Metrics.instance()  # the same shared instance the managers obtained

def _download_assets(asset_urls, request, file_type, label):
  """Request every asset URL with `request` and save it as `file_type`.

  URLs go through UrlManager.sanitize_links first, so root-relative
  references ("/img/x.png") are resolved against ROOT_URL and values that
  cannot be requested are skipped. `label` only appears in the error message.
  """
  for asset_url in UrlManager.sanitize_links(ROOT_URL, asset_urls):
    try:
      asset = request(asset_url)
      if asset is None:
        # The request failed; RequestsManager already logged and counted it.
        continue
      a.save_file(asset_url, asset, file_type)
    except Exception:
      # `Exception` rather than a bare except, so Ctrl+C still stops the crawl.
      print("[LOG] error saving/requesting {} file".format(label))

def handle_imported_file_img(response):
  """Download every image referenced by the page in `response`."""
  if response is None:
    return
  image_urls = ParseManager.get_images_urls(response)
  _download_assets(image_urls, req.request_image, FileType.IMAGE, 'img')

def handle_imported_file_js(response):
  """Download every external script referenced by the page in `response`."""
  if response is None:
    return
  links = ParseManager.get_js_urls(response)
  _download_assets(links, req.simple_request, FileType.JS, 'js')

def handle_imported_file_css(response):
  """Download every stylesheet referenced by the page in `response`."""
  if response is None:
    return
  links = ParseManager.get_css_urls(response)
  _download_assets(links, req.simple_request, FileType.CSS, 'css')

def main():
  """Crawl breadth-first from the queued root URL until the queue is empty.

  For each page: save its HTML, enqueue its links one level deeper, then
  download its images, scripts and stylesheets. Statistics are printed when
  the crawl ends, including when it is interrupted or fails.
  """
  metrics.start_timer()
  try:
    while not queue.is_queue_empty:
      url, depth = queue.get()
      response = req.simple_request(url)
      if response is None:
        # Request failed (already logged and counted): there is nothing to
        # save or parse, so move on instead of crashing on `None.text`.
        continue
      a.save_file(url, response, FileType.HTML)
      links = ParseManager.get_links_to_navigate(response)
      links = UrlManager.sanitize_links(ROOT_URL, links)
      for link in links:
        queue.add(link, depth + 1)

      handle_imported_file_img(response)
      handle_imported_file_js(response)
      handle_imported_file_css(response)
  finally:
    # Report what was collected even if the crawl stops early (e.g. Ctrl+C).
    metrics.stop_timer()
    metrics.show_statistics()

if __name__ == '__main__':
  # Let Ctrl+C end the crawl with a short message instead of a traceback.
  try:
    main()
  except KeyboardInterrupt:
    print("Finished with Ctrl+C")

depth 0 finished: 1 urls
depth 1 finished: 76 urls
depth 2 finished: 21 urls
depth 3 finished: 9 urls
Running time:  128.2797451019287
COUNTER_URLS_VISITED: 1206
COUNTER_HTML_DOWNLOADED: 110
COUNTER_QUERY_STRINGS_AVOIDED: 1010
COUNTER_DIFFERENT_DOMAINS_AVOIDED: 495
COUNTER_FAILED_REQUESTS: 62
COUNTER_IMAGES_DOWNLOADED: 363
COUNTER_CSS_DOWNLOADED: 21
COUNTER_JS_DOWNLOADED: 1
GAUGE_QUEUE_LENGTH: min(0) max(79) curr(0)
