In [1]:
import asyncio
import nest_asyncio
nest_asyncio.apply()
loop = asyncio.get_event_loop()

In [2]:
from motor.motor_asyncio import AsyncIOMotorClient


def create_client(host: str, username: str,
                  password: str, port: int,
                  db_name: str) -> AsyncIOMotorClient:
    return AsyncIOMotorClient(
            f"mongodb://{username}:{password}@{host}:{port}/{db_name}?authSource=admin")


In [3]:
from typing import Any, List


class AsyncCRUDBase(object):

    @staticmethod
    async def get(db: Any, query: Any, **kwargs):
        return NotImplemented

    @staticmethod
    async def delete(db: Any, query: Any, **kwargs):
        return NotImplemented

    @staticmethod
    async def insert_many(db: Any, data: Any, **kwargs):
        return NotImplemented

    async def save(self, db, collection):
        return NotImplemented


class AsyncMongoCRUDBase(AsyncCRUDBase):
    """ Provides minimal support for writing to MongoDB
    """
    
    @staticmethod
    async def get(collection: Any,  query: Any, **kwargs) -> List[object]:
        result = [data async for data in collection.find(query)]
        return result

    @staticmethod
    async def delete(collection: Any, query: Any, **kwargs):
        return NotImplemented

    @staticmethod
    async def insert_many(collection: Any, data: Any, **kwargs):
        await collection.insert_many(data)

    async def save(self, collection):
        return NotImplemented


In [4]:
from pydantic import BaseModel
from bson import ObjectId
from datetime import datetime, date
from typing import Optional, List

class MongoModel(BaseModel, AsyncMongoCRUDBase):

    class Config:
        allow_population_by_field_name = True
        json_encoders = {
            datetime: lambda dt: dt.isoformat(),
            ObjectId: lambda oid: str(oid),
        }

    @classmethod
    def from_mongo(cls, data: dict):
        """We must convert _id into "id". """
        if not data:
            return data
        id = data.pop('_id', None)
        return cls(**dict(data, id=id))

    def mongo(self, **kwargs):
        exclude_unset = kwargs.pop('exclude_unset', True)
        by_alias = kwargs.pop('by_alias', True)

        parsed = self.dict(
            exclude_unset=exclude_unset,
            by_alias=by_alias,
            **kwargs,
        )

        # Mongo uses `_id` as default key. We should stick to that as well.
        if '_id' not in parsed and 'id' in parsed:
            parsed['_id'] = parsed.pop('id')

        return parsed
    
    @staticmethod
    async def insert_many(collection: Any, data: List[AsyncMongoCRUDBase], **kwargs):
        await collection.insert_many([d.mongo() for d in data])

    @classmethod
    async def get(cls, collection: Any,  query: Any, **kwargs) -> List[object]:
        result = [cls.from_mongo(data) async for data in collection.find(query)]
        return result

    async def save(self, db, collection_name:str):
        try:
            await db[collection_name].insert_one(self.mongo())
        except Exception as e:
            print(e)


In [5]:
import re

domain_pattern = re.compile("^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)")

In [6]:
from enum import Enum


class JobState(str, Enum):
    PENDING= 'pending'
    DONE = 'done'
    WORKING = 'working'
    FAILED = 'failed'


class ContentType(str, Enum):
    WEBPAGE: str = 'webpage'
    IMAGE: str = 'image'
    AUDIO: str = 'audio'
    VIDEO: str = 'video'


class JobType(str, Enum):
    """ Job types supported by spiders

    BASIC_PAGE_SCRAPING: only scrape the provided urls and return the html of those urls,
    SEARCH_RESULT_AGGREGATION: perform searches on search engines or general search page and retrieve their results,
    WEB_CRAWLING: Start from seed urls, follow all links available.
    """
    BASIC_PAGE_SCRAPING: str = 'basic_page_scraping'
    SEARCH_RESULT_AGGREGATION: str = 'search_result_aggregation'
    # WEB_CRAWLING: str = 'web_crawling'

In [7]:
class KeywordRules(BaseModel):
    include: List[str] = []
    exclude: List[str] = []


class SizeLimit(BaseModel):
    max_pages: Optional[int]
    max_size: Optional[int]


class TimeRange(BaseModel):
    past_days: Optional[int]
    date_before: Optional[date]
    date_after: Optional[date]


class RegexPattern(BaseModel):
    patterns: Optional[List[str]] = []


class ScrapeRules(BaseModel):
    """ Describes rules a spider should follow

    Fields:
        keywords: Optional[KeywordRules]
        size_limit: Optional[SizeLimit]
        time_range: Optional[TimeRange]
        regular_expressions: Optional[RegexPattern]
        max_retry: Optional[int] = 1  
    """
    keywords: Optional[KeywordRules]
    size_limit: Optional[SizeLimit]
    time_range: Optional[TimeRange]
    regular_expressions: Optional[RegexPattern]
    max_retry: Optional[int] = 1



