In [1]:
from nltk.stem import SnowballStemmer
from nltk import word_tokenize
from nltk.corpus import stopwords
from bs4 import BeautifulSoup as bs

from urllib.parse import urlparse, urljoin
from urllib import parse
from urllib.error import URLError
import urllib.robotparser as urobot

import string
from copy import copy
import requests
import time
import re
import datetime
import dateutil.parser as dparser

import os
import pickle
from shutil import copyfile

import _pickle
import hashlib
import socket
import email.utils as eutils
import datetime

from collections import deque
import sys
from shutil import copyfile

In [2]:
class TextUtils: # TODO: cool name
    @staticmethod
    def text_to_words(text):
        return word_tokenize(text.decode('utf-8').translate(str.maketrans("", "", "()!@#$%^&*_+=?<>~`',…©»")))

    @staticmethod
    def filter_stop_words(words, locales):
        current_words = words
        for locale in locales:
            current_words = [word for word in current_words if word not in stopwords.words(locale)]
        return current_words

    @staticmethod
    def only_words(words):
        return [word for word in words
                if word != "" and
                word[0] not in string.digits and
                word[0] not in string.punctuation]

    @staticmethod
    def extract_places(text):
        pass  # TODO using NLTK

    @staticmethod
    def stem(words, locale):
        stemmer = SnowballStemmer(locale)
        return [stemmer.stem(word) for word in words]

    @staticmethod
    def handle(text, main_locale, locales):
        return TextUtils.stem(
            TextUtils.only_words(TextUtils.filter_stop_words(
                TextUtils.text_to_words(text),
                locales=locales
            )),
            locale=main_locale
        )

In [3]:
"""
Two-pass index
"""

class WordDocInfo:
    def __init__(self, id, count, positions):
        self.id = id
        self.count = count
        self.positions = positions

    def add_position(self, pos):
        self.count += 1
        self.positions.append(pos)

    def serialize(self):
        return '{} {} {}\n'.format(self.id, self.count, ' '.join(map(str, self.positions)))

    @staticmethod
    def deserialize(file):
        word_doc_info = file.readline().decode('utf-8').split()
        id = word_doc_info[0]
        count = int(word_doc_info[1])
        positions = list(map(int, word_doc_info[2:]))
        return WordDocInfo(id, count, positions)


class InvertedIndex:
    def __init__(self):
        self.file_name = None
        self.words_begin = None

    def create_index(self, docs, file_name='inverted_index.txt'):
        self.file_name = file_name
        words_size = {}
        words_doc_count = {}
        for num, doc_id in enumerate(docs):
            cur_words = {}
            text = docs[doc_id].get_text()
            for pos, word in enumerate(text):
                if word in cur_words:
                    cur_words[word].add_position(pos)
                else:
                    cur_words[word] = WordDocInfo(doc_id, 1, [pos])
            for word in cur_words:
                if word not in words_doc_count:
                    words_doc_count[word] = 0
                if word not in words_size:
                    words_size[word] = 0
                words_size[word] += len(cur_words[word].serialize().encode('utf-8'))
                words_doc_count[word] += 1
            if num % 100 == 0:
                print('indexing: passed {} from {}'.format(num, len(docs)))
        for word in words_size:
            words_size[word] += len('{} {}\n'.format(word, words_doc_count[word]).encode('utf-8'))
        self.words_begin = {}
        cur_pos = 0
        for word in words_size:
            self.words_begin[word] = cur_pos
            cur_pos += words_size[word]

        cur_words_begin = copy(self.words_begin)
        open(self.file_name, 'w').close() # Create if didn't exist & wipe
        for word in cur_words_begin:
            file = open(self.file_name, 'r+b')
            file.seek(cur_words_begin[word])
            file.write('{} {}\n'.format(word, words_doc_count[word]).encode('utf-8'))
            file.close()
            cur_words_begin[word] += len('{} {}\n'.format(word, words_doc_count[word]).encode('utf-8'))

        for doc_id in docs:
            cur_words = {}
            text = docs[doc_id].get_text()
            for pos, word in enumerate(text):
                if word in cur_words:
                    cur_words[word].add_position(pos)
                else:
                    cur_words[word] = WordDocInfo(doc_id, 1, [pos])
            for word in cur_words:
                file = open(self.file_name, 'r+b')
                file.seek(cur_words_begin[word])
                serialised_info = cur_words[word].serialize()
                file.write(serialised_info.encode('utf-8'))
                cur_words_begin[word] += len(serialised_info.encode('utf-8'))
                file.close()

    def get_index(self, word):
        if word not in self.words_begin:
            return None
        else:
            file = open(self.file_name, 'rb')
            file.seek(self.words_begin[word])
            cnt_docs = int(file.readline().decode('utf-8').split()[1])
            info = []
            for _ in range(cnt_docs):
                info.append(WordDocInfo.deserialize(file))
            file.close()
            return info


