# Scraper
```
State := Current site, collected data, links, times, etc
Init := (Initial site, 0, 0, 0, ...)
Goal := (*, specific amount of data, *, timeout, ...)
Action := Start | Collect | Travel | Stop | Exit
    Start := Execute the scraper in S if it is valid
    Collect := Collect useful data and links. Test whether it is new or not
    Travel := Move to S2 via links
    Stop := Pause for a short time
    Exit := Exit the scraper
Transition :=
    (Start, S1) -> (Collect, S1) | (Travel, S1, S2 in links) | (Stop, S1) | (Exit, S1)
    (Collect, S1) -> (Travel, S1, S2 in links) | (Stop, S1) | (Exit, S1)
    (Travel, S1, S2) -> (Start, S2)
    (Stop, S1) -> (Collect, S1)
    (Exit, S1) -> Done
Action Cost := Equal
```

In [342]:
# Abstract Class
import datetime
import random
import time
from abc import ABCMeta, abstractmethod
from copy import copy, deepcopy
from dataclasses import dataclass
from functools import wraps
from operator import lt, eq
from urllib.error import URLError
from urllib.parse import urljoin
from urllib.robotparser import RobotFileParser

import requests
from requests.exceptions import HTTPError


@dataclass
class State:
    """Scraper state: current site, collected data, pending links and elapsed time.

    NOTE: no attribute is annotated at class level, so ``@dataclass`` sees no
    fields; its generated ``__repr__``/``__eq__`` ignore the attributes set by
    the explicit ``__init__`` below.
    """

    def __init__(self, site: str, data: dict = None, links: set = None, times: float = 0.):
        """Create a state.

        Args:
            site: URL of the current site.
            data: collected data; a fresh empty dict when omitted.
            links: links still to visit; a fresh empty set when omitted.
            times: elapsed collecting time in seconds.
        """
        self.site = site
        # Build the containers per instance: a literal default such as
        # ``dict()`` in the signature would be shared by every State.
        self.data = {} if data is None else data
        self.links = set() if links is None else links
        self.times = times

    def __copy__(self):
        """Return a shallow copy that shares the attribute objects."""
        cls = self.__class__
        result = cls.__new__(cls)
        result.__dict__.update(self.__dict__)
        return result

    def __deepcopy__(self, memo):
        """Return a copy whose attributes are themselves deep-copied."""
        cls = self.__class__
        result = cls.__new__(cls)
        # Register the copy first so self-references resolve to it.
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            setattr(result, k, deepcopy(v, memo))
        return result

