Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger text/wait #187

Merged
merged 11 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ def edit_page(uuid):
'tag': form.tag.data.strip(),
'title': form.title.data.strip(),
'headers': form.headers.data,
'fetch_backend': form.fetch_backend.data
'fetch_backend': form.fetch_backend.data,
'trigger_text': form.trigger_text.data
}

# Notification URLs
Expand Down
47 changes: 37 additions & 10 deletions backend/fetch_site_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
from inscriptis import get_text
import urllib3
from . import html_tools
import re

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities


# Some common stuff here that can be moved to a base class
Expand Down Expand Up @@ -57,6 +56,8 @@ def run(self, uuid):
changed_detected = False
stripped_text_from_html = ""

watch = self.datastore.data['watching'][uuid]

update_obj = {'previous_md5': self.datastore.data['watching'][uuid]['previous_md5'],
'history': {},
"last_checked": timestamp
Expand All @@ -81,7 +82,7 @@ def run(self, uuid):
url = self.datastore.get_val(uuid, 'url')

# Pluggable content fetcher
prefer_backend = self.datastore.data['watching'][uuid]['fetch_backend']
prefer_backend = watch['fetch_backend']
if hasattr(content_fetcher, prefer_backend):
klass = getattr(content_fetcher, prefer_backend)
else:
Expand All @@ -94,8 +95,15 @@ def run(self, uuid):
# Fetching complete, now filters
# @todo move to class / maybe inside of fetcher abstract base?

# @note: I feel like the following should be in a more obvious chain system
# - Check filter text
# - Is the checksum different?
# - Do we convert to JSON?
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ?

is_html = True
css_filter_rule = self.datastore.data['watching'][uuid]['css_filter']
css_filter_rule = watch['css_filter']
if css_filter_rule and len(css_filter_rule.strip()):
if 'json:' in css_filter_rule:
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
Expand All @@ -107,7 +115,6 @@ def run(self, uuid):
if is_html:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = fetcher.content
css_filter_rule = self.datastore.data['watching'][uuid]['css_filter']
if css_filter_rule and len(css_filter_rule.strip()):
html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)

Expand All @@ -123,17 +130,37 @@ def run(self, uuid):

# If there's text to skip
# @todo we could abstract out the get_text() to handle this cleaner
if len(self.datastore.data['watching'][uuid]['ignore_text']):
stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html,
self.datastore.data['watching'][uuid]['ignore_text'])
if len(watch['ignore_text']):
stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, watch['ignore_text'])
else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')


fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()

blocked_by_not_found_trigger_text = False

if len(watch['trigger_text']):
blocked_by_not_found_trigger_text = True
for line in watch['trigger_text']:
# Because JSON wont serialize a re.compile object
if line[0] == '/' and line[-1] == '/':
regex = re.compile(line.strip('/'), re.IGNORECASE)
# Found it? so we don't wait for it anymore
r = re.search(regex, str(stripped_text_from_html))
if r:
blocked_by_not_found_trigger_text = False
break

elif line.lower() in str(stripped_text_from_html).lower():
# We found it don't wait for it.
blocked_by_not_found_trigger_text = False
break


# could be None or False depending on JSON type
if self.datastore.data['watching'][uuid]['previous_md5'] != fetched_md5:
# On the first run of a site, watch['previous_md5'] will be an empty string
if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
changed_detected = True

# Don't confuse people by updating as last-changed, when it actually just changed from None..
Expand All @@ -144,7 +171,7 @@ def run(self, uuid):

# Extract title as title
if is_html and self.datastore.data['settings']['application']['extract_title_as_title']:
if not self.datastore.data['watching'][uuid]['title'] or not len(self.datastore.data['watching'][uuid]['title']):
if not watch['title'] or not len(watch['title']):
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)


Expand Down
3 changes: 2 additions & 1 deletion backend/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from wtforms.validators import ValidationError
from wtforms.fields import html5
from backend import content_fetcher
import re

class StringListField(StringField):
widget = widgets.TextArea()
Expand Down Expand Up @@ -124,7 +125,6 @@ def __init__(self, message=None):
self.message = message

def __call__(self, form, field):
import re

