
[WIP] Implement Scrapy source #320

Closed · wants to merge 55 commits into from

Conversation

@sultaniman (Contributor) commented Jan 12, 2024

Hey,

NOTE: This PR is still WIP

This PR implements a Scrapy source which sets up communication between threads via a queue, see the image below

[Image: scrapy]

In spider.py we also have our own base spider class DLTSpiderBase, which provides very basic logic; as a reference you can see our generic DLTSpider, which is used only when callbacks are specified.

NOTE:

  1. If a custom spider is specified, it takes priority and the callbacks are ignored,
  2. If no spider is provided, the source expects the two callbacks on_result and on_next_page.

The on_result callback must be a generator because we want to progressively yield results to the dlt pipeline:

from typing import Dict, Generator

from scrapy.http import Response


def parse(response: Response) -> Generator[Dict, None, None]:
    for quote in response.css("div.quote"):
        yield {
            "quote": {
                "text": quote.css("span.text::text").get(),
                "author": quote.css("small.author::text").get(),
                "tags": quote.css("div.tags a.tag::text").getall(),
            },
        }

on_next_page should return the next page URL, or None to indicate the end of the crawling process and stop the pipeline:

def next_page(response: Response) -> Optional[str]:
    return response.css("li.next a::attr(href)").get()

TODO

  • Write README and manual,
  • Tests

@burnash (Collaborator) left a comment:

Great progress, see my comments above.

@sultaniman (Contributor, Author):

@burnash Thanks for the comments and guidelines, I will address your concerns.

@rudolfix (Contributor) left a comment:

@sultaniman good job! it was not easy to put those complicated things together :) please look at my comments

  • better use a resource, not a dlt source
  • I have a few edge cases for the queue and I do not understand how the spider really works - happy to discuss that
  • IMO we need a better thread runner for dlt and scrapy

My other idea is to feed dlt via a Scrapy item pipeline: https://docs.scrapy.org/en/latest/topics/item-pipeline.html#topics-item-pipeline
What is good about it is that (IMO) you can use standard Scrapy spiders and the Scrapy CLI to run scraping.
You'd still (most probably) use a queue and a thread to send items to dlt, but the architecture should be way, way easier, so please take a look. A sketch of the idea follows.
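A minimal sketch of that item-pipeline idea (the class name QueueItemPipeline, the module-level queue and the sentinel value are illustrative assumptions, not part of this PR):

from queue import Queue

result_queue: Queue = Queue(maxsize=1000)


class QueueItemPipeline:
    """Scrapy item pipeline that forwards every scraped item to a queue consumed by dlt."""

    def process_item(self, item, spider):
        result_queue.put(item)  # the dlt thread reads items from this queue
        return item

    def close_spider(self, spider):
        result_queue.put(None)  # sentinel: no more items will arrive

The pipeline would be enabled through Scrapy's ITEM_PIPELINES setting, so standard spiders and `scrapy crawl` keep working unchanged.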


### 🛡️ Custom spider vs callbacks

`build_scrapy_source` accepts callbacks and a custom spider implementation. If a custom spider is provided, it will be used and the callbacks are skipped; if you instead resort to custom callbacks, we will use our generic `spider.DLTSpider`, which takes care of calling them.
Contributor:

If possible, link to the original Scrapy docs where writing a spider is explained. We should aim to minimize what the user needs to learn in order to use this source. You could probably mention that you write the spider in the usual way, just derive it from our base class.
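For illustration, such a derived spider might look like this (a sketch only; the import path and the exact responsibilities of DLTSpiderBase are assumptions):

from scrapy.http import Response

from .spider import DLTSpiderBase  # import path is an assumption


class QuotesSpider(DLTSpiderBase):
    # written like any regular Scrapy spider, just derived from our base class
    name = "quotes"
    start_urls = ["https://quotes.toscrape.com/"]

    def parse(self, response: Response):
        for quote in response.css("div.quote"):
            yield {"text": quote.css("span.text::text").get()}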

The scraping source allows you to scrape content from the web and uses [Scrapy](https://doc.scrapy.org/en/latest/)
to enable this capability.

## 🧠 How it works?
Contributor:

IMO such technical details should come at the end. People interested in scrapers are mostly data scientists, and they are interested in usage, not in internals (IMO :))

Contributor (Author):

Fair, I will move this kind of information to the very bottom or a separate markdown file.


__all__ = ["build_scrapy_source"]

logger = logging.getLogger(__file__)
Contributor:

please use logger from dlt.common
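A sketch of the suggested change, assuming the shared logger that dlt.common exposes:

from dlt.common import logger

logger.info("Starting scraping pipeline")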

Contributor (Author):

Will change it.

yield get_scraping_results(queue=queue)


def build_scrapy_source(
Contributor:

I'd use a little different structure here:

  1. do not use decorators to create the source and resource
  2. do not use a source at all; create a dlt.resource dynamically here and return it (see the sketch below)
  3. thanks to the apply_hints method, users are able to modify such resources as they wish, i.e. create dynamic table names etc.
  4. please check the with_config decorator and pass all configuration using it and specs: https://dlthub.com/docs/general-usage/credentials/config_specs#writing-custom-specs
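A rough sketch of points 1–3 (the signature of build_scrapy_source and the sentinel handling are assumptions, not the final implementation):

import dlt


def build_scrapy_source(queue, name="scrapy"):
    def get_scraping_results():
        while True:
            item = queue.get()
            if item is None:  # sentinel: scrapy has finished
                break
            yield item

    # create the resource dynamically instead of decorating a module-level function
    resource = dlt.resource(get_scraping_results, name=name)
    # callers can still reshape it, e.g. resource.apply_hints(table_name="quotes")
    return resource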

"""Builder which configures scrapy and Dlt pipeline to run in and communicate using queue"""
if not spider and not (on_next_page and on_result):
logger.error("Please provide spider or lifecycle hooks")
raise RuntimeError(
Contributor:

I think raising the exception is enough.

# If next page is available
# Then we create next request
# Else we stop spider because no pages left
next_page = self.on_next_page(response)
Contributor:

I have a feeling you reimplement scrapy internals here

# Else we stop spider because no pages left
next_page = self.on_next_page(response)
if next_page is not None:
    next_page = response.urljoin(next_page)
Contributor:

hmmmm why? this is how they do it in their spiders:

for next_page in response.css('a.next'):
    yield response.follow(next_page, self.parse)



OnNextPage = Callable[[Response], Optional[str]]
OnResult = Callable[[Response], Generator[Any, None, None]]
Contributor:

Iterator[Any] == Generator[Any, None, None]
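i.e. the alias could use the shorter annotation (a sketch assuming the module's existing imports):

from typing import Any, Callable, Iterator

from scrapy.http import Response

OnResult = Callable[[Response], Iterator[Any]]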


d = runner.join()
d.addBoth(lambda _: reactor.stop()) # type: ignore[attr-defined]
reactor.run() # type: ignore[attr-defined]
Contributor:

Here you should:

  1. wait for scrapy to finish (all spiders stopped yielding),
  2. make sure that all messages got consumed from the queue,
  3. close the queue at the very end (see the sketch below).
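A minimal sketch of that ordering only, using the runner and queue already in scope; it assumes the queue offers join()/task_done() semantics and a close() method, which is an assumption about this PR's queue wrapper:

from twisted.internet import reactor

d = runner.join()  # fires once all spiders have finished yielding


def shutdown(_result):
    queue.join()   # wait until dlt has consumed every message put on the queue
    queue.close()  # close the queue at the very end
    reactor.stop()


d.addBoth(shutdown)
reactor.run()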

) -> None:
"""Convenience method which handles the order of starting of pipeline and scrapy"""
logger.info("Starting scraping pipeline")
pipeline_thread_runner = threading.Thread(target=pipeline_runner)
Contributor:

We need a better "host" for dlt. At the very least it should close the queue if an exception occurred, or if dlt exited and the queue is not yet closed. If possible, the dlt exception should be re-raised in the main thread.
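A minimal sketch of such a host (PipelineHost is a hypothetical name, and queue.close() assumes the PR's queue wrapper exposes that method):

import threading


class PipelineHost:
    """Runs the dlt pipeline in a thread, closes the queue on exit and re-raises errors."""

    def __init__(self, pipeline_runner, queue):
        self._runner = pipeline_runner
        self._queue = queue
        self._exc = None
        self._thread = threading.Thread(target=self._run)

    def _run(self):
        try:
            self._runner()
        except BaseException as exc:  # capture so it can be re-raised in the main thread
            self._exc = exc
        finally:
            self._queue.close()  # stop the scrapy side from feeding a dead consumer

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()
        if self._exc is not None:
            raise self._exc  # re-raise the dlt exception in the main thread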

@sultaniman (Contributor, Author):

Closing this in favor of #332

@sultaniman sultaniman closed this Jan 25, 2024
@sultaniman sultaniman deleted the source/scrapy branch January 25, 2024 20:36