Skip to content

Commit

Permalink
Fixing #16 and part of #17
Browse files Browse the repository at this point in the history
Signed-off-by: David <Dtaivpp@gmail.com>
  • Loading branch information
dtaivpp committed Nov 9, 2022
1 parent 37bd202 commit 49233cd
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 35 deletions.
7 changes: 4 additions & 3 deletions community_pulse/cluster_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ def enforce_index_templates():
"""Validate index template is in place"""
os_client = get_os_client()

if not os_client.indices.exists_index_template("tweets"):
if not os_client.indices.exists_index_template("twitter"):
body = {
"index_patterns": [
"tweets*"
"twitter*"
],
"template": {
"settings": {
Expand Down Expand Up @@ -35,9 +35,10 @@ def enforce_index_templates():
}
}
}
os_client.indices.put_index_template("tweets", body=body)
os_client.indices.put_index_template("twitter", body=body)


""" def create_meta_indices(os_client):
if not os_client.indices.exists("markers"):
os_client.indices.create("markers") """

73 changes: 57 additions & 16 deletions community_pulse/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import time
import logging
#import argparse
import argparse
import dotenv
from community_pulse.cluster_init import enforce_index_templates
from community_pulse.util import parse_config, initialize_opensearch_client, init_logging
Expand All @@ -10,32 +11,72 @@
dotenv.load_dotenv()


def get_args(argv=None):
    """Parse command-line arguments for community-pulse.

    Args:
        argv: optional list of argument strings; defaults to sys.argv
            (passing a list makes this function unit-testable).

    Returns:
        argparse.Namespace with:
            config (str): path to the YAML config file.
            jobs (list[str]): names of configured jobs to run (empty = run all).
    """
    def dir_path(string):
        """Validate the --config path; bootstrap a default config file if needed."""
        if os.path.isfile(string):
            return string

        if string == '/etc/pulse/config.yml':
            try:
                # BUG FIX: os.mkdir("etc", "pulse") treated "pulse" as the mode
                # argument and could not create nested directories — use
                # makedirs on the actual target path instead.
                os.makedirs('/etc/pulse', exist_ok=True)
                # BUG FIX: "rw" is not a valid open() mode and f.write() was
                # called with no argument; also the template was written to
                # ./config.yaml instead of the path being validated.
                with open(string, 'w', encoding='utf-8') as cfg:
                    cfg.write('')
                return string
            except PermissionError as err:
                raise PermissionError("Could not create config file") from err

        raise FileNotFoundError(string)

    parser = argparse.ArgumentParser(description="""Community-Pulse is a project for collecting \
and aggregating community data.""")
    parser.add_argument('--config',
                        default='/etc/pulse/config.yml',
                        type=dir_path,
                        help="Path to the config file. \
If not passed, a template is placed in /etc/pulse")
    parser.add_argument('--jobs',
                        nargs='+',
                        default=[],
                        help="A list of the jobs (defined in the config) \
you would like to run")

    return parser.parse_args(argv)


def job_runner(job_list, full_config):
    """Run each requested job from the config through its type's executor.

    Args:
        job_list: job names to run; an empty list means "run every job
            defined in the config".
        full_config: parsed config dict whose 'jobs' mapping goes from
            job-name -> job settings (each expected to carry a 'type' key).
    """
    # Map a job's 'type' value to the executor callable for that source.
    jobs_map = {
        'twitter': community_pulse.twitter.gen_twitter_executor()
    }

    # An empty selection means run everything defined in the config.
    if not job_list:
        job_list = full_config['jobs']

    filtered_jobs = {job: full_config['jobs'][job] for job in job_list}

    for job, config in filtered_jobs.items():
        # Look up the executor by the job's declared type, not its name.
        # BUG FIX: config['type'] raised KeyError for type-less jobs and an
        # unknown type crashed later with "'NoneType' object is not callable".
        curr_job = jobs_map.get(config.get('type'))
        if curr_job is None:
            logger.warning("No executor for job %s (type=%s); skipping",
                           job, config.get('type'))
            continue
        start = time.time()
        logger.debug("Starting Job %s", job)
        curr_job(job, **config)
        end = time.time()
        # BUG FIX: log message typo "Finshed" -> "Finished".
        logger.debug("Finished %s in: %is", job, end - start)


def main():
    """Entry point: load config, initialize logging/OpenSearch, run jobs."""
    cli_args = get_args()
    config = parse_config(cli_args.config)

    init_logging(logger, config['settings']['log_level'])
    initialize_opensearch_client(config['settings']['opensearch'])

    # Make sure the index template exists before any job writes documents.
    enforce_index_templates()

    job_runner(cli_args.jobs, config)

    # TODO: meta-index creation is still disabled — make this smarter first.
    # create_meta_indices(os_client)


if __name__ == "__main__":
    main()
23 changes: 16 additions & 7 deletions community_pulse/twitter.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from os import getenv
from datetime import datetime
import time
import logging
import tweepy
from community_pulse.util import to_ndjson, to_opensearch, backoff, get_os_client, matches_filter, jsonpath_filters
from community_pulse.util import to_ndjson, \
to_opensearch, \
backoff, get_os_client, \
matches_filter, \
jsonpath_filters, \
index_name_builder

logger = logging.getLogger('community-pulse')


def get_data(querystring, translate: bool, ignore: list):
def get_data(job, querystring, translate: bool, ignore: list):
"""
Get Tweets is a consumable stream of tweets that match the arg params
"""
Expand All @@ -17,14 +21,15 @@ def get_data(querystring, translate: bool, ignore: list):
from community_pulse.util import translate_text

