In [1]:
import apache_beam as beam
import glob
import os
import sys

import pandas as pd
import numpy as np
import google.auth

from typing import Dict
from datetime import datetime
from apache_beam import pvalue
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions, StandardOptions
from apache_beam.runners import DataflowRunner

from google.cloud import bigquery
from google.cloud import language
from apache_beam.ml.gcp import naturallanguageml as nlp

import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners import DataflowRunner
from apache_beam.runners import DirectRunner


from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys

In [2]:
# Build the Apache Beam pipeline options for a streaming job. save_main_session
# pickles the notebook's main session so notebook-level names are available on workers.
options = pipeline_options.PipelineOptions(streaming=True, save_main_session=True)
gcp_options = options.view_as(GoogleCloudOptions)

# google.auth.default() yields (credentials, project_id); keep only the default project id.
_, gcp_options.project = google.auth.default()

# Google Cloud region in which the Dataflow job runs.
gcp_options.region = 'us-east1'

# Timestamped, per-run job name (sa-YYYYmmdd-HHMMSS).
gcp_options.job_name = f'sa-{datetime.now().strftime("%Y%m%d-%H%M%S")}'

# Every artifact of this run lives under a per-job prefix in the bucket.
dataflow_gcs_location = f'gs://text-analysis-323506/{gcp_options.job_name}'

# Directory for the job's output files.
output_gcs_location = f"{dataflow_gcs_location}/output"

# Staging location: where the pipeline package and SDK binary are staged.
gcp_options.staging_location = f"{dataflow_gcs_location}/staging"

# Temp location: temporary files / intermediate results before they reach the sink.
gcp_options.temp_location = f"{dataflow_gcs_location}/temp"

In [3]:
def load_tweet(message: bytes):
    """Decode one raw Pub/Sub payload into a Python object.

    Args:
        message: UTF-8 encoded JSON bytes, as emitted by ReadFromPubSub.

    Returns:
        The parsed JSON value. Presumably a dict, since downstream stages index
        it by key (e.g. 'content').
    """
    import json  # local import keeps the function self-contained on workers

    text = message.decode("utf-8")
    return json.loads(text)

In [4]:
def preprocess_tweet(message: Dict):
    """Clean a tweet's text before sentiment analysis.

    Reads ``message['content']`` and returns a *copy* of ``message`` with an
    extra ``'preprocessed'`` key holding the cleaned text. The input element is
    left untouched; Beam's programming guide asks transforms not to modify
    their input elements.

    Cleaning steps:
      * drop backslashes and '#' (the hashtag word itself is kept), then
        collapse all whitespace, including newlines, to single spaces;
      * remove @mentions, http(s):// URLs and www.* links (case-insensitive);
      * expand a few common contractions per whitespace-separated token,
        case-insensitively and accepting the typographic apostrophe (U+2019);
      * strip the characters ``- + . ^ : ,``.

    Args:
        message: Tweet dict containing a ``'content'`` key; a None content is
            treated as empty text.

    Returns:
        A new dict with every original key plus ``'preprocessed'``.
    """
    import re

    line = message['content'] or ''

    # Remove backslashes and hashtag markers, then normalise whitespace.
    # split()/join turns newlines into spaces instead of deleting them, so
    # words on adjacent lines are not glued together.
    line = line.replace('\\', '').replace('#', '')
    line = ' '.join(line.split())

    # Remove @ mentions and links.
    line = re.sub(r"(?:@|https?://|www)\S+", "", line, flags=re.IGNORECASE)
    line = ' '.join(line.split())

    # Expand short forms. Matching is per whole token, so the "'s" entry only
    # applies to a standalone "'s" token (possessives are left alone).
    contraction_dict = {"ain't": "are not", "'s": " is", "aren't": "are not", "don't": "do not",
                        "didn't": "did not", "won't": "will not", "can't": "cannot"}
    words = [
        contraction_dict.get(word.replace('\u2019', "'").lower(), word)
        for word in line.split()
    ]
    line = ' '.join(words)

    # Remove special characters.
    line = re.sub(r'[-+.^:,]', '', line)

    result = dict(message)
    result['preprocessed'] = line
    return result

In [5]:
# Natural Language client, created lazily on first use in each worker process.
# Building it here at import time would put a live client into the main session
# that save_main_session pickles. NOTE(review): a gRPC-backed client is
# generally not picklable.
client = None


def detect_sentiments(message: Dict):
    """Attach the Natural Language API sentiment response to a tweet.

    Sends ``message['preprocessed']`` as a PLAIN_TEXT document to
    ``analyze_sentiment`` and returns a copy of ``message`` with a
    ``'response'`` key. The input element is not modified.

    An API error is logged and recorded as ``response=None`` rather than raised,
    so a single bad tweet does not stall the streaming pipeline.

    Args:
        message: Tweet dict carrying a ``'preprocessed'`` text field.

    Returns:
        A new dict with every original key plus ``'response'``: the API
        response, or None if the call failed.
    """
    global client
    import logging
    from google.cloud import language

    if client is None:
        client = language.LanguageServiceClient()

    result = dict(message)
    document = {'content': message['preprocessed'], 'type': 'PLAIN_TEXT'}
    try:
        result['response'] = client.analyze_sentiment(document=document)
    except Exception:
        # Best effort: downstream maps a None response to sentiment 'NA'.
        logging.warning('analyze_sentiment failed for tweet id=%r',
                        message.get('id'), exc_info=True)
        result['response'] = None
    return result