class JobSpecification(BaseModel):
    """ Describes what kind of task a spider should perform

    Fields:
        urls: List[str]
        job_type: JobType
        scrape_rules: ScrapeRules
        data_collection: str = 'test'
    """
    urls: List[str]
    job_type: JobType
    scrape_rules: ScrapeRules
    data_collection: str = 'test'
    job_collection: str = "jobs"

In [8]:
from typing import Optional, List, Any
from datetime import datetime, timedelta

class JobStatus(MongoModel):
    job_id: str
    create_dt: datetime
    page_count: int = 0
    time_consumed: Optional[timedelta]
    current_state: JobState
    specification: JobSpecification


In [9]:
job_spec = JobSpecification(
    urls=['http://www.qq.com',
          "http://www.taobao.com",
          "http://www.baidu.com",
          'http://www.guancha.cn',
          'http://www.sina.com.cn']*5,
    job_type=JobType.BASIC_PAGE_SCRAPING,
    scrape_rules=ScrapeRules(
        sizelimit=SizeLimit(max_pages=10)
    )
)
job_spec

JobSpecification(urls=['http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn'], job_type=<JobType.BASIC_PAGE_SCRAPING: 'basic_page_scraping'>, scrape_rules=ScrapeRules(keywords=None, size_limit=None, time_range=None, regular_expressions=None, max_retry=1), data_collection='test', job_collection='jobs')

In [10]:
from uuid import uuid4
job_status = JobStatus(
            job_id=str(uuid4()),
            create_dt=datetime.now(),
            page_count=0,
            specification=job_spec,
            current_state=JobState.PENDING,
            time_consumed=timedelta(seconds=0))
job_status

