feat(parameterized flows)
lmeyerov committed Oct 16, 2020
1 parent 8454f55 commit dc9a0d7
Showing 6 changed files with 36 additions and 57 deletions.
3 changes: 3 additions & 0 deletions infra/pipelines/docker/datastream-Dockerfile
@@ -18,6 +18,9 @@ RUN echo "ok6" && pip install git+https://github.com/TheDataRideAlongs/twint.git

COPY ./modules /app/ProjectDomino
COPY ./infra/pipelines/docker/jobs /app
COPY ./infra/pipelines/docker/datastream-entrypoint.sh /entrypoint.sh

ENTRYPOINT ["/entrypoint.sh"]

HEALTHCHECK --interval=60s --timeout=15s --start-period=20s \
CMD curl -sf --socks5-hostname localhost:9050 https://check.torproject.org | grep Congrat
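
For reference, a minimal Python sketch of what this healthcheck verifies, assuming the requests library with SOCKS support (requests[socks]/PySocks) were available; the Dockerfile itself uses curl, so this is only an illustration:

import requests

# Route the request through the local Tor SOCKS proxy, mirroring
# `curl --socks5-hostname localhost:9050 https://check.torproject.org`.
proxies = {"https": "socks5h://localhost:9050"}
resp = requests.get("https://check.torproject.org", proxies=proxies, timeout=15)
# check.torproject.org includes "Congratulations" when the request really
# exits via Tor, which is what `grep Congrat` tests for.
print("tor ok" if "Congrat" in resp.text else "tor NOT in use")
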
1 change: 1 addition & 0 deletions infra/pipelines/docker/datastream-docker-compose.yml
@@ -20,6 +20,7 @@ services:
volumes:
- /home/codywebb/ProjectDomino/infra/pipelines/docker/jobs/neo4jcreds.json:/secrets/neo4jcreds.json:ro
environment:
JOB_FILE: search_by_date_job.py
PREFECT__SERVER__HOST: ${PREFECT__SERVER__HOST:-http://host.docker.internal}
PREFECT__SERVER__PORT: ${PREFECT__SERVER__PORT:-4200}
PREFECT__SERVER__UI__HOST: ${PREFECT__SERVER__UI__HOST:-http://host.docker.internal}
6 changes: 6 additions & 0 deletions infra/pipelines/docker/datastream-entrypoint.sh
@@ -0,0 +1,6 @@
#!/bin/bash
set -ex

service tor start

python3 /app/${JOB_FILE} $@
71 changes: 15 additions & 56 deletions infra/pipelines/docker/jobs/search_by_date_job.py
@@ -1,71 +1,33 @@
#!/usr/bin/env python
# coding: utf-8

# In[26]:





# In[27]:


import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL




# In[28]:


import json, pandas as pd
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import Flow,task
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from random import randrange
from prefect.engine.executors import DaskExecutor
import time
import random


# In[29]:





# In[30]:


S3_BUCKET = "wzy-project-domino"


# In[31]:


pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


# ## task

# In[33]:


def random_date(start, end):
delta = end - start
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = randrange(int_delta)
return start + timedelta(seconds=random_second)

def get_creds():
neo4j_creds = None
with open('/secrets/neo4jcreds.json') as json_file:
@@ -75,42 +37,39 @@ def get_creds():
@task(log_stdout=True, skip_on_upstream_skip=True)
def run_stream():
creds = get_creds()
start = datetime.strptime("2020-03-11 20:00:00", "%Y-%m-%d %H:%M:%S")
current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
#rand_dt=random_date(start, current)

start = context.scheduled_start_time - timedelta(seconds=60)
current = start + timedelta(seconds=30)
#start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
#2020-10-10 16:07:30
#2020-10-11 06:29:00 to 2020-10-11 06:29:30:
#2020-10-11 06:29:00 to 2020-10-11 06:29:30:
#2020-10-11 18:45:00 to 2020-10-11 18:45:30:
#2020-10-05 17:00:30-2020-10-05 17:01:00
# 2020-10-06 22:10:00 to 2020-10-06 22:10:30:
tp = TwintPool(is_tor=True)
fh = FirehoseJob(neo4j_creds=creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=True, writers={})
try:
search = "covid OR corona OR virus OR pandemic"
job_name = "covid multi test"
limit = 10000000
for df in fh.search_time_range(tp=tp, Search=search, Since=str(start), Until=str(current), job_name=job_name, Limit=10000000, stride_sec=30):
for df in fh.search_time_range(tp=tp, Search=search, Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"), Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"), job_name=job_name, Limit=10000000, stride_sec=30):
logger.info('got: %s', len(df) if not (df is None) else 'None')
logger.info('proceed to next df')
except Exception as e:
logger.error("job exception", exc_info=True)
raise e
logger.info("job finished")

# In[ ]:


schedule = IntervalSchedule(
start_date=datetime(2020, 9, 5),
interval=timedelta(seconds=10),
interval=timedelta(seconds=30),
)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow("covid-19 stream-single") as flow:
with Flow("covid-19 stream", schedule=schedule) as flow:
run_stream()
flow.run()


# In[ ]:




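The reworked job derives its query window from Prefect's scheduled start time instead of a hard-coded date. A minimal sketch of that windowing logic, with a plain datetime standing in for context.scheduled_start_time (the 60-second lag presumably queries an already fully elapsed interval):

from datetime import datetime, timedelta

def search_window(scheduled_start_time):
    # Lag the scheduled start by 60s, then take a 30s slice,
    # matching the stride_sec=30 passed to search_time_range.
    start = scheduled_start_time - timedelta(seconds=60)
    current = start + timedelta(seconds=30)
    fmt = "%Y-%m-%d %H:%M:%S"
    return datetime.strftime(start, fmt), datetime.strftime(current, fmt)

# A run scheduled for 2020-10-16 12:00:00 searches 11:59:00 to 11:59:30.
print(search_window(datetime(2020, 10, 16, 12, 0, 0)))
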
10 changes: 9 additions & 1 deletion infra/pipelines/docker/search.sh
@@ -1,3 +1,11 @@
#!/bin/bash
set -ex

docker-compose -f datastream-docker-compose.yml build && docker-compose -f datastream-docker-compose.yml up -d && docker-compose -f datastream-docker-compose.yml logs -f -t --tail=1
FILE=${JOB_FILE:-search_by_date_job.py}
PROJECT=${PROJECT_NAME:-docker}


docker-compose -f datastream-docker-compose.yml build
JOB_FILE="search_by_date_job.py" docker-compose -f datastream-docker-compose.yml -p ${PROJECT} up -d data-stream
sleep 5
docker-compose -f datastream-docker-compose.yml -p ${PROJECT} logs -f -t --tail=100
2 changes: 2 additions & 0 deletions modules/TwintPool.py
@@ -32,6 +32,8 @@ def __init__(self, is_tor=False):

def twint_loop(self, since, until, stride_sec=600, limit=None):
def get_unix_time(time_str):
if isinstance(time_str, datetime):
return time_str
return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')

since = get_unix_time(since)
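
With this change, TwintPool.twint_loop accepts either datetime objects or "%Y-%m-%d %H:%M:%S" strings for its since/until bounds. A standalone sketch of the normalization, shown only to illustrate the two accepted input forms:

from datetime import datetime

def get_unix_time(time_str):
    # Pass datetimes through untouched; parse formatted strings.
    if isinstance(time_str, datetime):
        return time_str
    return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')

# Both forms normalize to the same value:
assert get_unix_time("2020-10-16 12:00:00") == get_unix_time(datetime(2020, 10, 16, 12, 0, 0))
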