In [6]:
def prepare_results(message, hate_threshold=-0.25):
    """Flatten the raw sentiment response into ``score`` / ``sentiment`` fields.

    Returns a copy of ``message`` without the intermediate ``'preprocessed'``
    and ``'response'`` keys, plus:
      * ``score``: ``response.document_sentiment.score``, or NaN when there
        is no response (the API call failed);
      * ``sentiment``: ``'hate'`` if score < ``hate_threshold``, otherwise
        ``'normal'``; ``'NA'`` when there is no response.

    Args:
        message: Tweet dict carrying a ``'response'`` key (may be None).
        hate_threshold: Scores strictly below this value are labelled 'hate'.

    Returns:
        A new dict; the input element is left unmodified.
    """
    result = dict(message)
    response = result.pop('response', None)
    result.pop('preprocessed', None)

    # Test against None explicitly: protobuf-style messages may evaluate False
    # when every field holds its default value, which would be misread as a failure.
    if response is not None:
        result['score'] = response.document_sentiment.score
        result['sentiment'] = 'hate' if result['score'] < hate_threshold else 'normal'
    else:
        # NOTE(review): the BigQuery schema declares score FLOAT REQUIRED;
        # confirm the sink accepts NaN for rows without a sentiment.
        result['score'] = float('nan')
        result['sentiment'] = 'NA'
    return result

In [7]:
# Pipeline object; all runner/GCP settings come from the `options` built above.
pipeline = Pipeline(options=options)

### Build Pipeline

In [8]:
# Get tweet sentiments. Every stage is a 1:1 Map over the stream:
#   raw Pub/Sub bytes -> tweet dict -> + cleaned text -> + API response
#   -> flat record with score/sentiment.
results = (
    pipeline
    | 'Consume Messages' >> beam.io.ReadFromPubSub(
        topic='projects/text-analysis-323506/topics/tweets')
    | 'Load Tweets' >> beam.Map(load_tweet)
    | 'Preprocess Tweets' >> beam.Map(preprocess_tweet)
    | 'Detect Sentiments' >> beam.Map(detect_sentiments)
    | 'Prepare Results' >> beam.Map(prepare_results)
)

### Separate results into hate speech and normal speech

In [9]:
class ResultsFilter(beam.DoFn):
    """Route each result to a tagged output according to its 'sentiment'.

    Results labelled 'hate' are emitted on OUTPUT_TAG_HATE; anything else
    (e.g. 'normal' or 'NA') is emitted on OUTPUT_TAG_NORM.
    """

    OUTPUT_TAG_HATE = 'Hate speech'
    OUTPUT_TAG_NORM = 'Normal speech'

    def process(self, result):
        is_hate = result['sentiment'] == 'hate'
        tag = ResultsFilter.OUTPUT_TAG_HATE if is_hate else ResultsFilter.OUTPUT_TAG_NORM
        yield pvalue.TaggedOutput(tag, result)


# Split the stream into two tagged PCollections, later indexed by tag.
separated_results = results | beam.ParDo(ResultsFilter()).with_outputs(
    ResultsFilter.OUTPUT_TAG_HATE, ResultsFilter.OUTPUT_TAG_NORM)

### Results to Pub/Sub
In this example, only hate-speech results are published to the results Pub/Sub topic.

In [10]:
def convert_to_bytes(result):
    """Serialize a result dict to UTF-8 encoded JSON bytes (the payload type WriteToPubSub takes)."""
    import json
    payload = json.dumps(result)
    return payload.encode("utf-8")

# Publish only the hate-speech branch, JSON-encoded, to the results topic.
hate_speech_pubsub = (
    separated_results[ResultsFilter.OUTPUT_TAG_HATE]
    | 'Bytes Conversion' >> beam.Map(convert_to_bytes)
    | 'To PubSub Topic' >> beam.io.WriteToPubSub(
        topic='projects/text-analysis-323506/topics/sa-results')
)

### Results to BigQuery

Normal-speech and hate-speech results are written to separate BigQuery tables, which can then be used for analysis.

In [11]:
# Column definitions shared by both results tables.
schema = [
        bigquery.SchemaField("datetime", "TIMESTAMP", mode="REQUIRED"),
        bigquery.SchemaField("id", "BIGNUMERIC", mode="REQUIRED"),
        bigquery.SchemaField("username", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("content", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("score", "FLOAT", mode="REQUIRED"),
        bigquery.SchemaField("sentiment", "STRING", mode="REQUIRED")
    ]

# WriteToBigQuery does not take google.cloud.bigquery.SchemaField objects; it
# accepts a 'name:TYPE,...' string, a TableSchema, or a dict of this shape.
# Passing a schema lets the default CREATE_IF_NEEDED disposition create a
# missing table instead of failing.
bq_table_schema = {
    'fields': [
        {'name': field.name, 'type': field.field_type, 'mode': field.mode}
        for field in schema
    ]
}

hate_speech_bq = (
    separated_results[ResultsFilter.OUTPUT_TAG_HATE]
    | 'To hate speech table' >> beam.io.WriteToBigQuery(
        table='hate_speeches', dataset='tweets_analysis', project='text-analysis-323506',
        schema=bq_table_schema)
)

normal_speech_bq = (
    separated_results[ResultsFilter.OUTPUT_TAG_NORM]
    | 'To norm speech table' >> beam.io.WriteToBigQuery(
        table='normal_speeches', dataset='tweets_analysis', project='text-analysis-323506',
        schema=bq_table_schema)
)

# Whether the pipeline options request streaming execution. Read from `options`
# directly rather than through the deprecated Pipeline.options accessor.
is_streaming_pipeline = options.view_as(StandardOptions).streaming


### Direct Runner

In [None]:
pipeline_result = DirectRunner().run_pipeline(pipeline, options=options).wait_until_finish()

### Dataflow Runner

In [None]:
# Submit the pipeline to Cloud Dataflow; the returned object is a handle to the remote job.
pipeline_result = DataflowRunner().run_pipeline(pipeline=pipeline, options=options)