class Document:
    def __init__(self, file_name):
        self.file_name = file_name

    def get_text(self):
        with open(self.file_name, 'rb') as file_from:
            text = file_from.read()
            return TextUtils.handle(text, main_locale='russian', locales=['russian', 'english'])


class TestDoc:
    def __init__(self, text):
        self.text = text

    def get_text(self):
        return self.text.split()

In [4]:
def is_absolute(url):
    return bool(urlparse(url).netloc)


class Page:
    DEFAULT_WAIT_TIME = 1  # 1 second

    def __init__(self, url, useragent, crawl_delay=None):
        self.url = url
        self.headers = {
            'User-Agent': useragent
        }
        self.soup = None
        self.crawl_delay = crawl_delay
        self._page = None

    def retrieve(self):
        try:
            self._page = requests.get(self.url, headers=self.headers)
        except (requests.exceptions.MissingSchema, requests.exceptions.ConnectionError,
                requests.exceptions.InvalidSchema, requests.exceptions.ChunkedEncodingError):
            print("[PAGE -- retrieve] exception")
            return False
        time_to_wait = self.DEFAULT_WAIT_TIME
        if self.crawl_delay is not None \
                and self.DEFAULT_WAIT_TIME > self.crawl_delay:
            time_to_wait = self.crawl_delay
        status_code = self._page.status_code
        time.sleep(time_to_wait)

        if 400 <= status_code < 600:
            # invalidate handler value due to an error
            self.soup = None
            return False
        self.soup = bs(self._page.text, 'html.parser')
        return True

    def extract_urls(self, current_url):
        result = []
        if not self.allow_follow():
            return result
        for html_link in self.soup.find_all('a'):
            link = html_link.get('href')
            if is_absolute(link):
                result.append(link)
            else:
                result.append(urljoin(current_url, link))
        return result

    def allow_cache(self):
        return self.check_permission('NOARCHIVE')

    def allow_index(self):
        return self.check_permission('NOINDEX')

    def allow_follow(self):
        return self.check_permission('NOFOLLOW')

    def check_permission(self, perm):
        for tag in self.soup.find_all('ROBOTS', 'meta'):
            if perm in tag['content'].split(', '):
                return False
        return True

    def get_text(self):
        if self.soup is None:
            return ""
        strings = []
        for div in self.soup.find_all(['div', 'span', 'body']):
            strings.extend([string for string in div.stripped_strings if string != "" and re.search(r'[<>{}=\[\]\|]', string) is None])
        return " ".join(strings)

    @staticmethod
    def text_to_words(text):
        return word_tokenize(text.translate(str.maketrans("", "", "()!@#$%^&*_+=?<>~`',…©»")))

    @staticmethod
    def filter_stop_words(words, locale):
        return [word for word in words if word not in stopwords.words(locale)]

    @staticmethod
    def only_words(words):
        return [word for word in words
                if word != "" and
                word[0] not in string.digits and
                word[0] not in string.punctuation]

    def extract_places(self, text):
        pass  # TODO using NLTK

    @staticmethod
    def stem(words, locale):
        stemmer = SnowballStemmer(locale)
        return [stemmer.stem(word) for word in words]

def url_retriever_factory(url):
    return URLBaseRetriever(url)


class DBEntry:
    def __init__(self, type, time, date, price, city, venue, name):
        self.type = type
        self.time = time
        self.date = date
        self.price = price
        self.city = city
        self.venue = venue
        self.name = name


