Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
cleanup
Browse files (browse the repository at this point in the history)
  • Loading branch information
amygdala committed Mar 7, 2015
1 parent 34aed68 commit 1d42201
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 52 deletions.
1 change: 1 addition & 0 deletions pubsub/pubsub-pipe-image/Dockerfile
@@ -1,5 +1,6 @@
FROM google/python

RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install --upgrade google-api-python-client
RUN pip install python-dateutil
Expand Down
32 changes: 6 additions & 26 deletions pubsub/pubsub-pipe-image/utils.py
Expand Up @@ -27,6 +27,8 @@

SCOPES = ['https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/pubsub']
NUM_RETRIES = 3



def get_credentials():
Expand All @@ -51,9 +53,9 @@ def create_pubsub_client(credentials):
return discovery.build('pubsub', 'v1beta2', http=http)


def flatten(l):
def flatten(lst):
"""Helper function used to massage the raw tweet data."""
for el in l:
for el in lst:
if (isinstance(el, collections.Iterable) and
not isinstance(el, basestring)):
for sub in flatten(el):
Expand Down Expand Up @@ -96,7 +98,6 @@ def cleanup(data):

def bq_data_insert(bigquery, project_id, dataset, table, tweets):
"""Insert a list of tweets into the given BigQuery table."""
WAIT = 2 # retry pause
try:
rowlist = []
# Generate the data that will be sent to BigQuery
Expand All @@ -107,29 +108,8 @@ def bq_data_insert(bigquery, project_id, dataset, table, tweets):
# Try the insertion.
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute()
tableId=table, body=body).execute(num_retries=NUM_RETRIES)
print "streaming response: %s" % response
# TODO: 'invalid field' errors can be detected here.
except Exception, e1:
# If an exception was thrown in making the insertion call, try again.
print e1
time.sleep(WAIT)
print "trying again."
try:
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute()
print "streaming response: %s" % response
except Exception:
time.sleep(WAIT * 2)
print "One more retry."
try:
# first refresh on the auth, as if there has been a long gap
# since we last obtained data, we may need to re-auth.
bigquery = create_bigquery_client()
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute()
print "streaming response: %s" % response
except Exception, e3:
print "Giving up: %s" % e3
print "Giving up: %s" % e1
1 change: 1 addition & 0 deletions redis/redis-pipe-image/Dockerfile
@@ -1,5 +1,6 @@
FROM google/python

RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install --upgrade google-api-python-client
RUN pip install redis
Expand Down
31 changes: 5 additions & 26 deletions redis/redis-pipe-image/utils.py
Expand Up @@ -26,6 +26,7 @@
from oauth2client.client import GoogleCredentials

BQ_SCOPES = ['https://www.googleapis.com/auth/bigquery']
NUM_RETRIES = 3


def create_bigquery_client():
Expand All @@ -38,9 +39,9 @@ def create_bigquery_client():
return discovery.build('bigquery', 'v2', http=http)


def flatten(l):
def flatten(lst):
"""Helper function used to massage the raw tweet data."""
for el in l:
for el in lst:
if (isinstance(el, collections.Iterable) and
not isinstance(el, basestring)):
for sub in flatten(el):
Expand Down Expand Up @@ -83,7 +84,6 @@ def cleanup(data):

def bq_data_insert(bigquery, project_id, dataset, table, tweets):
"""Insert a list of tweets into the given BigQuery table."""
WAIT = 2 # retry pause
try:
rowlist = []
# Generate the data that will be sent to BigQuery
Expand All @@ -94,29 +94,8 @@ def bq_data_insert(bigquery, project_id, dataset, table, tweets):
# Try the insertion.
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute()
tableId=table, body=body).execute(num_retries=NUM_RETRIES)
print "streaming response: %s" % response
# TODO: 'invalid field' errors can be detected here.
except Exception, e1:
# If an exception was thrown in making the insertion call, try again.
print e1
time.sleep(WAIT)
print "trying again."
try:
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute()
print "streaming response: %s" % response
except Exception:
time.sleep(WAIT * 2)
print "One more retry."
try:
# first refresh on the auth, as if there has been a long gap
# since we last obtained data, we may need to re-auth.
bigquery = create_bigquery_client()
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute()
print "streaming response: %s" % response
except Exception, e3:
print "Giving up: %s" % e3
print "Giving up: %s" % e1

0 comments on commit 1d42201

Please sign in to comment.