for line in field.data:
if line[0] == '/' and line[-1] == '/':
Expand Down Expand Up @@ -178,6 +178,7 @@ class watchForm(quickWatchForm):
notification_urls = StringListField('Notification URL List')
headers = StringDictKeyValue('Request Headers')
trigger_check = BooleanField('Send test notification on save')
trigger_text = StringListField('Trigger/wait for text', [validators.Optional(), ValidateListRegex()])


class globalSettingsForm(Form):
Expand Down
1 change: 1 addition & 0 deletions backend/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(self, datastore_path="/datastore", include_default_watches=True):
'ignore_text': [], # List of text to ignore when calculating the comparison checksum
'notification_urls': [], # List of URLs to add to the notification Queue (Usually AppRise)
'css_filter': "",
'trigger_text': [], # List of text or regex to wait for until a change is detected
'fetch_backend': None,
}

Expand Down
15 changes: 14 additions & 1 deletion backend/templates/edit.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<li class="tab" id="default-tab"><a href="#general">General</a></li>
<li class="tab"><a href="#notifications">Notifications</a></li>
<li class="tab"><a href="#filters">Filters</a></li>
<li class="tab"><a href="#triggers">Triggers</a></li>
</ul>
</div>

Expand Down Expand Up @@ -101,8 +102,20 @@
</span>

</fieldset>
</div>


