diff --git a/infra/pipelines/docker/datastream-Dockerfile b/infra/pipelines/docker/datastream-Dockerfile
index 9f7f2a1..f9a47a7 100644
--- a/infra/pipelines/docker/datastream-Dockerfile
+++ b/infra/pipelines/docker/datastream-Dockerfile
@@ -18,6 +18,9 @@ RUN echo "ok6" && pip install git+https://github.com/TheDataRideAlongs/twint.git
 
 COPY ./modules /app/ProjectDomino
 COPY ./infra/pipelines/docker/jobs /app
+COPY ./infra/pipelines/docker/datastream-entrypoint.sh /entrypoint.sh
+
+ENTRYPOINT ["/entrypoint.sh"]
 
 HEALTHCHECK --interval=60s --timeout=15s --start-period=20s \
     CMD curl -sf --socks5-hostname localhost:9050 https://check.torproject.org | grep Congrat
diff --git a/infra/pipelines/docker/datastream-docker-compose.yml b/infra/pipelines/docker/datastream-docker-compose.yml
index d07d3ac..405a162 100644
--- a/infra/pipelines/docker/datastream-docker-compose.yml
+++ b/infra/pipelines/docker/datastream-docker-compose.yml
@@ -20,6 +20,7 @@ services:
     volumes:
       - /home/codywebb/ProjectDomino/infra/pipelines/docker/jobs/neo4jcreds.json:/secrets/neo4jcreds.json:ro
     environment:
+      JOB_FILE: search_by_date_job.py
       PREFECT__SERVER__HOST: ${PREFECT__SERVER__HOST:-http://host.docker.internal}
       PREFECT__SERVER__PORT: ${PREFECT__SERVER__PORT:-4200}
       PREFECT__SERVER__UI__HOST: ${PREFECT__SERVER__UI__HOST:-http://host.docker.internal}
diff --git a/infra/pipelines/docker/datastream-entrypoint.sh b/infra/pipelines/docker/datastream-entrypoint.sh
new file mode 100755
index 0000000..935f140
--- /dev/null
+++ b/infra/pipelines/docker/datastream-entrypoint.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+set -ex
+
+service tor start
+
+exec python3 "/app/${JOB_FILE}" "$@"
diff --git a/infra/pipelines/docker/jobs/search_by_date_job.py b/infra/pipelines/docker/jobs/search_by_date_job.py
index a6c75b3..f98ffa9 100644
--- a/infra/pipelines/docker/jobs/search_by_date_job.py
+++ b/infra/pipelines/docker/jobs/search_by_date_job.py
@@ -1,14 +1,6 @@
 #!/usr/bin/env python
 # coding: utf-8
 
-# In[26]:
-
-
-
-
-
-# In[27]:
-
 import logging
 
 logger = logging.getLogger()
@@ -16,56 +8,26 @@
 
 
-
-# In[28]:
-
-
 import json, pandas as pd
 from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
 from ProjectDomino.FirehoseJob import FirehoseJob
 from ProjectDomino.TwintPool import TwintPool
 from prefect.environments.storage import S3
-from prefect import Flow,task
+from prefect import context, Flow, task
 from prefect.schedules import IntervalSchedule
 from datetime import timedelta, datetime
-from random import randrange
 from prefect.engine.executors import DaskExecutor
-import time
-import random
-
-
-# In[29]:
-
-
-
-
-# In[30]:
 
 
 S3_BUCKET = "wzy-project-domino"
 
 
-# In[31]:
-
-
 pd.set_option('display.max_colwidth', None)
 pd.set_option('display.max_rows', 500)
 pd.set_option('display.max_columns', 500)
 pd.set_option('display.width', 1000)
 
 
 
-# ## task
-
-# In[33]:
-
-
-def random_date(start, end):
-    delta = end - start
-    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
-    random_second = randrange(int_delta)
-    return start + timedelta(seconds=random_second)
-
 def get_creds():
     neo4j_creds = None
     with open('/secrets/neo4jcreds.json') as json_file:
@@ -75,18 +37,24 @@ def get_creds():
 @task(log_stdout=True, skip_on_upstream_skip=True)
 def run_stream():
     creds = get_creds()
-    start = datetime.strptime("2020-03-11 20:00:00", "%Y-%m-%d %H:%M:%S")
-    current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
-    #rand_dt=random_date(start, current)
+
+    start = context.scheduled_start_time - timedelta(seconds=60)
+    current = start + timedelta(seconds=30)
+    #start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
+    #current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
+    #current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
     #2020-10-10 16:07:30
-    #2020-10-11 06:29:00 to 2020-10-11 06:29:30:
+    #2020-10-11 06:29:00 to 2020-10-11 06:29:30:
+    #2020-10-11 18:45:00 to 2020-10-11 18:45:30:
+    #2020-10-05 17:00:30-2020-10-05 17:01:00
+    # 2020-10-06 22:10:00 to 2020-10-06 22:10:30:
     tp = TwintPool(is_tor=True)
     fh = FirehoseJob(neo4j_creds=creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=True, writers={})
     try:
         search = "covid OR corona OR virus OR pandemic"
         job_name = "covid multi test"
         limit = 10000000
-        for df in fh.search_time_range(tp=tp, Search=search, Since=str(start), Until=str(current), job_name=job_name, Limit=10000000, stride_sec=30):
+        for df in fh.search_time_range(tp=tp, Search=search, Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"), Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"), job_name=job_name, Limit=limit, stride_sec=30):
             logger.info('got: %s', len(df) if not (df is None) else 'None')
             logger.info('proceed to next df')
     except Exception as e:
@@ -94,23 +62,14 @@ def run_stream():
         raise e
     logger.info("job finished")
 
-# In[ ]:
-
-
 schedule = IntervalSchedule(
-    start_date=datetime(2020, 9, 5),
-    interval=timedelta(seconds=10),
+    interval=timedelta(seconds=30),
 )
 
 storage = S3(bucket=S3_BUCKET)
 
+#with Flow("covid-19 stream-single") as flow:
 #with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
-with Flow("covid-19 stream-single") as flow:
+with Flow("covid-19 stream", schedule=schedule) as flow:
     run_stream()
 flow.run()
-
-# In[ ]:
-
-
-
-
diff --git a/infra/pipelines/docker/search.sh b/infra/pipelines/docker/search.sh
index cf7325c..64fd41d 100755
--- a/infra/pipelines/docker/search.sh
+++ b/infra/pipelines/docker/search.sh
@@ -1,3 +1,11 @@
 #!/bin/bash
+set -ex
 
-docker-compose -f datastream-docker-compose.yml build && docker-compose -f datastream-docker-compose.yml up -d && docker-compose -f datastream-docker-compose.yml logs -f -t --tail=1
+FILE=${JOB_FILE:-search_by_date_job.py}
+PROJECT=${PROJECT_NAME:-docker}
+
+
+docker-compose -f datastream-docker-compose.yml build
+JOB_FILE="${FILE}" docker-compose -f datastream-docker-compose.yml -p "${PROJECT}" up -d data-stream
+sleep 5
+docker-compose -f datastream-docker-compose.yml -p "${PROJECT}" logs -f -t --tail=100
diff --git a/modules/TwintPool.py b/modules/TwintPool.py
index e1abae9..71a0db6 100644
--- a/modules/TwintPool.py
+++ b/modules/TwintPool.py
@@ -32,6 +32,8 @@ def __init__(self, is_tor=False):
 
     def twint_loop(self, since, until, stride_sec=600, limit=None):
         def get_unix_time(time_str):
+            if isinstance(time_str, datetime):
+                return time_str
             return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
 
         since = get_unix_time(since)
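
Note on the search_by_date_job.py change: run_stream now derives its scrape window from Prefect's runtime context instead of a hardcoded date, so each scheduled run tails near-live tweets at a fixed lag. Below is a minimal sketch of that window arithmetic, assuming Prefect 0.x semantics where context.scheduled_start_time is the run's scheduled datetime; the helper name scrape_window is illustrative only, not part of the codebase:

    from datetime import datetime, timedelta

    def scrape_window(scheduled_start_time, lag_s=60, width_s=30):
        # Trail the schedule tick by lag_s seconds so tweets have time to
        # become visible in Twitter search, then take a width_s-second slice.
        start = scheduled_start_time - timedelta(seconds=lag_s)
        current = start + timedelta(seconds=width_s)
        # search_time_range takes 'YYYY-mm-dd HH:MM:SS' strings for Since/Until
        fmt = "%Y-%m-%d %H:%M:%S"
        return start.strftime(fmt), current.strftime(fmt)

    # e.g. the run scheduled at 18:46:00 covers 18:45:00-18:45:30, and the
    # next run, 30s later, covers 18:45:30-18:46:00.

Because the window width matches the 30-second IntervalSchedule, consecutive runs cover adjacent, non-overlapping 30s slices of the timeline.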
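Usage note for search.sh: JOB_FILE now flows from the caller's environment (with the FILE default above) through docker-compose into the container, where the entrypoint starts Tor and then execs /app/${JOB_FILE}; PROJECT_NAME selects the compose project so multiple streams can run side by side, e.g. JOB_FILE=search_by_date_job.py PROJECT_NAME=covid ./search.sh (the project name "covid" is just an example value).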