class ABCScraper(metaclass=ABCMeta):
    """Template for a scraper driven by Start/Collect/Travel/Stop/Exit actions.

    Subclasses provide ``collect_by_custom``, ``select_link`` and ``is_exit``;
    this class wires those hooks into the transitions and owns the states.
    Each action returns the result of the next action, so ``run()`` finally
    returns whatever ``exit()`` returns: the current state.
    """

    def __init__(self, init: "State", goal: "State"):
        """Keep ``init`` and ``goal``; the working state is a deep copy of ``init``."""
        self.__init = init
        self.__current = deepcopy(self.__init)
        self.__goal = goal
        self.lastStep = None

    @property
    def initState(self):
        """State the scraper was created with."""
        return self.__init

    @property
    def currentState(self):
        """Working state, mutated while scraping."""
        return self.__current

    @property
    def goalState(self):
        """Goal state passed at construction."""
        return self.__goal

    def __canFetch():
        """Build ``canFetch`` around a parser cache shared by all scrapers."""
        parsers = dict()

        def canFetch(self, path: str, ua: str = '*') -> bool:
            """Return whether robots.txt of ``path``'s host lets ``ua`` fetch ``path``.

            Raises:
                URLError: the robots.txt URL could not be opened.
            """
            url = urljoin(path, "/robots.txt")

            # Cache the parsed robots.txt, not one path's verdict: different
            # paths on the same host may get different answers.
            parser = parsers.get(url)
            if parser is None:
                parser = RobotFileParser(url)
                # read() deals with HTTP error statuses on its own
                # (401/403 deny everything, other 4xx allow everything).
                parser.read()
                parsers[url] = parser
            return parser.can_fetch(ua, path)

        return canFetch
    canFetch = __canFetch()

    @staticmethod
    def meet_unexpected_error(e: Exception):
        """Report an exception that no specific handler covered."""
        print("Unexpected Error")
        print(e)

    @staticmethod
    def _size(items):
        """Count distinct items without exhausting one-shot iterators.

        Returns the string ``"unknown"`` for objects without ``len()`` and the
        plain length when the items are unhashable.
        """
        try:
            total = len(items)
        except TypeError:
            return "unknown"
        try:
            return len(set(items))
        except TypeError:
            return total

    # Run
    def run(self):
        """Entry point: announce the run and perform the Start action."""
        print("Running Scraper")
        return self.start()

    # Action
    def start(self):
        """Start action: consult robots.txt, then Collect or (randomly) Stop.

        A robots.txt denial is only reported; scraping continues regardless.
        Any error ends the run through ``exit()``.
        """
        url = self.currentState.site
        try:
            # Check robots.txt but ignore it
            if not self.canFetch(url):
                print("Robots.txt doesn't permit a scraper")
            # Roughly one start in ten pauses before collecting.
            return self.collect() if random.randint(1, 100) > 10 else self.stop()
        except URLError:
            print("ERROR: URLError")
            return self.exit()
        except Exception as e:
            ABCScraper.meet_unexpected_error(e)
            return self.exit()

    @abstractmethod
    def select_link(self):
        '''Return selected link or None'''
        pass

    def collect(self):
        """Collect action: run the subclass collector, then Travel or Exit.

        A 5xx ``HTTPError`` leads to Stop (pause, then collect again); any
        other error is reported and ends the run.
        """
        try:
            self.collect_by_custom()
        except HTTPError as e:
            if e.response.status_code // 100 == 5:  # 5xx
                return self.stop()
            print("Code", e.response.status_code)
            print("Reason", e.response.reason)
            print("Req Header", e.request.headers)
            return self.exit()
        except Exception as e:
            ABCScraper.meet_unexpected_error(e)
            return self.exit()

        else:
            if not self.is_exit():
                link = self.select_link()
                if link:
                    return self.travel(link)
            self.lastStep = 'Collect'
            return self.exit()

    @abstractmethod
    def collect_by_custom(self):
        '''make collect algorithm'''
        pass

    def travel(self, link):
        """Travel action: make ``link`` the current site and Start there."""
        self.currentState.site = link
        return self.start()

    def stop(self):
        """Stop action: sleep 1-10 seconds, then Collect."""
        time.sleep(random.randint(1, 10))
        return self.collect()

    def exit(self):
        """Exit action: print a summary and return the current state."""
        print("Last step is", self.lastStep)
        print("Last site is", self.currentState.site)
        # _size() never iterates an unsized object, so the collected data in
        # the returned state stays intact.
        print("Collected data size is", self._size(self.currentState.data))
        print("Remaining links is", self._size(self.currentState.links))
        print("Collect time is", self.currentState.times, 'secs')
        return self.currentState

    @abstractmethod
    def is_exit(self):
        '''make exit condition. True if exit'''
        pass

In [264]:
import re
import json
from itertools import chain
from requests.compat import urljoin
from bs4 import BeautifulSoup
from enum import Enum, unique, auto

@unique
class MailType(Enum):
    """Category assigned to a scraped mail."""
    SPAM = auto()
    NORMAL = auto()

    def __str__(self):
        """Return the member name: 'SPAM' or 'NORMAL'."""
        return self.name

@dataclass
class Data:
    """One scraped mail."""
    sender: str
    date: datetime.date
    title: str
    content: str
    mail_type: MailType

    def to_dict(self):
        """Return the mail as a plain dict; ``mail_type`` is passed through ``str()``."""
        record = {name: getattr(self, name)
                  for name in ('sender', 'date', 'title', 'content')}
        record['mail_type'] = str(self.mail_type)
        return record
    