<div class="tab-pane-inner" id="triggers">
<fieldset>
<div class="pure-control-group">
{{ render_field(form.trigger_text, rows=5, placeholder="Some text to wait for in a line
/some.regex\d{2}/ for case-INsensitive regex
") }}<br/>
<span class="pure-form-message-inline">Text to wait for before triggering a change/notification, all text and regex are tested <i>case-insensitive</i>.</span><br/>
<span class="pure-form-message-inline">Trigger text is processed from the result-text that comes out of any <a href="#filters">CSS/JSON Filters</a> for this watch</span>.<br/>
<span class="pure-form-message-inline">Each line is processed separately (think of each line as "OR")</span><br/>
<span class="pure-form-message-inline">Note: Wrap in forward slash / to use regex example: <span style="font-family: monospace; background: #eee">/foo\d/</span> </span>
</div>
</fieldset>
</div>
<div id="actions">
<div class="pure-control-group">
Expand Down
131 changes: 131 additions & 0 deletions backend/tests/test_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/python3

import time
from flask import url_for
from . util import live_server_setup


def set_original_ignore_response():
    """Write the baseline HTML fixture to the endpoint content file.

    The page holds only the initial text; it does not contain the
    ``foobar123`` trigger text used by the trigger test in this file.
    """
    test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>

"""

    # Explicit encoding so the fixture bytes do not depend on the locale
    # default of the machine running the tests.
    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write(test_return_data)


def set_modified_original_ignore_response():
    """Write a changed HTML fixture that still lacks the trigger text.

    The first line of text differs from the baseline page ("Some NEW nice
    initial text"), but ``foobar123`` is not present anywhere in it.
    """
    test_return_data = """<html>
<body>
Some NEW nice initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>

"""

    # Explicit encoding so the fixture bytes do not depend on the locale
    # default of the machine running the tests.
    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write(test_return_data)


def set_modified_with_trigger_text_response():
    """Write a changed HTML fixture that contains the trigger text.

    Same content as the modified page, plus a ``foobar123`` line, which is
    the trigger text configured by the trigger test in this file.
    """
    test_return_data = """<html>
<body>
Some NEW nice initial text</br>
<p>Which is across multiple lines</p>
</br>
foobar123
<br/>
So let's see what happens. </br>
</body>
</html>

"""

    # Explicit encoding so the fixture bytes do not depend on the locale
    # default of the machine running the tests.
    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write(test_return_data)


def test_trigger_functionality(client, live_server):
    """Check that a change is only flagged once the trigger text appears.

    Flow: import the test URL, save a plain-text trigger on the watch,
    modify the page without the trigger text (no 'unviewed' marker
    expected), then modify it with the trigger text (marker expected).
    """
    live_server_setup(live_server)

    sleep_time_for_fetch_thread = 3
    trigger_text = "foobar123"
    set_original_ignore_response()

    # Give the endpoint time to spin up
    time.sleep(1)

    # Add our URL to the import page
    test_url = url_for('test_endpoint', _external=True)
    res = client.post(
        url_for("import_page"),
        data={"urls": test_url},
        follow_redirects=True
    )
    assert b"1 Imported" in res.data

    # Trigger a check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)

    # Give the thread time to pick it up
    time.sleep(sleep_time_for_fetch_thread)

    # Go to the edit page and save our trigger text on the watch
    res = client.post(
        url_for("edit_page", uuid="first"),
        data={"trigger_text": trigger_text,
              "url": test_url,
              "fetch_backend": "html_requests"},
        follow_redirects=True
    )
    assert b"Updated watch." in res.data

    # Check it saved
    res = client.get(
        url_for("edit_page", uuid="first"),
    )
    # str.encode() already returns bytes, no extra bytes() wrapper needed
    assert trigger_text.encode('utf-8') in res.data

    # Trigger a check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)

    # Give the thread time to pick it up
    time.sleep(sleep_time_for_fetch_thread)

    # It should report nothing found (no new 'unviewed' class)
    res = client.get(url_for("index"))
    assert b'unviewed' not in res.data
    assert b'/test-endpoint' in res.data

    # Make a change that does NOT contain the trigger text
    set_modified_original_ignore_response()

    # Trigger a check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    # Give the thread time to pick it up
    time.sleep(sleep_time_for_fetch_thread)

    # The content changed but the trigger text is absent, so it should
    # still report nothing found (no new 'unviewed' class)
    res = client.get(url_for("index"))
    assert b'unviewed' not in res.data

    # Now serve a modified page that DOES contain the trigger text
    time.sleep(sleep_time_for_fetch_thread)
    set_modified_with_trigger_text_response()

    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(sleep_time_for_fetch_thread)

    # Trigger text found, so the change should now be flagged
    res = client.get(url_for("index"))
    assert b'unviewed' in res.data
81 changes: 81 additions & 0 deletions backend/tests/test_trigger_regex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/python3

import time
from flask import url_for
from . util import live_server_setup


def set_original_ignore_response():
    """Write the baseline HTML fixture to the endpoint content file.

    The page holds only the initial text and nothing matching the
    ``/something \\d{3}/`` trigger regex used by the test in this file.
    """
    test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>

"""

    # Explicit encoding so the fixture bytes do not depend on the locale
    # default of the machine running the tests.
    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write(test_return_data)



def test_trigger_regex_functionality(client, live_server):
    """Check that a regex trigger only flags a change once it matches.

    Flow: import the test URL, save a ``/.../``-wrapped regex trigger on
    the watch, change the page to text that does not match (no 'unviewed'
    marker expected), then to text that matches (marker expected).
    """
    live_server_setup(live_server)

    sleep_time_for_fetch_thread = 3

    set_original_ignore_response()

    # Give the endpoint time to spin up
    time.sleep(1)

    # Add our URL to the import page
    test_url = url_for('test_endpoint', _external=True)
    res = client.post(
        url_for("import_page"),
        data={"urls": test_url},
        follow_redirects=True
    )
    assert b"1 Imported" in res.data

    # Trigger a check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)

    # Give the thread time to pick it up
    time.sleep(sleep_time_for_fetch_thread)

    # It should report nothing found (just a new one shouldnt have anything)
    res = client.get(url_for("index"))
    assert b'unviewed' not in res.data

    ### test regex
    # Raw string: "\d" in a normal literal is an invalid escape sequence
    res = client.post(
        url_for("edit_page", uuid="first"),
        data={"trigger_text": r'/something \d{3}/',
              "url": test_url,
              "fetch_backend": "html_requests"},
        follow_redirects=True
    )
    # Make sure the regex passed form validation and was actually saved,
    # otherwise the checks below would pass or fail for the wrong reason
    assert b"Updated watch." in res.data

    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write("some new noise")

    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(sleep_time_for_fetch_thread)

    # It should report nothing found (nothing should match the regex)
    res = client.get(url_for("index"))
    assert b'unviewed' not in res.data

    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write("regex test123<br/>\nsomething 123")

    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(sleep_time_for_fetch_thread)

    # "something 123" matches the regex, so the change should be flagged
    res = client.get(url_for("index"))
    assert b'unviewed' in res.data