Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #12 from GoogleCloudPlatform/pubsub
Browse files Browse the repository at this point in the history
Use GCR images, and updated README accordingly. Minor changes to logg…
  • Loading branch information
amygdala committed Sep 14, 2015
2 parents 7289ac2 + 2d586f2 commit da4fe64
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 26 deletions.
8 changes: 6 additions & 2 deletions pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,13 @@ If you have cluster startup issues, double check that you have set your default

## Configure your app

Now you're ready to configure your app. This involves two things: building the Docker image used by the app, and editing two Kubernetes *replication controller* config files with your configuration information.
Now you're ready to configure your app. This involves two things: optionally building a Docker image to be used by the app, and editing two Kubernetes *replication controller* config files with your configuration information.

### Build and push a Docker image for your app
### Optional: Build and push a Docker image for your app

If you like, you can use the prebuilt Docker image, `gcr.io/google-samples/pubsub-bq-pipe:v1`, for your app. This is the image used by default in the `bigquery-controller.yaml` and `twitter-stream.yaml` files.

Follow the instructions below if you'd like to build and use your own image instead.

This Kubernetes app uses a [Docker](https://www.docker.com/) image that runs the app's Python scripts. (An environment variable set in the replication controller specification files, `PROCESSINGSCRIPT`, indicates which script to run.) Once the image is built, it needs to be pushed somewhere that Kubernetes can access it. For this example, we'll use the new [Google Container
Registry](https://cloud.google.com/tools/container-registry/) (GCR), in Beta. It uses a Google Cloud Storage bucket in your own project to store the images, for privacy and low latency. The GCR [docs](https://cloud.google.com/tools/container-registry/) provide more information on GCR and how to push images to it. You can also push your
Expand Down
4 changes: 2 additions & 2 deletions pubsub/bigquery-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ metadata:
labels:
name: bigquery-controller
spec:
replicas: 1
replicas: 2
template:
metadata:
labels:
name: bigquery-controller
spec:
containers:
- name: bigquery
image: gcr.io/your-project/your-image
image: gcr.io/google-samples/pubsub-bq-pipe:v1
env:
- name: PROCESSINGSCRIPT
value: pubsub-to-bigquery
Expand Down
13 changes: 10 additions & 3 deletions pubsub/pubsub-pipe-image/pubsub-to-bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import base64
import datetime
import json
import os
import time
Expand Down Expand Up @@ -92,7 +93,9 @@ def write_to_bq(pubsub, sub_name, bigquery):
WAIT = 2
tweet = None
mtweet = None
while True:
count = 0
count_max = 50000
while count < count_max:
while len(tweets) < CHUNK:
twmessages = pull_messages(pubsub, PROJECT_ID, sub_name)
if twmessages:
Expand All @@ -108,16 +111,19 @@ def write_to_bq(pubsub, sub_name, bigquery):
if 'delete' in mtweet:
continue
if 'limit' in mtweet:
print mtweet
continue
tweets.append(mtweet)
else:
# pause before checking again
print 'sleeping...'
time.sleep(WAIT)
utils.bq_data_insert(bigquery, PROJECT_ID, os.environ['BQ_DATASET'],
response = utils.bq_data_insert(bigquery, PROJECT_ID, os.environ['BQ_DATASET'],
os.environ['BQ_TABLE'], tweets)
tweets = []
count += 1
if count % 25 == 0:
print ("processing count: %s of %s at %s: %s" %
(count, count_max, datetime.datetime.now(), response))


if __name__ == '__main__':
Expand All @@ -134,3 +140,4 @@ def write_to_bq(pubsub, sub_name, bigquery):
except Exception, e:
print e
write_to_bq(pubsub, sub_name, bigquery)
print 'exited write loop'
3 changes: 2 additions & 1 deletion pubsub/pubsub-pipe-image/twitter-to-pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""

import base64
import datetime
import os
from tweepy import OAuthHandler
from tweepy import Stream
Expand Down Expand Up @@ -77,7 +78,7 @@ def on_data(self, data):
if self.count > self.total_tweets:
return False
if (self.count % 1000) == 0:
print 'count is: %s' % self.count
print 'count is: %s at %s' % (self.count, datetime.datetime.now())
return True

def on_error(self, status):
Expand Down
5 changes: 3 additions & 2 deletions pubsub/pubsub-pipe-image/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def cleanup(data):
# temporarily, ignore some fields not supported by the
# current BQ schema.
# TODO: update BigQuery schema
elif k == 'video_info' or k == 'scopes' or 'quoted_status' in k:
elif k == 'video_info' or k == 'scopes' or k == 'withheld_in_countries' or 'quoted_status' in k:
pass
else:
if k and v:
Expand Down Expand Up @@ -110,7 +110,8 @@ def bq_data_insert(bigquery, project_id, dataset, table, tweets):
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute(num_retries=NUM_RETRIES)
print "streaming response: %s %s" % (datetime.datetime.now(), response)
# print "streaming response: %s %s" % (datetime.datetime.now(), response)
return response
# TODO: 'invalid field' errors can be detected here.
except Exception, e1:
print "Giving up: %s" % e1
2 changes: 1 addition & 1 deletion pubsub/twitter-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spec:
spec:
containers:
- name: twitter-to-pubsub
image: gcr.io/your-project/your-image
image: gcr.io/google-samples/pubsub-bq-pipe:v1
env:
- name: PROCESSINGSCRIPT
value: twitter-to-pubsub
Expand Down
2 changes: 1 addition & 1 deletion redis/bigquery-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spec:
spec:
containers:
- name: bigquery
image: gcr.io/your-project/your-image
image: gcr.io/google-samples/redis-bq-pipe:v1
env:
- name: PROCESSINGSCRIPT
value: redis-to-bigquery
Expand Down
35 changes: 26 additions & 9 deletions redis/redis-pipe-image/redis-to-bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""This script grabs tweets from a redis server, and stores them in BiqQuery
using the BigQuery Streaming API.
"""

import datetime
import json
import os

Expand All @@ -41,19 +41,33 @@ def write_to_bq(bigquery):
"""Write the data to BigQuery in small chunks."""
tweets = []
CHUNK = 50 # The size of the BigQuery insertion batch.
twstring = ''
tweet = None
mtweet = None
while True:
count = 0
count_max = 50000
redis_errors = 0
allowed_redis_errors = 3
while count < count_max:
while len(tweets) < CHUNK:
# We'll use a blocking list pop -- it returns when there is
# new data.
res = r.brpop(REDIS_LIST)
twstring = res[1]
res = None
try:
res = r.brpop(REDIS_LIST)
except:
print 'Problem getting data from Redis.'
redis_errors += 1
if redis_errors > allowed_redis_errors:
print "Too many redis errors: exiting."
return
continue
try:
tweet = json.loads(res[1])
except Exception, bqe:
print bqe
except Exception, e:
print e
if redis_errors > allowed_redis_errors:
print "Too many redis errors: exiting."
return
continue
# First do some massaging of the raw data
mtweet = utils.cleanup(tweet)
Expand All @@ -62,13 +76,16 @@ def write_to_bq(bigquery):
if 'delete' in mtweet:
continue
if 'limit' in mtweet:
print mtweet
continue
tweets.append(mtweet)
# try to insert the tweets into bigquery
utils.bq_data_insert(bigquery, PROJECT_ID, os.environ['BQ_DATASET'],
response = utils.bq_data_insert(bigquery, PROJECT_ID, os.environ['BQ_DATASET'],
os.environ['BQ_TABLE'], tweets)
tweets = []
count += 1
if count % 25 == 0:
print ("processing count: %s of %s at %s: %s" %
(count, count_max, datetime.datetime.now(), response))


if __name__ == '__main__':
Expand Down
11 changes: 9 additions & 2 deletions redis/redis-pipe-image/twitter-to-redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
to pull in tweets and store them in a Redis server.
"""

import datetime
import os

import redis
Expand Down Expand Up @@ -44,6 +45,8 @@ class StdOutListener(StreamListener):
"""

count = 0
redis_errors = 0
allowed_redis_errors = 3
twstring = ''
tweets = []
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)
Expand All @@ -53,7 +56,8 @@ def write_to_redis(self, tw):
try:
self.r.lpush(REDIS_LIST, tw)
except:
print 'Problem adding sensor data to Redis.'
print 'Problem adding data to Redis.'
self.redis_errors += 1

def on_data(self, data):
"""What to do when tweet data is received."""
Expand All @@ -65,8 +69,11 @@ def on_data(self, data):
# that happens.
if self.count > self.total_tweets:
return False
if self.redis_errors > self.allowed_redis_errors:
print 'too many redis errors.'
return False
if (self.count % 1000) == 0:
print 'count is: %s' % self.count
print 'count is: %s at %s' % (self.count, datetime.datetime.now())
return True

def on_error(self, status):
Expand Down
5 changes: 3 additions & 2 deletions redis/redis-pipe-image/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def cleanup(data):
# temporarily, ignore some fields not supported by the
# current BQ schema.
# TODO: update BigQuery schema
elif k == 'video_info' or k == 'scopes' or 'quoted_status' in k:
elif k == 'video_info' or k == 'scopes' or k == 'withheld_in_countries' or 'quoted_status' in k:
pass
else:
if k and v:
Expand Down Expand Up @@ -96,7 +96,8 @@ def bq_data_insert(bigquery, project_id, dataset, table, tweets):
response = bigquery.tabledata().insertAll(
projectId=project_id, datasetId=dataset,
tableId=table, body=body).execute(num_retries=NUM_RETRIES)
print "streaming response: %s %s" % (datetime.datetime.now(), response)
# print "streaming response: %s %s" % (datetime.datetime.now(), response)
return response
# TODO: 'invalid field' errors can be detected here.
except Exception, e1:
print "Giving up: %s" % e1
2 changes: 1 addition & 1 deletion redis/twitter-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spec:
spec:
containers:
- name: twitter-to-redis
image: gcr.io/your-project/your-image
image: gcr.io/google-samples/redis-bq-pipe:v1
env:
- name: PROCESSINGSCRIPT
value: twitter-to-redis
Expand Down

0 comments on commit da4fe64

Please sign in to comment.