class URLBaseRetriever:
    def __init__(self, url, type=None, time=None, date=None, price=None, city=None, venue=None, name=None):
        self.url = url

        src = requests.get(url)
        soup = bs(src.text, 'html.parser')
        strings = []
        for div in soup.find_all(['div', 'span', 'body']):
            strings.extend([string for string in div.stripped_strings if string != "" and re.search(r'[<>{}=\[\]\|]', string) is None])

        self.time = time
        self.date = date
        if self.time is None or self.date is None:
            for s in strings:
                try:
                    d = dparser.parse(s, fuzzy=True)
                    if self.time is None:
                        self.time = str(d.time())
                    if self.date is None:
                        self.date = str(d.date())
                    break
                except:
                    continue

        self.type = type
        self.price = price
        self.city = city
        self.venue = venue

        self.name = name
        if self.name is None:
            pieces = url.split('/')
            self.name = pieces[len(pieces) - 1].split('?', 1)[0]

    def get_type(self):
        return self.type

    def get_time(self):
        return self.time

    def get_date(self):
        return self.date

    def get_price(self):
        return self.price

    def get_city(self):
        return self.city

    def get_venue(self):
        return self.venue

    def get_name(self):
        return self.name

    def form_db_entry(self):
        return DBEntry(self.get_type(), self.get_time(), self.get_date(), self.get_price(), self.get_city(), self.get_venue(), self.get_name())

### Crawler

* Мы сохраняем файлы с именем == хеш от url сайта, так как сам url не атируется как название файла
* Для того чтобы потом востановить название url через хеш -- мы храним отдельный файл с описанием: descr.txt
* Каждые 100 итераций происходит сохранение накопившейся базы
* Каждую 1000 итераций происходит 

In [5]:
class Crawler:
    USERAGENT = 'loaferspider'

    def __init__(self, frontier, dir_to_save, dir_checkpoints, checkpoints_name, inv_index_file_name="inv.index"):
        self.dir_checkpoints = dir_checkpoints
        self.frontier = frontier
        self.dir_to_save = dir_to_save
        self.documents = {}
        self.file_description = 'descr.txt'
        self.checkpoints_name = checkpoints_name
        self.steps_count = 0
        self.inv_index_file_name = inv_index_file_name
        self.index = None

    @staticmethod
    def create_file_name(url_hash):
        return 'document_from_url_with_hash_{}'.format(str(url_hash))

    def restore(self):
        with open(os.path.join(self.dir_checkpoints, self.file_description), 'r', encoding='utf-8') as file_descr:
            for line in file_descr:
                url, hash_value = line.strip().split('\t')
                path_to_file = os.path.join(self.dir_to_save, self.create_file_name(hash_value))
                if os.path.exists(path_to_file):
                    self.frontier.add_url(url)
                    self.steps_count += 1
                    self.documents[url] = path_to_file

    def run(self):
        while not self.frontier.done():
            print(self.steps_count)
            self.steps_count += 1
            website = self.frontier.next_site()
            if not website.read_robots_txt():
                continue
            current_url = website.next_url()
            try:
                website_delay = website.get_crawl_delay(self.USERAGENT)
            except AttributeError as e:
                print("[CRAWLER -- run] AttributeError: ", e)
                continue
            page = Page(current_url, self.USERAGENT, website_delay)
            if not page.retrieve():
                continue
            if website.permit_crawl(current_url):
                if page.allow_cache():
                    text = page.get_text()
                    self.store_document(current_url, text)
                urls = page.extract_urls(current_url)
                for url in urls:
                    self.frontier.add_url(url)
            if self.steps_count % 100 == 0:
                self.create_checkpoint(self.steps_count)
            self.frontier.releaseSite(website)
            if self.steps_count % 10000 == 0:
                self.create_index()

    def store_document(self, url, text):
        hash = hashlib.md5()
        hash.update(url.encode('utf-8'))
        hash_value = hash.hexdigest()
        path = os.path.join(self.dir_to_save, self.create_file_name(hash_value))
        with open(path, 'w', encoding='utf-8') as file_to:
            print(text, file=file_to, end='')
        self.documents[url] = path
        return True

    def create_checkpoint(self, count_passed):
        try:
            byte_present = pickle.dumps(self)
        except _pickle.PicklingError:
            print("Error getting pickling!")
            return
        with open(os.path.join(self.dir_checkpoints, self.checkpoints_name), 'wb') as file:
            file.write(byte_present)

        with open(self.file_description, 'w', encoding='utf-8') as descr:
            for url in self.documents:
                print('{}\t{}'.format(url, self.documents[url]), file=descr)
        copyfile(self.file_description, os.path.join(self.dir_checkpoints, self.file_description))
        print('Saved, step passed: {}, urls in queue: {}'.format(self.steps_count, self.frontier.cnt_added))

    def create_index(self):
        