# Scraper
class GMailScraper(ABCScraper):
    """Scraper for Gmail's HTML view, replaying the request headers of a HAR export."""

    def __init__(self, init: State, goal: State, configPath: str = "mail.google.com.har"):
        '''
        HTTP Gmail url: https://mail.google.com/mail/u/0/h/
        How to get config file:
            1. Go to Safari
            2. Click network in development-tools
            3. Click "export" at right-side

        If the config cannot be read, a message is printed and the scraper
        keeps the site of ``init`` with empty headers.
        '''
        super().__init__(init, goal)
        # A list (not a one-shot iterator) so the collected mails can be
        # counted and read more than once.
        self.currentState.data = []
        # Always defined, even when the config turns out to be unusable.
        self.headers = {}

        url, headers = self.__parse_config(configPath)
        if not url or not headers:
            print("GMailScraper cannot be initialized")
            return
        self.currentState.site = url
        self.headers = headers

    def __parse_config(self, configPath) -> tuple:
        """Read url and headers of the first request recorded in a HAR file.

        Returns:
            ``(url, headers)``, or ``(None, None)`` after printing the reason.
        """
        try:
            # ``with`` closes the file even when json.load() fails.
            with open(configPath, 'r', encoding='utf-8') as fp:
                config = json.load(fp)
        except OSError:  # includes FileNotFoundError
            print("Invalid configuration Path")
            return None, None
        except ValueError:  # malformed JSON or undecodable bytes
            print("Invalid configuration file")
            return None, None
        except Exception as e:
            ABCScraper.meet_unexpected_error(e)
            return None, None

        try:
            req = config['log']['entries'][0]['request']
            headers = {attr['name']: attr['value'] for attr in req['headers']}
            return req['url'], headers
        except (KeyError, IndexError):
            print("Invalid configuration file")
            return None, None
        except Exception as e:
            ABCScraper.meet_unexpected_error(e)
            return None, None

    def collect_by_custom(self):
        """Fetch the current site, then harvest its mail data and links.

        Errors are not caught here: ``ABCScraper.collect`` turns a 5xx
        ``HTTPError`` into a Stop and everything else into an Exit.
        """
        res = requests.get(self.currentState.site, headers=self.headers)
        res.raise_for_status()
        self.__collect_data(res)
        self.__collect_links(res)

    def __collect_data(self, res):
        """Parse one mail page and append it to the state as a ``Data``.

        NOTE(review): assumes ``res`` is a single-mail page of the HTML view;
        a missing element raises AttributeError — confirm against the caller.

        Raises:
            ValueError: the date cell does not match the expected format.
        """
        dom = BeautifulSoup(res.text, 'lxml')
        header = dom.find('table', {'class': 'h'})
        meta = header.next_sibling.find('table')

        sender = meta.find('h3').text

        raw_date = meta.find('td').next_sibling.text
        # The page renders dates as "<year>년 <month>월 <day>일".
        found = re.search(r'^(\d+)년 (\d+)월 (\d+)일', raw_date)
        if found is None:
            raise ValueError(f"Unrecognized mail date: {raw_date!r}")
        date = datetime.date(*map(int, found.groups()))

        title = header.find('h2').text

        content = dom.find('div', {'class': 'msg'}).text
        # Collapse every whitespace run to a single whitespace character.
        content = re.sub(r'(\s)+', r'\1', content).strip()

        # '스팸함' is the label of the spam folder link.
        if '스팸함' in dom.find('a', {'class': 'searchPageLink'}).text:
            mail_type = MailType.SPAM
        else:
            mail_type = MailType.NORMAL

        self.currentState.data.append(Data(sender, date, title, content, mail_type))

    def __collect_links(self, res):
        '''
        Record the page's "이전" (previous) link as ``(previous, None, None)``.

        The 3-tuple shape is what ``select_link`` unpacks; the last two slots
        are unused for now. Nothing is recorded when no such link exists.

        NOTE(review): the earlier draft guarded this with
        ``len(links) == 1`` (no body) and computed an unused mail url from an
        undefined ``url``; both are omitted — confirm the intended split
        between main page and mail pages.
        '''
        dom = BeautifulSoup(res.text, 'lxml')
        footer = dom.find('table', {'class': 'ft'})
        if footer is None or footer.previous_sibling is None:
            return

        previous = None
        for link in footer.previous_sibling.find_all('a', {'class': 'searchPageLink'}):
            # raw val = 이전   ›
            label = re.sub(r'›', '', link.get_text()).strip()
            if label == "이전":
                previous = link['href']

        if previous:
            self.currentState.links.add((previous, None, None))

    def select_link(self):
        '''
        Pop one recorded link and return it as an absolute URL, or None.
        '''
        if not self.currentState.links:
            return None
        previous, _, _ = self.currentState.links.pop()
        return urljoin(self.currentState.site, previous) if previous else None

    def is_exit(self):
        """True when no link is left to visit."""
        return not self.currentState.links

