feat(timeline search)
lmeyerov committed Sep 6, 2022
1 parent 15f1573 commit ef33740
Showing 5 changed files with 265 additions and 21 deletions.
1 change: 1 addition & 0 deletions infra/pipelines/docker/datastream-docker-compose.yml
@@ -38,6 +38,7 @@ services:
      DOMINO_END_DATE: ${DOMINO_END_DATE:-}
      DOMINO_JOB_NAME: ${DOMINO_JOB_NAME:-}
      DOMINO_SEARCH: ${DOMINO_SEARCH:-}
      DOMINO_USERNAMES: ${DOMINO_USERNAMES:-}
      DOMINO_FETCH_PROFILES: ${DOMINO_FETCH_PROFILES:-}
      DOMINO_WRITE_FORMAT: ${DOMINO_WRITE_FORMAT:-}
      DOMINO_S3_FILEPATH: ${DOMINO_S3_FILEPATH:-}
22 changes: 22 additions & 0 deletions jobs/search_pfas_timelines.sh
@@ -0,0 +1,22 @@
#!/bin/bash

USERNAMES=""
while IFS= read -r LINE; do
    USERNAMES="$USERNAMES,$LINE"
done < pfas_profiles

echo "Users: $USERNAMES"

#set -ex

cd ../infra/pipelines/docker/

JOB_NAME="pfas_timelines" \
FETCH_PROFILES="true" \
USERNAMES="$USERNAMES" \
STRIDE_SEC="`python -c 'print(30 * 1)'`" \
WRITE_FORMAT="parquet_s3" \
S3_FILEPATH="dt-phase1" \
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
./search_timelines.sh "$@"
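
For reference, the loop above just turns a username-per-line file into the comma-delimited list that DOMINO_USERNAMES expects (the leading comma is harmless, since the Python consumer below drops empty entries). A minimal Python equivalent of that preprocessing, for illustration only (load_usernames is a hypothetical helper, not part of this commit):

# Illustrative sketch only: build the comma-delimited username list from a
# username-per-line file such as pfas_profiles.
from pathlib import Path

def load_usernames(path: str = "pfas_profiles") -> str:
    lines = Path(path).read_text().splitlines()
    # Skip blank lines and trim stray whitespace before joining.
    return ",".join(line.strip() for line in lines if line.strip())

if __name__ == "__main__":
    print("Users:", load_usernames())
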
137 changes: 137 additions & 0 deletions jobs/search_timelines.py
@@ -0,0 +1,137 @@
#!/usr/bin/env python
# coding: utf-8


import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL



import json, os, pandas as pd, pendulum, sys
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from prefect.engine.executors import DaskExecutor


S3_BUCKET = "wzy-project-domino"

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

def env_non_empty(x: str):
    return x in os.environ and os.environ[x]

def str_to_bool(x: str):
    if x in ['True', 'true', '1', 'TRUE']:
        return True
    elif x in ['False', 'false', '0', 'FALSE']:
        return False
    else:
        raise ValueError('Cannot convert to bool: ' + x)

stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30
job_name = os.environ['DOMINO_JOB_NAME'] if env_non_empty('DOMINO_JOB_NAME') else "covid"
write_format = os.environ['DOMINO_WRITE_FORMAT'] if env_non_empty('DOMINO_WRITE_FORMAT') else None
fetch_profiles = str_to_bool(os.environ['DOMINO_FETCH_PROFILES']) if env_non_empty('DOMINO_FETCH_PROFILES') else False
usernames_raw = os.environ['DOMINO_USERNAMES'] if env_non_empty('DOMINO_USERNAMES') else None
if usernames_raw is None:
    raise ValueError('DOMINO_USERNAMES is not set, expected comma-delimited str')
usernames = usernames_raw.split(',')
usernames = [x for x in usernames if len(x) > 0]

if write_format == 'parquet_s3':
    s3_filepath = os.environ['DOMINO_S3_FILEPATH'] if env_non_empty('DOMINO_S3_FILEPATH') else None
    AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
    AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
    compression = os.environ['DOMINO_COMPRESSION'] if env_non_empty('DOMINO_COMPRESSION') else 'snappy'

output_path = f'/output/{job_name}'
os.makedirs(f'{output_path}/tweets', exist_ok=True)
os.makedirs(f'{output_path}/profiles', exist_ok=True)
os.makedirs(f'{output_path}/timelines', exist_ok=True)


# FIXME unsafe when distributed
usernames_queue = usernames.copy()
pending = 0

@task(log_stdout=True, skip_on_upstream_skip=True, max_retries=3, retry_delay=timedelta(seconds=30))
def run_stream():

    global pending

    if len(usernames_queue) == 0 and pending == 0:
        logger.info(f'Successfully processed all usernames ({len(usernames)}), exiting')
        sys.exit(0)

    if len(usernames_queue) == 0:
        logger.info(f'No more usernames to process, but {pending} jobs are still pending')
        return

    pending += 1
    username = usernames_queue.pop(0)

    try:

        tp = TwintPool(is_tor=True)
        fh = FirehoseJob(
            PARQUET_SAMPLE_RATE_TIME_S=30,
            save_to_neo=False,
            tp=tp,
            writers={},
            write_to_disk=write_format,
            write_opts=(
                {
                    's3_filepath': s3_filepath,
                    's3fs_options': {
                        'key': AWS_ACCESS_KEY_ID,
                        'secret': AWS_SECRET_ACCESS_KEY
                    },
                    'compression': compression
                }
                if write_format == 'parquet_s3' else
                {}
            )
        )

        try:
            for df in fh.get_timelines(
                usernames=[username],
                job_name=job_name,
                fetch_profiles=fetch_profiles
            ):
                print('got: %s' % (df.shape if df is not None else 'None'))
        except Exception as e:
            logger.error("job exception", exc_info=True)
            raise e
    except:
        logger.error("task exception, reinserting user", exc_info=True)
        usernames_queue.insert(0, username)
    pending -= 1
    print("task finished")


schedule_opts = {
    'interval': timedelta(seconds=stride_sec),
    'start_date': pendulum.parse('2019-01-01 00:00:00')
}
logger.info(f'Schedule options: {schedule_opts}')
logger.info(f'Task settings: stride_sec={stride_sec}')

schedule = IntervalSchedule(**schedule_opts)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow(f"{job_name} stream", schedule=schedule) as flow:
    run_stream()
flow.run()
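
The run_stream task above implements a simple single-process work queue with retry: each schedule tick pops one username, re-inserts it at the head of the queue on failure, and exits the process once the queue is drained and no pops are still pending. A stripped-down sketch of that control flow (illustrative only; run_stream_sketch, the process callback, and the sample usernames are placeholders, and the real task drives TwintPool/FirehoseJob rather than a callback):

# Sketch of run_stream's queue-with-retry pattern (illustrative only).
import sys

usernames_queue = ['alice', 'bob']   # stand-in for the DOMINO_USERNAMES list
pending = 0

def run_stream_sketch(process):
    """Handle one username per invocation; re-queue it if processing fails."""
    global pending
    if not usernames_queue and pending == 0:
        sys.exit(0)                  # all users done: stop the scheduler
    if not usernames_queue:
        return                       # queue drained, but another tick is still in flight
    pending += 1
    username = usernames_queue.pop(0)
    try:
        process(username)            # the real task fetches the timeline here
    except Exception:
        usernames_queue.insert(0, username)   # retry this user on a later tick
    pending -= 1

for _ in range(3):                   # the real job re-runs this on each IntervalSchedule tick
    try:
        run_stream_sketch(lambda u: print('processed', u))
    except SystemExit:
        break
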

64 changes: 63 additions & 1 deletion modules/FirehoseJob.py
@@ -4,6 +4,7 @@
from codecs import ignore_errors
from collections import deque, defaultdict
import datetime, gc, os, string, sys, time, uuid
from datetime import date
from typing import Any, Literal, Optional
import numpy as np
import pandas as pd
@@ -13,6 +14,7 @@
import s3fs
import simplejson as json # nan serialization
from twarc import Twarc
from twint.user import SuspendedUser

from .Timer import Timer
from .TwarcPool import TwarcPool
@@ -848,7 +850,7 @@ def search_time_range(self,
                          Until="2020-01-01 21:00:00",
                          job_name=None,
                          tp=None,
                          write_to_disk: Optional[Literal['csv', 'json']] = None,
                          write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
                          fetch_profiles: bool = False,
                          **kwargs):
        tic = time.perf_counter()
@@ -903,3 +905,63 @@
        logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
        logger.info('done search_time_range')

    def get_timelines(self,
                      usernames,
                      job_name=None,
                      tp=None,
                      write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
                      fetch_profiles: bool = False,
                      **kwargs):
        tic = time.perf_counter()
        if job_name is None:
            job_name = "timeline_%s" % usernames[0]
        tp = tp or self.tp or TwintPool(is_tor=True)
        for user in usernames:
            logger.info('start user: %s', user)
            t_prev = time.perf_counter()
            now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            tp.reset_config()
            try:
                df = tp._get_timeline(username=user, **kwargs)
                user_exists = True
            except SuspendedUser:
                logger.info(f'User {user} is suspended')
                df = None
                user_exists = False
            if df is not None:
                self._maybe_write_batch(
                    df,
                    write_to_disk,
                    f'{job_name}/timelines/{user}',
                    write_opts=kwargs.get('write_opts', self.write_opts)
                )

            t_iter = time.perf_counter()
            logger.info(f'finished tp._get_timeline ({user}): {t_iter - t_prev:0.4f} seconds')
            t_prev = t_iter

            if fetch_profiles:
                if user_exists:
                    tp.reset_config()
                    users_df = self.search_user_info_by_name(pd.DataFrame({'username': [user]}), tp)
                else:
                    users_df = pd.DataFrame({'username': [user], 'suspended': [True]})
                if users_df is not None:
                    self._maybe_write_batch(
                        users_df,
                        write_to_disk,
                        f'{job_name}/profiles/{now}',
                        write_opts=kwargs.get('write_opts', self.write_opts)
                    )
                t_iter = time.perf_counter()
                logger.info(f'finished tp.search_user_info_by_name: {t_iter - t_prev:0.4f} seconds')
                t_prev = t_iter

            yield df

        toc = time.perf_counter()
        logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
        logger.info('done get_timelines')
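
A hedged usage sketch of the new get_timelines generator, mirroring how jobs/search_timelines.py above constructs FirehoseJob; the username is a placeholder, and writing locally with write_to_disk='csv' and empty write_opts is an assumption rather than the job's actual parquet_s3 configuration:

# Usage sketch (illustrative): iterate per-user timeline DataFrames.
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool

tp = TwintPool(is_tor=True)
fh = FirehoseJob(
    PARQUET_SAMPLE_RATE_TIME_S=30,
    save_to_neo=False,
    tp=tp,
    writers={},
    write_to_disk='csv',   # assumption; the job script uses 'parquet_s3' with s3fs options
    write_opts={}
)

# get_timelines yields one tweets DataFrame per username (None for suspended users).
for df in fh.get_timelines(usernames=['some_user'], job_name='demo', fetch_profiles=True):
    print('timeline rows:', 0 if df is None else len(df))
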



62 changes: 42 additions & 20 deletions modules/TwintPool.py
@@ -7,29 +7,47 @@
import logging
import time
from twint.token import RefreshTokenException
from twint.run import Profile

logger = logging.getLogger()

extractor = URLExtract()


def reset_config(config, is_tor=False):
    if config is None:
        config = twint.Config()
    config.Limit = 1000
    config.Pandas = True
    config.Hide_output = True
    config.Verified = None
    config.Username = None
    config.User_full = False
    config.Since = None
    config.Until = None
    config.Search = None
    config.Retweets = None
    if is_tor:
        config.Proxy_host = 'localhost'
        config.Proxy_port = "9050"
        config.Proxy_type = "socks5"
    else:
        config.Proxy_host = None  # "tor"
        config.Proxy_port = None  # "9050"
        config.Proxy_type = None  # "socks5"
    return config


class TwintPool:

    is_tor = False

    def __init__(self, is_tor=False, twint_config=None):
        self.config = twint_config or twint.Config()
        self.config.Limit = 1000
        self.config.Pandas = True
        self.config.Hide_output = True
        self.config.Verified = None
        self.config.Username = None
        self.config.User_full = None
        if is_tor:
            self.config.Proxy_host = 'localhost'
            self.config.Proxy_port = "9050"
            self.config.Proxy_type = "socks5"
        else:
            self.config.Proxy_host = None  # "tor"
            self.config.Proxy_port = None  # "9050"
            self.config.Proxy_type = None  # "socks5"
        self.is_tor = is_tor
        self.config = reset_config(twint_config or twint.Config(), is_tor)

    def reset_config(self, is_tor=None):
        self.config = reset_config(self.config, is_tor if is_tor is not None else self.is_tor)

    def twint_loop(self, since, until, stride_sec=600, limit=None):
        def get_unix_time(time_str):
@@ -73,14 +91,18 @@ def _get_term(self, Search="IngSoc", Since="1984-04-20 13:00:00", Until="1984-04
        toc = time.perf_counter()
        logger.info(f'finished get_term searching for tweets in: {toc - tic:0.4f} seconds')

    def _get_timeline(self, username, limit):
        self.config.Retweets = True
        self.config.Search = "from:" + username
        self.config.Limit = limit
        twint.run.Search(self.config)
    def _get_timeline(self, username):
        #self.config.Retweets = True
        #self.config.Search = "from:" + username
        #self.config.Limit = limit
        #twint.run.Search(self.config)
        self.config.Username = username
        self.config.Pandas = True
        twint.run.Profile(self.config)
        tweets_df = twint.storage.panda.Tweets_df
        return tweets_df


    def _get_user_info(self, username, ignore_errors=False):
        self.config.User_full = True
        self.config.Username = username
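
Finally, a short sketch of the reworked TwintPool flow used by get_timelines: reset_config clears the search-related fields between users, and _get_timeline now scrapes the profile page via twint.run.Profile rather than a from: search. Illustrative only; the username is a placeholder, and is_tor=True assumes a local Tor SOCKS5 proxy on port 9050, as configured above:

# Illustrative sketch of a single timeline fetch through the reworked TwintPool.
from ProjectDomino.TwintPool import TwintPool

tp = TwintPool(is_tor=True)    # assumes a Tor SOCKS5 proxy at localhost:9050
tp.reset_config()              # clear Username/Search/Since/Until left by earlier calls
tweets_df = tp._get_timeline(username='some_user')   # runs twint.run.Profile under the hood
print(tweets_df.shape if tweets_df is not None else 'no tweets')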