### Sites 
Чтобы кроулить сайты нам нужно хранить следить за временем между кроулингом. Для этого для каждого сайта мы храним когда его опоследний раз кроулели.

scheme -- название протокола

hostname -- название домена в нижнем регистре

In [6]:
class Site:
    _RESPONSE_TIMEOUT = 1

    def __init__(self, scheme, hostname):
        self.hostname = hostname
        self.iter = 0
        self.urls = {0: scheme + '://' + self.hostname}
        self.timestamps = {0: None}
        self._rp = urobot.RobotFileParser()
        self._rp.set_url(scheme + '://' + hostname + '/robots.txt')

    @staticmethod
    def get_last_modified(url):
        try:
            r = requests.get(url)
        except ConnectionError as err:
            print("[SITE -- read_robots_txt] ConnectionError")
            return None
        if 400 <= r.status_code < 600 or 'Last-Modified' not in r.headers:
            return None
        return datetime.datetime(*eutils.parsedate(r.headers['Last-Modified'])[:6])

    def read_robots_txt(self):
        default_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(self._RESPONSE_TIMEOUT)
        try:
            self._rp.read()
            status = True
        except (URLError, ValueError) as e:
            status = False
            print("[SITE -- read_robots_txt] URL- or ValueError: ", e)
        except socket.timeout:
            status = False
            print("[SITE -- read_robots_txt] socket.timeout")
        finally:
            socket.setdefaulttimeout(default_timeout)
        return status

    def update_urls(self, url):
        if url[-1] == '/':
            url = url[:-1]
        pattern = re.compile('\.(css|jpg|pdf|docx|js|png|ico)$')
        if pattern.search(url):
            return False
        if url not in self.urls.values():
            cnt = len(self.urls)
            self.urls[cnt] = url
            self.timestamps[cnt] = None
            return True
        return False

    # returns the first url which needs update (if any) or just hasn't been inspected yet
    def next_url(self):
        # setup
        init_iter = self.iter
        self.iter = (self.iter + 1) % len(self.urls)
        url = self.urls[self.iter]
        timestamp = self.timestamps[self.iter]
        last_modified = self.get_last_modified(url)

        # while we...
        #   * haven't made the whole cycle
        #   * and are sure that the page is up to date
        #     (i.e. our timestamp for it is not None and is later or equal than its last-modified header)
        # -- iterate
        while self.iter != init_iter \
            and timestamp is not None and last_modified is not None \
            and timestamp >= last_modified:
            self.iter = (self.iter + 1) % len(self.urls)
            url = self.urls[self.iter]
            timestamp = self.timestamps[self.iter]
            last_modified = self.get_last_modified(url)
        return url

    def permit_crawl(self, url):
        return self._rp.can_fetch('bot', url)

    def get_crawl_delay(self, useragent):
        try:
            return self._rp.crawl_delay(useragent)
        except AttributeError as e:
            print("[SITE -- get_crawl_delay] url: {0} AttributeError:".format(self.hostname), e)
            return None

### UrlQueue
This queue contain Sites 

Проверяет на существование такого сайта, и добовляет/выдает его из очереди. Храним сами сайты, а не url чтобы следить за временем последнего кроуленга сата.

