diff --git a/infra/pipelines/docker/datastream-docker-compose.yml b/infra/pipelines/docker/datastream-docker-compose.yml
index 2c9029d..234de2e 100644
--- a/infra/pipelines/docker/datastream-docker-compose.yml
+++ b/infra/pipelines/docker/datastream-docker-compose.yml
@@ -38,6 +38,7 @@ services:
       DOMINO_END_DATE: ${DOMINO_END_DATE:-}
       DOMINO_JOB_NAME: ${DOMINO_JOB_NAME:-}
       DOMINO_SEARCH: ${DOMINO_SEARCH:-}
+      DOMINO_USERNAMES: ${DOMINO_USERNAMES:-}
       DOMINO_FETCH_PROFILES: ${DOMINO_FETCH_PROFILES:-}
       DOMINO_WRITE_FORMAT: ${DOMINO_WRITE_FORMAT:-}
       DOMINO_S3_FILEPATH: ${DOMINO_S3_FILEPATH:-}
diff --git a/jobs/search_pfas_timelines.sh b/jobs/search_pfas_timelines.sh
new file mode 100755
index 0000000..3a9983c
--- /dev/null
+++ b/jobs/search_pfas_timelines.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+USERNAMES=""
+while IFS= read -r LINE; do
+    USERNAMES="$USERNAMES,$LINE"
+done < pfas_profiles
+
+echo "Users: $USERNAMES"
+
+#set -ex
+
+cd ../infra/pipelines/docker/
+
+JOB_NAME="pfas_timelines" \
+    FETCH_PROFILES="true" \
+    USERNAMES="$USERNAMES" \
+    STRIDE_SEC="`python -c 'print(30 * 1)'`" \
+    WRITE_FORMAT="parquet_s3" \
+    S3_FILEPATH="dt-phase1" \
+    AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
+    AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
+    ./search_timelines.sh "$@"
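Note: the `while read` loop above leaves a leading comma in `USERNAMES`. That is tolerated because the Python entrypoint added below drops empty entries after splitting. A minimal sketch, not part of the patch, with hypothetical usernames:

```python
# Sketch only: mirrors the parsing in jobs/search_timelines.py below.
raw = ",alice,bob,carol"          # hypothetical DOMINO_USERNAMES value built by the shell loop
usernames = [x for x in raw.split(',') if len(x) > 0]
assert usernames == ["alice", "bob", "carol"]
```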
diff --git a/jobs/search_timelines.py b/jobs/search_timelines.py
new file mode 100644
index 0000000..6c9e5bc
--- /dev/null
+++ b/jobs/search_timelines.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+
+import logging
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
+
+
+import json, os, pandas as pd, pendulum, sys
+from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
+from ProjectDomino.FirehoseJob import FirehoseJob
+from ProjectDomino.TwintPool import TwintPool
+from prefect.environments.storage import S3
+from prefect import context, Flow, task
+from prefect.schedules import IntervalSchedule
+from datetime import timedelta, datetime
+from prefect.engine.executors import DaskExecutor
+
+
+S3_BUCKET = "wzy-project-domino"
+
+pd.set_option('display.max_colwidth', None)
+pd.set_option('display.max_rows', 500)
+pd.set_option('display.max_columns', 500)
+pd.set_option('display.width', 1000)
+
+
+def env_non_empty(x: str):
+    return x in os.environ and os.environ[x]
+
+
+def str_to_bool(x: str):
+    if x in ['True', 'true', '1', 'TRUE']:
+        return True
+    elif x in ['False', 'false', '0', 'FALSE']:
+        return False
+    else:
+        raise ValueError('Cannot convert to bool: ' + x)
+
+
+stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30
+job_name = os.environ['DOMINO_JOB_NAME'] if env_non_empty('DOMINO_JOB_NAME') else "covid"
+write_format = os.environ['DOMINO_WRITE_FORMAT'] if env_non_empty('DOMINO_WRITE_FORMAT') else None
+fetch_profiles = str_to_bool(os.environ['DOMINO_FETCH_PROFILES']) if env_non_empty('DOMINO_FETCH_PROFILES') else False
+usernames_raw = os.environ['DOMINO_USERNAMES'] if env_non_empty('DOMINO_USERNAMES') else None
+if usernames_raw is None:
+    raise ValueError('DOMINO_USERNAMES is not set, expected comma-delimited str')
+usernames = usernames_raw.split(',')
+usernames = [x for x in usernames if len(x) > 0]
+
+if write_format == 'parquet_s3':
+    s3_filepath = os.environ['DOMINO_S3_FILEPATH'] if env_non_empty('DOMINO_S3_FILEPATH') else None
+    AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
+    AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
+    compression = os.environ['DOMINO_COMPRESSION'] if env_non_empty('DOMINO_COMPRESSION') else 'snappy'
+
+output_path = f'/output/{job_name}'
+os.makedirs(f'{output_path}/tweets', exist_ok=True)
+os.makedirs(f'{output_path}/profiles', exist_ok=True)
+os.makedirs(f'{output_path}/timelines', exist_ok=True)
+
+
+# FIXME unsafe when distributed
+usernames_queue = usernames.copy()
+pending = 0
+
+
+@task(log_stdout=True, skip_on_upstream_skip=True, max_retries=3, retry_delay=timedelta(seconds=30))
+def run_stream():
+
+    global pending
+
+    if len(usernames_queue) == 0 and pending == 0:
+        logger.info(f'Successfully processed all usernames ({len(usernames)}), exiting')
+        sys.exit(0)
+
+    if len(usernames_queue) == 0:
+        logger.info(f'No more usernames to process, but {pending} jobs are still pending')
+        return
+
+    pending += 1
+    username = usernames_queue.pop(0)
+
+    try:
+
+        tp = TwintPool(is_tor=True)
+        fh = FirehoseJob(
+            PARQUET_SAMPLE_RATE_TIME_S=30,
+            save_to_neo=False,
+            tp=tp,
+            writers={},
+            write_to_disk=write_format,
+            write_opts=(
+                {
+                    's3_filepath': s3_filepath,
+                    's3fs_options': {
+                        'key': AWS_ACCESS_KEY_ID,
+                        'secret': AWS_SECRET_ACCESS_KEY
+                    },
+                    'compression': compression
+                }
+                if write_format == 'parquet_s3' else
+                {}
+            )
+        )
+
+        try:
+            for df in fh.get_timelines(
+                usernames=[username],
+                job_name=job_name,
+                fetch_profiles=fetch_profiles
+            ):
+                print('got: %s' % (df.shape if df is not None else 'None'))
+        except Exception as e:
+            logger.error("job exception", exc_info=True)
+            raise e
+    except Exception:
+        logger.error("task exception, reinserting user", exc_info=True)
+        usernames_queue.insert(0, username)
+    pending -= 1
+    print("task finished")
+
+
+schedule_opts = {
+    'interval': timedelta(seconds=stride_sec),
+    'start_date': pendulum.parse('2019-01-01 00:00:00')
+}
+logger.info(f'Schedule options: {schedule_opts}')
+logger.info(f'Task settings: stride_sec={stride_sec}')
+
+schedule = IntervalSchedule(**schedule_opts)
+storage = S3(bucket=S3_BUCKET)
+
+#with Flow("covid-19 stream-single") as flow:
+#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
+with Flow(f"{job_name} stream", schedule=schedule) as flow:
+    run_stream()
+flow.run()
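The flow above pops one username per scheduled tick. For ad-hoc debugging, the new `FirehoseJob.get_timelines` generator (added in modules/FirehoseJob.py below) can also be driven directly, without Prefect. A minimal sketch, assuming local parquet output and mirroring the constructor arguments used in `run_stream`; the usernames and job name are hypothetical:

```python
# Sketch only: direct, non-scheduled use of the new generator.
from ProjectDomino.TwintPool import TwintPool
from ProjectDomino.FirehoseJob import FirehoseJob

tp = TwintPool(is_tor=False)
fh = FirehoseJob(
    PARQUET_SAMPLE_RATE_TIME_S=30,
    save_to_neo=False,
    tp=tp,
    writers={},
    write_to_disk='parquet',   # local parquet instead of parquet_s3
    write_opts={}
)
for df in fh.get_timelines(usernames=['alice', 'bob'], job_name='debug_run', fetch_profiles=False):
    print(df.shape if df is not None else None)
```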
diff --git a/modules/FirehoseJob.py b/modules/FirehoseJob.py
index af99d0e..52b49cc 100644
--- a/modules/FirehoseJob.py
+++ b/modules/FirehoseJob.py
@@ -4,6 +4,7 @@
 from codecs import ignore_errors
 from collections import deque, defaultdict
 import datetime, gc, os, string, sys, time, uuid
+from datetime import date
 from typing import Any, Literal, Optional
 import numpy as np
 import pandas as pd
@@ -13,6 +14,7 @@
 import s3fs
 import simplejson as json  # nan serialization
 from twarc import Twarc
+from twint.user import SuspendedUser
 
 from .Timer import Timer
 from .TwarcPool import TwarcPool
@@ -848,7 +850,7 @@ def search_time_range(self,
                           Until="2020-01-01 21:00:00",
                           job_name=None,
                           tp=None,
-                          write_to_disk: Optional[Literal['csv', 'json']] = None,
+                          write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
                           fetch_profiles: bool = False,
                           **kwargs):
         tic = time.perf_counter()
@@ -903,3 +905,63 @@
         logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
         logger.info('done search_time_range')
 
+    def get_timelines(self,
+                      usernames,
+                      job_name=None,
+                      tp=None,
+                      write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
+                      fetch_profiles: bool = False,
+                      **kwargs):
+        tic = time.perf_counter()
+        if job_name is None:
+            job_name = "timeline_%s" % usernames[0]
+        tp = tp or self.tp or TwintPool(is_tor=True)
+        for user in usernames:
+            logger.info('start user: %s', user)
+            t_prev = time.perf_counter()
+            now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+            tp.reset_config()
+            try:
+                df = tp._get_timeline(username=user, **kwargs)
+                user_exists = True
+            except SuspendedUser:
+                logger.info(f'User {user} is suspended')
+                df = None
+                user_exists = False
+            if df is not None:
+                self._maybe_write_batch(
+                    df,
+                    write_to_disk,
+                    f'{job_name}/timelines/{user}',
+                    write_opts=kwargs.get('write_opts', self.write_opts)
+                )
+
+            t_iter = time.perf_counter()
+            logger.info(f'finished tp._get_timeline ({user}): {t_iter - t_prev:0.4f} seconds')
+            t_prev = t_iter
+
+            if fetch_profiles:
+                if user_exists:
+                    tp.reset_config()
+                    users_df = self.search_user_info_by_name(pd.DataFrame({'username': [user]}), tp)
+                else:
+                    users_df = pd.DataFrame({'username': [user], 'suspended': [True]})
+                if users_df is not None:
+                    self._maybe_write_batch(
+                        users_df,
+                        write_to_disk,
+                        f'{job_name}/profiles/{now}',
+                        write_opts=kwargs.get('write_opts', self.write_opts)
+                    )
+                t_iter = time.perf_counter()
+                logger.info(f'finished tp.search_user_info_by_name: {t_iter - t_prev:0.4f} seconds')
+                t_prev = t_iter
+
+            yield df
+
+        toc = time.perf_counter()
+        logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
+        logger.info('done get_timelines')
+
+
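`get_timelines` writes each timeline under `f'{job_name}/timelines/{user}'` and profiles under `f'{job_name}/profiles/{now}'` via `_maybe_write_batch`, which is not shown in this diff. Assuming the parquet_s3 writer keys objects by those prefixes under the configured `s3_filepath` (an assumption; the bucket, prefix, and exact object naming below are hypothetical), results could be read back roughly like this:

```python
# Rough sketch: reading a written timeline back; path layout is assumed, not confirmed by this diff.
import pandas as pd

timeline_df = pd.read_parquet(
    's3://dt-phase1/pfas_timelines/timelines/alice',
    storage_options={'key': '<AWS_ACCESS_KEY_ID>', 'secret': '<AWS_SECRET_ACCESS_KEY>'}
)
print(timeline_df.columns.tolist())
```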
"from:" + username + #self.config.Limit = limit + #twint.run.Search(self.config) + self.config.Username = username + self.config.Pandas = True + twint.run.Profile(self.config) tweets_df = twint.storage.panda.Tweets_df return tweets_df + def _get_user_info(self, username, ignore_errors=False): self.config.User_full = True self.config.Username = username