In [None]:
!pip install --upgrade google-cloud-bigquery

# Tweets analyzer

In [None]:
import json
from datetime import datetime
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1
from google.cloud import bigquery

from nltk.tokenize import TweetTokenizer

In [None]:
# strptime pattern used by `callback` to parse the 'created_at' field of each message.
# The UTC offset is matched as the literal text '+0000', not as a parsed offset.
DATE_FORMAT = '%a %b %d %H:%M:%S +0000 %Y'

In [None]:
# https://www.nltk.org/_modules/nltk/tokenize/casual.html
# Shared tokenizer instance used by `extract_entities`.
# NOTE(review): preserve_case=False / reduce_len=True presumably lower-case tokens and
# shorten long character repetitions — confirm against the NLTK source linked above.
tokenizer = TweetTokenizer(preserve_case = False, reduce_len = True)

### Utility functions

In [None]:
# Function to be called when a new message is received
def callback(message):
    """Extract the entities of one received tweet and store them in BigQuery.

    The payload (``message.data``) is decoded as UTF-8 JSON and is expected to
    carry the keys ``'full_text'`` and ``'created_at'`` (the latter matching
    ``DATE_FORMAT``). Each entity returned by ``extract_entities`` is stamped
    with the tweet's creation time and inserted into ``TABLE`` through
    ``bigquery_client``.

    Args:
        message: The received message; only ``message.data`` (bytes) and
            ``message.ack()`` are used.

    Returns:
        None. The message is acknowledged in every case: BigQuery insert
        errors are printed but not retried, and payloads that cannot be
        parsed are reported and dropped.
    """
    try:
        data = json.loads(message.data.decode('utf-8'))

        # Extracting entities
        entities = extract_entities(data['full_text'])

        # Datetime of creation, rendered as 'YYYY-MM-DD HH:MM:SS.ffffff'
        created_at = datetime.strptime(data['created_at'], DATE_FORMAT).strftime('%Y-%m-%d %H:%M:%S.%f')
    except (ValueError, KeyError, TypeError) as error:
        # Bad UTF-8/JSON, a missing key or an unparsable date can never succeed on
        # redelivery, and an exception escaping the callback would be surfaced on
        # the streaming-pull future instead of just skipping this one message.
        print("Skipping malformed message: {!r}".format(error))
        message.ack()
        return

    # Appending datetime of creation
    rows = [{**entity, 'created_at': created_at} for entity in entities]
    print(rows)

    # Loading entity records to BigQuery
    if rows:
        errors = bigquery_client.insert_rows_json(TABLE, rows)
        if errors:
            print("Encountered errors while inserting rows into BigQuery: {}".format(errors))

    # ACK message
    message.ack()

In [None]:
# Function for analyzing a tweet and extracting its hashtags and mentions
def extract_entities(text):
    """Return the hashtags and mentions found in ``text``.

    The text is split with the module-level ``tokenizer``; a token counts as a
    hashtag when it starts with '#' and as a mention when it starts with '@',
    provided it is longer than the prefix character alone.

    Args:
        text: The tweet text to analyze.

    Returns:
        A list of ``{'entity': <'hashtag'|'mention'>, 'value': <token>}``
        dicts: all hashtags first, then all mentions, each group in token
        order. Values are the tokens exactly as produced by ``tokenizer``.
    """
    # Tokenize once and reuse the result for both entity kinds.
    tokens = tokenizer.tokenize(text)

    hashtags = [t for t in tokens if (t.startswith('#') and len(t) > 1)]
    mentions = [t for t in tokens if (t.startswith('@') and len(t) > 1)]

    return [ { 'entity': 'hashtag', 'value': h } for h in hashtags ] + [ { 'entity': 'mention', 'value': m } for m in mentions ]

### Connecting to Google Cloud Pub/Sub

In [None]:
# Project and subscription identifiers; combined into the subscription path below.
PROJECT_ID = 'unbosque'
SUBSCRIPTION = 'new-tweet-notify-sub'

In [None]:
# Creating the Pub/Sub subscriber client
# Credentials come from the service-account key file 'unbosque.json' (a relative path);
# the BigQuery client in this notebook is created from the same file.
subscriber_client = pubsub_v1.SubscriberClient.from_service_account_json('unbosque.json')

In [None]:
# Instantiating the subscription path
# Built from PROJECT_ID and SUBSCRIPTION; passed to `subscribe` in the next cell.
subscription_path = subscriber_client.subscription_path(PROJECT_ID, SUBSCRIPTION)

In [None]:
# Initializing the streaming pull for the subscription
# `callback` is registered as the handler for received messages; the returned future
# is awaited in the final cell of the notebook.
# NOTE(review): `callback` reads `TABLE` and `bigquery_client`, which are only defined
# in later cells. If messages are delivered before those cells have run, the callback
# would fail with a NameError — consider running the BigQuery setup before this cell.
# TODO confirm.
streaming_pull_future = subscriber_client.subscribe(subscription_path, callback = callback)

### Connecting to Google Cloud BigQuery

In [None]:
# Destination table passed to `insert_rows_json` inside `callback`
# (presumably in 'project.dataset.table' form — TODO confirm).
TABLE = 'unbosque.trends.history'

In [None]:
# Creating the BigQuery client
# Uses the same service-account key file as the Pub/Sub subscriber client;
# `callback` inserts rows through this client.
bigquery_client = bigquery.Client.from_service_account_json('unbosque.json')

### Listening for new messages

In [None]:
# Block this cell while messages are streamed to `callback`. `result()` is called
# without a timeout, so it only returns when the stream ends with an error or the
# kernel is interrupted. The `with` block closes the subscriber client on exit.
with subscriber_client:
    try:
        # Streaming data
        streaming_pull_future.result()
    except (TimeoutError, KeyboardInterrupt):
        # Interrupting the kernel is the usual way to stop listening in a notebook;
        # shut the stream down gracefully in that case as well.
        streaming_pull_future.cancel()  # Trigger the shutdown
        streaming_pull_future.result()  # Block until the shutdown is complete