JobStatus(job_id='5f8bd784-a939-4f4e-89c4-9985642d2271', create_dt=datetime.datetime(2021, 5, 31, 0, 47, 27, 189535), page_count=0, time_consumed=datetime.timedelta(0), current_state=<JobState.PENDING: 'pending'>, specification=JobSpecification(urls=['http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn', 'http://www.qq.com', 'http://www.taobao.com', 'http://www.baidu.com', 'http://www.guancha.cn', 'http://www.sina.com.cn'], job_type=<JobType.BASIC_PAGE_SCRAPING: 'basic_page_scraping'>, scrape_rules=ScrapeRules(keywords=None, size_limit=None, time_range=None, regula

In [11]:
import asyncio
import time


def timeit(func):
    async def process(func, *args, **params):
        if asyncio.iscoroutinefunction(func):
            print('this function is a coroutine: {}'.format(func.__name__))
            return await func(*args, **params)
        else:
            print('this is not a coroutine')
            return func(*args, **params)

    async def helper(*args, **params):
        print('{}.time'.format(func.__name__))
        start = time.time()
        result = await process(func, *args, **params)

        # Test normal function route...
        # result = await process(lambda *a, **p: print(*a, **p), *args, **params)

        print('>>>', time.time() - start)
        return result

    return helper


In [12]:
from typing import Optional, List
from datetime import datetime

class URL(BaseModel):
    """ Holds an url and its domain name.

    If domain name is not specified, it will be guessed from the url

    Fields:
        url: str
        domain: Optional[str]
    """
    url: str
    domain: Optional[str] = None

    def __init__(self, **data: Any) -> None:
        super().__init__(**data)
        parsed_domain = domain_pattern.findall(self.url)

        if self.domain is None and len(parsed_domain):
            # auto fills domain name if not provided
            self.domain = parsed_domain[0]


class HTMLData(MongoModel):
    """ Builds a html data representation

    Fields:
        url: URL
        html: str
        create_dt: datetime
        job_id: Optional[str]
        keywords: Optional[List[str]] = []
    """
    url: URL
    html: str
    create_dt: datetime
    job_id: Optional[str]
    keywords: Optional[List[str]] = []


In [13]:
def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler

In [14]:
from abc import ABC
from typing import Callable
import aiohttp

class AsyncIterator:
    def __init__(self, seq):
        self.iter = iter(seq)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self.iter)
        except StopIteration:
            raise StopAsyncIteration


class BaseSpiderService(ABC):
    """ Defines common interface for spider services.
    """

    def get(self, data_src: URL) -> Any:
        return NotImplemented

    def get_many(self, data_src: List[URL], rules: Any) -> Any:
        return NotImplemented

class HTMLSpiderService(BaseSpiderService):

    def __init__(self, session: aiohttp.ClientSession, job_id: str = None):
        BaseSpiderService.__init__(self)
        self.session = session
        self.html_data: List[HTMLData] = []
        self.job_id = job_id
        self.page_count = 0

    async def get(self, data_src: URL) -> None:
        async with self.session.get(data_src.url) as response:
            html = await response.text()
            return html

    async def get_many(self, data_src: List[str], rules: ScrapeRules,
                       async_db_action: Callable = None, async_in_progress_callback: Callable = None,
                       async_job_done_callback: Callable = None, execute_job_callback_interval: int = 1,
                       **kwargs) -> None:
        """ Get html data given the data source
        
        Pass callback coroutines to this method when using a BackgroundTask scheduler.

        Args: 
            data_src: List[str]
            rules: ScrapeRules
            async_in_progress_callback: corountine for handling job status during scraping
            async_job_done_callback: corountine for handling job status after
            async_db_action: coroutine for handling database operations
            execute_job_callback_interval: time interval for executing in_progress_callback
            kwargs: arguments for callbacks
        """
        self.html_data = []
        self.page_count = 0
        
        loop = asyncio.get_running_loop()
        
        async def scrape(url):
            target_url = URL(url=url)
            html = await self.get(target_url)
            html_data = HTMLData(url=target_url, html=html,
                                 create_dt=datetime.now(),
                                 job_id=self.job_id)
            self.page_count += 1
            self.html_data.append(html_data)
            
        @periodic(1)
        async def tick():
#             if async_in_progress_callback:
            await asyncio.sleep(1)
            print("tik tok")
        
        def unblock_periodic_task(task):
            return lambda task: task.cancel()
            
        async def execute_db_action_after(tasks, periodic_task):
            await asyncio.gather(*tasks)
            if async_db_action:
                await async_db_action(data=self.html_data, **kwargs)
                periodic_task.cancel()
            print("done")
        
        tasks = [scrape(url) for url in data_src]
        periodic_in_progress_task = loop.create_task(tick())
#         loop.call_later(1, unblock_periodic_task(periodic_in_progress_task))
        db_task = asyncio.create_task(execute_db_action_after(tasks, periodic_in_progress_task))
        
#         await periodic_in_progress_task
        await db_task
        
            
        return self.html_data


In [15]:
session = aiohttp.ClientSession()

In [16]:
spider = HTMLSpiderService(session, job_id=str(uuid4()))

In [21]:
job_spec = JobSpecification(
    urls=[
          "http://www.taobao.com",
          "http://www.baidu.com",
          'http://www.guancha.cn',
          'http://www.sina.com.cn'],
    job_type=JobType.BASIC_PAGE_SCRAPING,
    scrape_rules=ScrapeRules(
        sizelimit=SizeLimit(max_pages=10)
    )
)

In [22]:
client = create_client(username="admin", password="root", host="localhost", port=27017, db_name="spiderDB")
test_collection = client.spiderDB.test

tik tok


In [23]:
start_time = time.time()
data = asyncio.run(spider.get_many(job_spec.urls, job_spec.scrape_rules, async_db_action=HTMLData.insert_many, collection=test_collection))
used_time = time.time() - start_time
print(f"used {used_time} seconds")
print(f"Collected {len(data)} entries")

done
tik tok
tik tok
done
used 2.119396209716797 seconds
Collected 4 entries
tik tok


In [17]:
@timeit
async def test_scrape(spider):
    data = await spider.get_many(job_spec.urls, job_spec.scrape_rules, async_db_action=HTMLData.insert_many, collection=test_collection)
    return data

In [23]:
time.time()

1622383686.52325

In [73]:
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())


Task A: Compute factorial(2), currently i=2...
Task B: Compute factorial(3), currently i=2...
Task C: Compute factorial(4), currently i=2...
Task A: factorial(2) = 2
Task B: Compute factorial(3), currently i=3...
Task C: Compute factorial(4), currently i=3...
Task B: factorial(3) = 6
Task C: Compute factorial(4), currently i=4...
Task C: factorial(4) = 24
[2, 6, 24]


In [None]:
async def find_all(test_collection):
    print(test_collection)
    data = [HTMLData(**d) async for d in test_collection.find({})]
    return data

In [None]:
test_data = HTMLData(url=URL(url='http://www.bbc.com'), html='<p>news</p>', create_dt=datetime(1976, 5, 28, 4, 21, 11, 901000), job_id='1', keywords=[])

In [None]:
asyncio.run(test_data.save(client.spiderDB, 'test'))

In [None]:
asyncio.run(HTMLData.get(client.spiderDB.test, {}))

In [None]:
from uuid import uuid4

In [None]:
test_set = [HTMLData(url=URL(url=f'http://www.{s}.com'), html=f'<p>{s}</p>', create_dt=datetime.now(), job_id=str(uuid4()), keywords=[]) for s in "abcde" ]

In [None]:
test_set

In [None]:
asyncio.run(HTMLData.insert_many(client.spiderDB.test, test_set))