In [266]:
# Build a scraper from the HAR export and run it; run() returns the scraper's current state.
x = GMailScraper(State(""), State(""), "mail.google.com.har")
s = x.run()
# Bare expression so the notebook displays the collected data.
s.data

Running Scraper
Robots.txt doesn't permit a scraper
Last step is Collect
Last site is https://mail.google.com/mail/u/0/h/1cosfy7y37n1h/?f=1
Collected data size is 6
Remaining links is 0
Collect time is 0.0 secs


ChainMap({}, {'YouTube Premium': '축하합니다 | YouTube Premium 회원이 되셨습니다 - 광고 없는 YouTube와 YouTube Music YouTube Premium YouTube Premium에 오신 것을 환영합니다 광고 없는 YouTube와 YouTube Music을 즐기세요. 멤버십 혜택을 모두 알아보세요. 광고 없는 감상과 백그라운드 재생 광고 없이 동영상을 시청하고 다른 앱을 사용하면서 계속 들을 수 있습니다 오프라인 저장 좋아하는', 'The Google Account Team': 'Xxxx님, Google 계정 설정을 확인하여 Mac 기기에서 다음 단계를 진행하세요 - Xxxx님, 안녕하세요 Mac 기기에서 Google에 로그인해 주셔서 감사합니다 Google 계정 설정이 알맞게 구성되어 있는지 확인해 주세요 개인 정보 보호 진단 완료하기 단계별 안내를 통해 나에게 알맞은 개인 정보 보호 설정을 선택하세요 완료하기 보안 진단 완료하기 보안 진단에서 계정 보안을 강화하기 위한 맞춤 권장사항을 확인하세요 완료하기 Google', 'YouTube': 'Welcome to YouTube Premium! - Hi Xxxx, Welcome to your YouTube Premium membership! Your 1-month trial begins immediately. Your payment method will be charged monthly once your trial ends. You can explore, manage, and cancel your', 'Google Play': 'Google Play 주문 영수증(2022. 6. 23.) - Google Play 감사합니다. Google Play에서 Google Ireland Limited의 무료 체험판을 신청하셨습니다. 무료 체험판은 2022. 7. 23.에 종료됩니다. 취소하지 않으면 무료 체험판이 끝난 후 자동으로 (현재 매월 ₩10450)의 정기 결제 

In [337]:
import requests
import re
from bs4 import BeautifulSoup
from requests.compat import urljoin
import datetime

# Scratch cell: tries the parsing steps by hand against live pages.
# The scraper is only used to obtain the url and headers read from the HAR config.
scraper = GMailScraper(State(""), State(""), )

url = scraper.currentState.site
headers = scraper.headers

html = requests.get(url, headers=headers)
dom = BeautifulSoup(html.text, 'lxml')
# First <span class="ts"> on the page; its parent carries the link that is followed next.
# NOTE(review): presumably the first mail of the listing — confirm.
mail_url = dom.find('span', {'class': 'ts'}).parent['href']
mail_url = urljoin(url, mail_url)
mail_html = requests.get(mail_url, headers=headers)
mail_dom = BeautifulSoup(mail_html.text, 'lxml')
title = mail_dom.find('table', {'class': 'h'}).find('h2').text # title
print(title)

sender = mail_dom.find('table', {'class': 'h'}).next_sibling.find('table').find('h3').text # sender
print(sender)
date = mail_dom.find('table', {'class': 'h'}).next_sibling.find('table').find('td').next_sibling.text # date
# The date text starts with "<year>년 <month>월 <day>일"; keep only those three numbers.
date = re.search(r'^(\d+)년 (\d+)월 (\d+)일', date)
date = datetime.date(int(date.group(1)), int(date.group(2)), int(date.group(3)))
print(date)

content = mail_dom.find('div', {'class': 'msg'}).text # content
# Collapse each whitespace run to a single whitespace character.
content = re.sub(r'(\s)+', r'\1', content).strip() # content

## Spam-mail
# Link whose text is '스팸함' (the spam folder) inside <table class="m">.
spam_url = dom.find('table', {'class': 'm'}).find(string='스팸함').parent['href']
spam_url = urljoin(url, spam_url)

# Bare expression: the notebook displays whether the opened mail is marked as spam.
'스팸함' in mail_dom.find('a', {'class': 'searchPageLink'}).text # SPAM check

Xxxx님, Google 계정 설정을 확인하여 Mac 기기에서 다음 단계를 진행하세요
The Google Account Team
2022-06-26


False

In [274]:
# Display the raw HTML of the response fetched in the previous cell.
html.text

'<html lang="ko"><pre style="font-size: 0;display: none;visibility: hidden;">\n\n\n</pre><head><meta http-equiv=Content-Type content="text/html; charset=UTF-8"><title>Gmail - 받은편지함</title><link rel="canonical" href="https://mail.google.com/mail/"/><link rel="shortcut icon" href="https://ssl.gstatic.com/ui/v1/icons/mail/rfr/gmail.ico" type="image/x-icon"><link rel="stylesheet" type="text/css" href="https://mail.google.com/mail/u/0/h/_//?&amp;name=c&amp;ver=6wrl3yi4mm9e&amp;v=ss" nonce="MkQohKz_P4V8dv9v4k-rOA"><style type="text/css" nonce="MkQohKz_P4V8dv9v4k-rOA">\n@import url("https://mail.google.com/mail/u/0/h/_//?&name=a&ver=1sthyqjwgl8hj&v=ss"); #gbar,#guser{font-size:13px;padding-right:8px;padding-top:4px !important;}#gbar{padding-left:8px;height:22px}#guser{padding-bottom:7px !important;text-align:right}.gbh,.gbd{border-top:1px solid #c9d7f1;font-size:1px}.gbh{height:0;position:absolute;top:24px;width:100%}@media all{.gb1{height:22px;margin-right:.5em;vertical-align:top}#gbar{float

# DB

```
Relation Scheme(mail)

Data := json format [{"sender": sender, "date": date, "title": title, "content": content, "type":type}, ...]
Sender := CHAR(100)
Date := Date
Title := CHAR(200)
Content := TEXT
Type := CHAR(10) ("SPAM" | "NORMAL")


Query := SaveQuery | SearchQuery
SaveQuery := ("SAVE", _)
SearchQuery := ("SEARCH", CONDITIONS SQL SYNTAX like title="test")
```

```
State := (data, query)
Init := (data, saveQuery) | (_, SearchQuery)
Goal := (_, saveQuery) | (data, SearchQuery)

Action := start | connect | create | save | search | exit
    start := Start db jobs
    connect := Make a connection to db
    create := Create relation scheme
    save := Save data in db
    search := Execute searchQuery in db
    exit := Exit the db jobs.
    
Transition
    (start, S1) := (connect, S1) | (save, S1) | (search, S1) | (exit, S1)
    (connect, S1) := (start, S1) | (exit, S1)
    (save, S1) := (create, S1) | (exit, S1)
    (create, S1) := (save, S1) | (exit, S1)
    (search, S1) := (exit, S1)
    (exit, S1) := Done
    
Action Cost := Manually defined
```

In [252]:
import random
from abc import ABCMeta, abstractmethod, abstractproperty
from dataclasses import dataclass
from enum import Enum, auto, unique
import datetime

@unique
class QueryType(Enum):
    """Kind of job a DB state asks for."""
    SAVE = auto()
    SEARCH = auto()

    def __str__(self):
        """Return the member name: 'SAVE' or 'SEARCH'."""
        return self.name

# NOTE: this rebinds the name ``State`` that the scraper cells also define.
@dataclass
class State:
    """State of a DB job: the rows involved and the query to perform."""
    # Annotations must be types; ``list()``/``tuple()`` are empty instances.
    data: list    # row dicts to save, or the rows found by a search
    query: tuple  # (QueryType, condition)
        
class _SingletonMeta(ABCMeta):
    """Metaclass that creates at most one instance per class."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on the first call.

        Arguments of later calls are ignored because nothing is constructed.
        """
        if cls not in _SingletonMeta._instances:
            _SingletonMeta._instances[cls] = super().__call__(*args, **kwargs)
        return _SingletonMeta._instances[cls]


class ABCSingleton(metaclass=_SingletonMeta):
    """Base class whose subclasses each have a single shared instance.

    Construction is intercepted in the metaclass: a ``__call__`` defined on
    this class itself would only make instances callable, it would not be
    consulted when a class is instantiated. Abstract methods still work
    because the metaclass derives from ``ABCMeta``.
    """
    
    
class ABCDataBase(ABCSingleton):
    """Skeleton of a DB job; subclasses implement connect/save/create/search."""

    def __init__(self, init:State, db:"DataBase", table_name: "Relation"):
        '''
        Init := (data, saveQuery) | (_, SearchQuery)
        '''
        self.db, self.table_name = db, table_name
        self.__state = init
        self.__conn = None
        self.__conns = []
        self.lastStep = "Init"

    @property
    def state(self):
        """State the job currently works on."""
        return self.__state

    @state.setter
    def state(self, new):
        self.__state = new

    @property
    def conn(self):
        '''Connection in use'''
        return self.__conn

    @conn.setter
    def conn(self, new):
        self.__conn = new

    @property
    def conns(self):
        '''Pool of opened connections'''
        return self.__conns

    def run(self):
        """Print the job summary, then perform the start action."""
        print("Run...")
        print(f"DB: {self.db}")
        print(f"Table: {self.table_name}")
        print(f"Mode: {self.state.query[0]!s}")
        return self.start()

    def start(self):
        """Connect when the pool is empty; otherwise take a connection and dispatch on the query type."""
        self.lastStep = "Start"

        if len(self.conns) == 0:
            return self.connect()
        self.conn = self.conns.pop()

        kind = self.state.query[0]
        if kind == QueryType.SAVE:
            return self.save()
        if kind == QueryType.SEARCH:
            return self.search()
        print("Unexpected querytype")
        return self.exit()

    @abstractmethod
    def connect(self):
        '''Open a connection to the db and store it in the pool'''

    @abstractmethod
    def save(self):
        '''Store the state's data in the relation'''

    @abstractmethod
    def create(self):
        '''Create the relation scheme'''

    @abstractmethod
    def search(self):
        '''Run the search query and put the result in the state's data'''

    def exit(self):
        """Report the last step and hand back the state."""
        print("Last step is", self.lastStep)
        return self.state
    

In [253]:
import sqlite3

class SQLiteDB(ABCDataBase):
    """SQLite implementation of the DB job.

    NOTE(review): ``table_name`` and the search condition are interpolated
    into the SQL text (identifiers cannot be bound as parameters and the
    condition is raw SQL by design) — pass trusted values only.
    """

    def connect(self):
        """Open a connection to ``self.db``, pool it and go back to start."""
        self.lastStep = "Connect"
        try:
            conn = sqlite3.connect(self.db)
        except sqlite3.Error as e:
            print("An error occured:", e.args[0])
            return self.exit()
        self.conns.append(conn)
        return self.start()

    def save(self):
        """Insert the state's rows; go through create first when the table is missing."""
        self.lastStep = "Save"
        try:
            # The connection context manager commits on success and rolls
            # back on error, so no explicit commit is needed.
            with self.conn as conn:
                cur = conn.cursor()
                # sqlite_master is the schema table name every SQLite version accepts.
                cur.execute(
                    "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
                    (self.table_name,),
                )
                table_exists = cur.fetchone() is not None
                if table_exists:
                    cur.executemany(f'''
                        INSERT INTO {self.table_name}(sender, date, title, content, type)
                        VALUES(:sender, :date, :title, :content, :mail_type);
                    ''', self.state.data)
        except sqlite3.Error as e:
            print("An error occured:", e.args[0])
            return self.exit()

        # Transitions happen outside the ``with`` block so that exit() never
        # closes a connection a pending context manager still has to commit.
        if not table_exists:
            print("Enter create")
            return self.create()
        return self.exit()

    def create(self):
        """Create the mail table, then return to save."""
        self.lastStep = "Create"
        try:
            with self.conn as conn:
                conn.execute(f'''
                    CREATE TABLE {self.table_name}(
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        sender CHAR(100) NOT NULL,
                        date DATE NOT NULL,
                        title CHAR(200) NOT NULL,
                        content TEXT,
                        type CHAR(10) NOT NULL
                    );
                    ''')
        except sqlite3.Error as e:
            print("An error occured:", e.args[0])
            return self.exit()
        return self.save()

    def search(self):
        """Select rows (all rows when the condition is empty) into ``state.data``."""
        self.lastStep = "Search"
        try:
            with self.conn as conn:
                cur = conn.cursor()
                condition = self.state.query[1]
                if condition:
                    cur.execute(f'''SELECT * FROM {self.table_name} WHERE {condition}''')
                else:
                    cur.execute(f'''SELECT * FROM {self.table_name}''')
                self.state.data = [
                    {'sender': sender, 'date': date, 'title': title,
                     'content': content, 'type': mail_type}
                    for _, sender, date, title, content, mail_type in cur.fetchall()
                ]
        except sqlite3.Error as e:
            print("An error occured:", e.args[0])
        except Exception as e:
            print(e)
        # Returning after the try (not from ``finally``) lets interrupts such
        # as KeyboardInterrupt propagate instead of being swallowed.
        return self.exit()

    def exit(self):
        """Close the connection used by this job, then report via the base class."""
        conn = self.conn
        if conn is not None:
            conn.close()
            self.conn = None
        return super().exit()
            



In [254]:
# Demo cell: build one mail record and run a SEARCH job against test.db.
data = Data("yyr", datetime.date.today(), "Greeting", "hello", MailType.NORMAL)
data.to_dict()

# `_` stands in for a don't-care value.
# NOTE(review): `_` is only predefined in an interactive session — confirm before running as a script.
db = SQLiteDB(State([data.to_dict()], (QueryType.SAVE, _)), "test.db", "mail")
#db.run()

# Swap the state for a search with an empty condition, i.e. select every row.
db.state = State(_, (QueryType.SEARCH, ''))
x = db.run()
print(x)

Run...
DB: test.db
Table: mail
Mode: SEARCH
Last step is Search
State(data=[{'sender': 'yyr', 'date': '2022-07-13', 'title': 'Greeting', 'content': 'hello', 'type': 'NORMAL'}, {'sender': 'yyr', 'date': '2022-07-13', 'title': 'Greeting', 'content': 'hello', 'type': 'NORMAL'}], query=(<QueryType.SEARCH: 2>, ''))


In [346]:
from itertools import chain

# next() on an empty iterator raises StopIteration (see the output below).
next(iter([]))

StopIteration: 