ignore_filters = jsonpath_filters(ignore)
tw_index = index_name_builder(job, job_type='twitter')

logger.debug(f"Tweet Index {tw_index}")
client = create_twitter_client()
# Needs updating such that if the marker wasn't from within the last
# 7 days it returns none
most_recent_tweet_id = get_marker()

tw_detail = []
tw_index = f"tweets-{datetime.date(datetime.now())}"

tweets = tweepy.Paginator(client.search_recent_tweets,
querystring,
Expand Down Expand Up @@ -101,13 +106,16 @@ def tweets_iterator(iterator):
logger.debug("Twitter 503/429. Backing off for: %ss", sleep_time)
sleep_time = backoff(sleep_time)
except StopIteration:
logger.debug("Hit Stop Iteration")
return None


def create_twitter_client() -> tweepy.Client:
"""Returns twitter client"""
try:
token = getenv("TW_BEARER_TOKEN")
if not len(token) == 0:
logger.debug("Got non-zero bearer token")
client = tweepy.Client(bearer_token=token)
except Exception as err:
logger.exception(err)
Expand All @@ -116,8 +124,9 @@ def create_twitter_client() -> tweepy.Client:

def gen_twitter_executor():
"""Wraps the twitter execuition pipeline"""
def execute_twitter(query: str, translate=False, ignore={}):
tweets = get_data(querystring=query,
def execute_twitter(job ,query: str, translate=False, ignore={}, type=""):
tweets = get_data(job,
querystring=query,
translate=translate,
ignore=ignore)

Expand Down Expand Up @@ -174,7 +183,7 @@ def get_marker():
],
"size": 1
}
result = os_client.search(index='tweets*', body=query)
result = os_client.search(index='twitter*', body=query)

try:
_id = result['hits']['hits'][0]['_id']
Expand Down
22 changes: 19 additions & 3 deletions community_pulse/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
import time
import logging
from logging.handlers import RotatingFileHandler
import json
from datetime import datetime
from opensearchpy import OpenSearch


Expand Down Expand Up @@ -58,8 +60,12 @@ def to_opensearch(data: list):
"""
os_client = get_os_client()
for batch in data:
os_client.bulk(batch)

logger.debug("Performing Bulk Insert")
try:
response = os_client.bulk(batch)
logger.debug(response)
except Exception as e:
logger.exception(e)

def translate_text(text: str):
"""Translates text with Google Cloud Translate
Expand Down Expand Up @@ -133,6 +139,9 @@ def initialize_opensearch_client(opensearch_args):
_OS_CLIENT = OpenSearch(**opensearch_args)


def index_name_builder(job, job_type):
    """Return a dated index name of the form '<job_type>-<job>-<YYYY-MM-DD>'."""
    today = datetime.now().date()
    return f"{job_type}-{job}-{today}"

def get_os_client():
"""Returns Already Instantiated OpenSearch Client"""
if _OS_CLIENT is None:
Expand All @@ -143,9 +152,16 @@ def get_os_client():
def init_logging(_logger: logging.Logger, log_level):
"""Initialize the logging"""
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_dir='./log'

if not os.path.isdir(log_dir):
try:
os.makedirs(log_dir)
except:
raise PermissionError("Couldnt create %s", log_dir)

# File Handler
file_handle = RotatingFileHandler('./log/community-pulse', backupCount=15)
file_handle = RotatingFileHandler(log_dir + "/community-pulse.log", backupCount=15)
file_handle.setLevel(logging.DEBUG)
file_handle.setFormatter(formatter)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "community-pulse"

version = "0.0.2"
version = "0.0.3"

authors = [
{ name="David Tippett" },
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ google-cloud-translate
iso-639
pyyaml
jsonpath-ng
python-dotenv

# Build/Test
pytest
Expand Down
6 changes: 3 additions & 3 deletions test/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


def test_matches_filter():
from community_pulse.util import jsonpath_filters, matches_filter
from util import jsonpath_filters, matches_filter
"""Tests to ensure json path filtering works"""
test_filters = [
{
Expand Down Expand Up @@ -42,14 +42,14 @@ def test_parse_config():


def test_backoff():
from community_pulse.util import backoff
from util import backoff
current = 3
expected = 9
assert backoff(current) == expected


def test_backoff_max():
from community_pulse.util import backoff
from util import backoff
current = 20
expected = 25
assert backoff(current, max_backoff=25) == 25
15 changes: 13 additions & 2 deletions test_config/community_pulse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@ settings:
ssl_show_warn: False

jobs:
twitter:
translate: True
OpenSearchCommunity:
type: twitter
translate: False
query: "OpenSearch -#NFT -opensea -openseanft"
ignore:
username:
- archivearticles
- OPENSEARCH2
- OpenSearch

OpenSearchCon:
type: twitter
translate: False
query: "#OpenSearchCon"
ignore:
username:
- archivearticles
- OPENSEARCH2
- OpenSearch
15 changes: 15 additions & 0 deletions test_config/unbuntucon.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
settings:
log_level: DEBUG
log_dir: /var/log/
opensearch:
hosts:
- https://admin:admin@localhost:9200
use_ssl: True
verify_certs: False
ssl_show_warn: False

jobs:
ubuntucon:
type: twitter
translate: False
query: "UbuntuSummit"

0 comments on commit 49233cd

Please sign in to comment.