Merge pull request #44 from damklis/develop

Develop

damklis committed Nov 11, 2020
2 parents 5981a5e + e5434a8, commit 7a62492
Showing 11 changed files with 150 additions and 80 deletions.
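
In short: the two PythonOperator tasks in rss_news_dag.py become dedicated custom operators (ProxyPoolOperator and RSSNewsOperator), the main.py glue modules they replace are deleted, the package __init__ files and .coveragerc are updated to match, and run_tests.sh and minio/create_default_bucket.sh get whitespace-only cleanups.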
2 changes: 1 addition & 1 deletion .coveragerc
@@ -1,7 +1,7 @@
[run]
omit =
    *test*.py
-    *main.py
+    *operator.py

[report]
exclude_lines =
2 changes: 2 additions & 0 deletions airflow/dags/custom_operators/__init__.py
@@ -0,0 +1,2 @@
from custom_operators.proxypool_operator import ProxyPoolOperator
from custom_operators.rss_news_operator import RSSNewsOperator
55 changes: 55 additions & 0 deletions airflow/dags/custom_operators/proxypool_operator.py
@@ -0,0 +1,55 @@
import json
from concurrent.futures import ThreadPoolExecutor
from retry import RetryOnException as retry
from proxypool import (
ProxyPoolValidator,
ProxyPoolScraper,
RedisProxyPoolClient
)

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults


class ProxyPoolOperator(BaseOperator):

@apply_defaults
def __init__(
self,
proxy_webpage,
number_of_proxies,
testing_url,
max_workers,
redis_config,
redis_key,
*args, **kwargs):
super().__init__(*args, **kwargs)
self.proxy_webpage = proxy_webpage
self.testing_url = testing_url
self.number_of_proxies = number_of_proxies
self.max_workers = max_workers
self.redis_config = redis_config
self.redis_key = redis_key

@retry(5)
def execute(self, context):
proxy_scraper = ProxyPoolScraper(self.proxy_webpage)
proxy_validator = ProxyPoolValidator(self.testing_url)
proxy_stream = proxy_scraper.get_proxy_stream(self.number_of_proxies)

with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
results = executor.map(
proxy_validator.validate_proxy, proxy_stream
)
valid_proxies = filter(lambda x: x.is_valid is True, results)
sorted_valid_proxies = sorted(
valid_proxies, key=lambda x: x.health, reverse=True
)

with RedisProxyPoolClient(self.redis_key, self.redis_config) as client:
client.override_existing_proxies(
[
json.dumps(record.proxy)
for record in sorted_valid_proxies[:5]
]
)
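
A note on the shapes assumed above: the filter and sort in execute() expect validation results with is_valid, health, and proxy attributes. A minimal sketch, inferred purely from that attribute access (the real class lives under airflow/modules/proxypool and is not part of this diff):

# Sketch only: field names inferred from execute() above, not the module's code.
from dataclasses import dataclass

@dataclass
class ProxyStatus:
    proxy: dict      # e.g. {"http": "http://1.2.3.4:8080"}
    health: float    # success ratio against testing_url; used as the sort key
    is_valid: bool   # filter criterion in execute()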
57 changes: 57 additions & 0 deletions airflow/dags/custom_operators/rss_news_operator.py
@@ -0,0 +1,57 @@
from log import log
from retry import RetryOnException as retry
from proxypool import RedisProxyPoolClient
from rss_news import (
NewsProducer,
NewsExporter,
NewsValidator
)

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults


@log
class RSSNewsOperator(BaseOperator):

@apply_defaults
def __init__(
self,
validator_config,
rss_feed,
language,
redis_config,
redis_key,
bootstrap_servers,
topic,
*args, **kwargs):
super().__init__(*args, **kwargs)
self.validator_config = validator_config
self.rss_feed = rss_feed
self.language = language
self.redis_config = redis_config
self.redis_key = redis_key
self.bootstrap_servers = bootstrap_servers
self.topic = topic

@retry(5)
def execute(self, context):
validator = NewsValidator(self.validator_config)
producer = NewsProducer(self.rss_feed, self.language)
redis = RedisProxyPoolClient(self.redis_key, self.redis_config)

with NewsExporter(self.bootstrap_servers) as exporter:
proxy = redis.get_proxy()
self.logger.info(proxy)
try:
for news in producer.get_news_stream(proxy):
self.logger.info(news)
validator.validate_news(news)
exporter.export_news_to_broker(
self.topic,
news.as_dict()
)
except Exception as err:
redis.lpop_proxy()
self.logger.error(f"Exception: {err}")
raise err
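
Both operators guard execute() with @retry(5). A minimal sketch of what the imported RetryOnException decorator plausibly does, assuming plain retry-on-any-exception semantics (the actual implementation lives under airflow/modules and is not shown in this diff):

import functools

def RetryOnException(retries):
    # Assumed semantics: re-run the wrapped callable up to `retries` times,
    # re-raising the last exception if every attempt fails.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_err = None
            for _ in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    last_err = err
            raise last_err
        return wrapper
    return decorator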
35 changes: 24 additions & 11 deletions airflow/dags/rss_news_dag.py
@@ -4,8 +4,10 @@
from airflow.operators.python_operator import PythonOperator

from dags_config import Config as config
-from rss_news import export_news_to_broker
-from proxypool import update_proxypool
+from custom_operators import (
+    ProxyPoolOperator,
+    RSSNewsOperator
+)


def extract_feed_name(url):
@@ -19,12 +21,15 @@ def dummy_callable(action):

def export_events(config, rss_feed, language, dag):
feed_name = extract_feed_name(rss_feed)
-    return PythonOperator(
+    return RSSNewsOperator(
        task_id=f"exporting_{feed_name}_news_to_broker",
-        python_callable=export_news_to_broker,
-        op_kwargs={
-            "config": config, "rss_feed": rss_feed, "language": language
-        },
+        validator_config=config.VALIDATOR_CONFIG,
+        rss_feed=rss_feed,
+        language=language,
+        redis_config=config.REDIS_CONFIG,
+        redis_key=config.REDIS_KEY,
+        bootstrap_servers=config.BOOTSTRAP_SERVERS,
+        topic=config.TOPIC,
        dag=dag
    )

@@ -46,10 +51,14 @@ def create_dag(dag_id, interval, config, language, rss_feeds):
dag=dag
)

-    proxypool = PythonOperator(
+    proxypool = ProxyPoolOperator(
        task_id="updating_proxypool",
-        python_callable=update_proxypool,
-        op_kwargs={"config": config},
+        proxy_webpage=config.PROXY_WEBPAGE,
+        number_of_proxies=config.NUMBER_OF_PROXIES,
+        testing_url=config.TESTING_URL,
+        max_workers=config.NUMBER_OF_PROXIES,
+        redis_config=config.REDIS_CONFIG,
+        redis_key=config.REDIS_KEY,
        dag=dag
    )

@@ -76,5 +85,9 @@ def create_dag(dag_id, interval, config, language, rss_feeds):
interval = f"{n*4}-59/10 * * * *"

globals()[dag_id] = create_dag(
-        dag_id, interval, config, language, rss_feeds
+        dag_id,
+        interval,
+        config,
+        language,
+        rss_feeds
)
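
The operator kwargs above imply that dags_config.Config exposes at least the following attributes. A sketch with placeholder values only; the real Config is not part of this diff:

class Config:
    # Placeholder values for illustration; the real settings live in
    # airflow/dags/dags_config.
    VALIDATOR_CONFIG = {"language": "en"}             # hypothetical shape
    PROXY_WEBPAGE = "https://example.com/proxy-list"  # placeholder URL
    NUMBER_OF_PROXIES = 25
    TESTING_URL = "https://example.com"
    REDIS_CONFIG = {"host": "redis", "port": 6379, "db": 0}
    REDIS_KEY = "proxies"
    BOOTSTRAP_SERVERS = ["kafka:9092"]
    TOPIC = "rss_news"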
1 change: 0 additions & 1 deletion airflow/modules/proxypool/__init__.py
@@ -1,4 +1,3 @@
from proxypool.redis_proxypool_client import RedisProxyPoolClient
-from proxypool.main import update_proxypool
from proxypool.proxypool_scraper import ProxyPoolScraper, ProxyRecord
from proxypool.proxypool_validator import ProxyPoolValidator
25 changes: 0 additions & 25 deletions airflow/modules/proxypool/main.py

This file was deleted.
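Its update_proxypool helper, whose import is dropped from proxypool/__init__.py above, is superseded by ProxyPoolOperator.execute.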

3 changes: 1 addition & 2 deletions airflow/modules/rss_news/__init__.py
@@ -1,4 +1,3 @@
-from rss_news.main import export_news_to_broker
from rss_news.rss_news_producer import NewsProducer, NewsFormatter, News
from rss_news.rss_news_exporter import NewsExporter
-from rss_news.rss_news_validator import NewsValidator
\ No newline at end of file
+from rss_news.rss_news_validator import NewsValidator
30 changes: 0 additions & 30 deletions airflow/modules/rss_news/main.py

This file was deleted.
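Likewise, its export_news_to_broker helper is superseded by RSSNewsOperator.execute, and the corresponding import is dropped from rss_news/__init__.py above.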

2 changes: 1 addition & 1 deletion minio/create_default_bucket.sh
@@ -3,4 +3,4 @@
echo "Creating default bucket $DEFAULT_BUCKET"

mkdir -p /data/$DEFAULT_BUCKET \
-  && /usr/bin/minio server /data
\ No newline at end of file
+  && /usr/bin/minio server /data
18 changes: 9 additions & 9 deletions run_tests.sh
@@ -1,12 +1,12 @@
#!/bin/sh

pip install -r airflow/requirements.txt \
-&& py.test airflow/modules/tests/ --doctest-modules --cov airflow/modules --show-capture=no -v \
-&& flake8 -v \
-&& docker-compose up -d mongo \
-&& sleep 10 \
-&& docker exec -it mongo /usr/local/bin/init.sh \
-&& docker-compose up -d api \
-&& sleep 30 \
-&& docker exec -it api ./manage.py test -k \
-&& docker-compose down
+&& py.test airflow/modules/tests/ --doctest-modules --cov airflow/modules --show-capture=no -v \
+&& flake8 -v \
+&& docker-compose up -d mongo \
+&& sleep 10 \
+&& docker exec -it mongo /usr/local/bin/init.sh \
+&& docker-compose up -d api \
+&& sleep 30 \
+&& docker exec -it api ./manage.py test -k \
+&& docker-compose down