In [7]:
class UrlQueue:
    def __init__(self):
        self.sites = {}
        self.site_queue = deque()

    def add_url(self, url):
        parsed_url = parse.urlparse(url)
        hostname = parsed_url.hostname
        scheme = parsed_url.scheme
        if scheme is None:
            print(url)
        if hostname not in self.sites.keys():
            if hostname and scheme:
                site = Site(scheme, hostname)
                self.sites[hostname] = site
            else:
                return False
        else:
            site = self.sites[hostname]

        self.site_queue.append(site)

        return site.update_urls(url)

    def next_site(self):
        site = self.site_queue.popleft()
        self.site_queue.append(site)
        return site

    def release_site(self, site):
        self.site_queue.append(site)

    def has_next_site(self):
        return len(self.site_queue) != 0

    def add_url_if_site_exists(self, url):
        parsed_url = parse.urlparse(url)
        hostname = parsed_url.hostname
        if hostname in self.sites.keys():
            site = self.sites[hostname]
            self.site_queue.append(site)
            return site.update_urls(url)
        return False



### Frontier
It's same sort of Wrapper over Queue in wich we contain all urls

In [8]:
class Frontier:
    def __init__(self, seed_urls, docs_bound):
        self.cnt_added = 0
        for seed_url in seed_urls:
            self.queue = UrlQueue()
            self.queue.add_url(seed_url)
            self.docs_bound = docs_bound
            self.cnt_added +=1

    def done(self):
        return not self.queue.has_next_site()

    def next_site(self):
        return self.queue.next_site()

    def add_url(self, url):
        if self.cnt_added < self.docs_bound:
            self.cnt_added += 1
            self.queue.add_url(url)
        elif self.cnt_added < self.docs_bound*2:
           if self.queue.add_url_if_site_exists(url):
               self.cnt_added += 1

    def releaseSite(self, site):
        self.queue.release_site(site)

# Main Crawler
Create dirictory

In [9]:
dir_for_docs = 'documents'
if not os.path.exists(dir_for_docs):
    os.mkdir(dir_for_docs)

dir_checkpoints = 'checkpoints'
if not os.path.exists(dir_checkpoints):
    os.mkdir(dir_checkpoints)

### Const define

In [10]:
checkpoints_name = 'checkpoints.save'
pages_bound = 10000
seeds = ['https://search.crossref.org', 'https://arxiv.org']

#### Define Crawler

In [11]:
if not os.listdir(dir_checkpoints):
    print('No checkpoints were found.')
    frontier = Frontier(seeds, pages_bound)
    crawler = Crawler(frontier, dir_for_docs, dir_checkpoints, checkpoints_name)
    if os.path.exists(os.path.join(dir_checkpoints, crawler.file_description)):
        copyfile(os.path.join(dir_checkpoints, crawler.file_description), crawler.file_description)
    else:
        open(crawler.file_description, 'w').close() # Wipe file
else:
    with open(os.path.join(dir_checkpoints, checkpoints_name), 'rb') as check_file:
        crawler_loaded = pickle.load(check_file)

    frontier = Frontier(seeds, pages_bound)
    crawler = Crawler(frontier, dir_for_docs, dir_checkpoints, checkpoints_name)

    if (crawler_loaded.__dict__.keys() == crawler.__dict__.keys()) \
            and (crawler_loaded.frontier.__dict__.keys() == crawler.frontier.__dict__.keys()):
        crawler = crawler_loaded
        print('Found checkpoints. Loaded crawler. Count urls in queue is {}'.format(crawler.frontier.cnt_added))
    else:
        print('Found checkpoints. Unable to load crawler. Load sources.')
        if os.path.exists(os.path.join(dir_checkpoints, crawler.file_description)):
            copyfile(os.path.join(dir_checkpoints, crawler.file_description), crawler.file_description)
        else:
            open(crawler.file_description, 'w').close() # Wipe file
        crawler.restore()
        print(crawler.steps_count)



No checkpoints were found.


#### Run Crawler

In [12]:
crawler.run()

0
1
2
3
4
5
6
[SITE -- read_robots_txt] URL- or ValueError:  <urlopen error _ssl.c:761: The handshake operation timed out>
7
[SITE -- read_robots_txt] URL- or ValueError:  <urlopen error _ssl.c:761: The handshake operation timed out>
8
9
10
11
12
13
14
15
[SITE -- read_robots_txt] socket.timeout
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
[SITE -- read_robots_txt] URL- or ValueError:  <urlopen error _ssl.c:761: The handshake operation timed out>
141
[SITE -- read_robots_txt] URL- or ValueError:  <urlopen error _ssl.c:761: The handshake operation timed out>
142
143
144
145
146
147
148


ConnectionError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer'))