# Twitter Sentiment Analysis Part 2
## Enrich the data with sentiment and the most relevant extracted entity

In [38]:
# Uncomment and run the line below to install tweepy if needed.
# Use %pip (not !pip) so the package is installed into this kernel's environment,
# and stay on tweepy 3.x: tweepy.streaming.StreamListener (imported below) was
# removed in tweepy 4.0.
# %pip install "tweepy<4"

## Set up Twitter authentication
Make sure to fill in the tokens below before running this cell.

In [1]:
import os

from tweepy import OAuthHandler

# Go to http://apps.twitter.com and create an app.
# The consumer key and secret will be generated for you after.
# Prefer supplying every credential through an environment variable so secrets
# never end up saved in the notebook; the "XXXXX" placeholders are only used
# when the corresponding variable is not set.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "XXXXX")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "XXXXX")

# After the step above, you will be redirected to your app's page.
# Create an access token under the "Your access token" section.
access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "XXXXX")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "XXXXX")

# OAuth handler shared by every Twitter stream started in this notebook.
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

## Set up a Watson Natural Language Understanding client instance

In [2]:
import os

from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, SentimentOptions, EntitiesOptions

# Watson NLU client used to add sentiment and entity information to each tweet.
# Credentials are read from the environment so they are not stored in the
# notebook; the "XXXXX" placeholders are used when a variable is not set.
nlu = NaturalLanguageUnderstandingV1(
    version='2017-02-27',
    username=os.environ.get("WATSON_NLU_USERNAME", "XXXXX"),
    password=os.environ.get("WATSON_NLU_PASSWORD", "XXXXX")
)

## Create the Twitter Stream

In [3]:
import csv
import json
import os
import shutil

from bs4 import BeautifulSoup as BS
from pixiedust.utils import Logger
from pyspark.sql.types import StructType, StructField, StringType, DateType
from six import iteritems
from tweepy import Stream
from tweepy.streaming import StreamListener

def ensure_dir(dir, delete_tree=False):
    """Create directory ``dir`` if it is missing and return its absolute path.

    If ``dir`` already exists it is left untouched, unless ``delete_tree`` is
    true, in which case it is removed together with all of its contents and
    recreated empty.
    """
    already_there = os.path.exists(dir)
    if already_there and not delete_tree:
        return os.path.abspath(dir)
    if already_there:
        # Wipe the previous contents before recreating the folder.
        shutil.rmtree(dir)
    os.makedirs(dir)
    return os.path.abspath(dir)

# Start each run from an empty "output" folder; the enriched CSV files go in "raw".
root_dir = ensure_dir("output", delete_tree=True)
output_dir = ensure_dir(os.path.join(root_dir, "raw"))

# Ordered description of the CSV columns. "type" is the Spark SQL type used when
# the files are read back as a stream; the optional "transform" is applied to the
# raw tweet value before it is written.
# NOTE(review): DateType keeps only the calendar date of created_at; switch to
# TimestampType if the time of day is needed downstream.
field_metadata = [
    {"name": "created_at", "type": DateType()},
    {"name": "text", "type": StringType()},
    {
        "name": "source",
        "type": StringType(),
        # The raw value is parsed as HTML; keep only its stripped text.
        "transform": lambda s: BS(s, "html.parser").text.strip(),
    },
    {"name": "sentiment", "type": StringType()},
    {"name": "entity", "type": StringType()},
    {"name": "entity_type", "type": StringType()},
]

# Column names in CSV order, and the per-column transform functions by name.
fieldnames = list(map(lambda field: field["name"], field_metadata))
transforms = dict(
    (field["name"], field["transform"])
    for field in field_metadata
    if "transform" in field
)

@Logger()
class RawTweetsListener(StreamListener):
    """Tweepy stream listener that enriches tweets and spools them to CSV files.

    Every raw message is parsed as JSON, enriched with a Watson NLU sentiment
    label and top entity (see ``enrich``), reduced to the columns listed in
    ``fieldnames`` (applying any per-column function from ``transforms``) and
    buffered. Once ``flush_size`` rows are buffered they are written to a new
    ``tweets<N>.csv`` file in ``output_dir``, the folder the Spark file stream
    reads from.
    """

    # Number of buffered rows written to each CSV file.
    flush_size = 10

    def __init__(self):
        # Let tweepy's StreamListener initialise its own state first.
        StreamListener.__init__(self)
        self.buffered_data = []  # projected rows waiting to be written
        self.counter = 0         # suffix of the next tweets<N>.csv file

    def flush_buffer_if_needed(self):
        """Write the buffered rows to a new CSV file once ``flush_size`` are queued."""
        if len(self.buffered_data) < self.flush_size:
            return
        file_name = "tweets{}.csv".format(self.counter)
        final_path = os.path.join(output_dir, file_name)
        # Spark's file stream source expects files to appear atomically in the
        # watched folder; otherwise it can pick up a half-written CSV. Write to a
        # hidden temp file first (Spark ignores names starting with ".") and
        # rename it into place only once it is complete.
        temp_path = os.path.join(output_dir, "." + file_name + ".tmp")
        # Tweets routinely contain non-ASCII characters, so write UTF-8 explicitly
        # (Spark's CSV reader default) instead of the locale encoding.
        # newline="" is what the csv module requires for files it writes.
        with open(temp_path, "w", encoding="utf-8", newline="") as fs:
            csv_writer = csv.DictWriter(fs, fieldnames=fieldnames)
            csv_writer.writerows(self.buffered_data)
        os.replace(temp_path, final_path)
        self.counter += 1
        self.buffered_data = []

    def enrich(self, data):
        """Add Watson NLU sentiment and top-entity fields to a decoded tweet.

        Sets ``sentiment`` (document sentiment label), ``entity`` and
        ``entity_type`` (first entity returned, or "" when there is none) on
        ``data`` and returns it. Returns None for messages without a ``text``
        field, and logs a warning and returns None if the Watson call or the
        response parsing fails, so a single bad tweet does not stop the stream.
        """
        # The stream also delivers non-tweet messages (e.g. limit/delete notices)
        # that carry no "text"; skip them instead of reporting a bogus Watson error.
        text = data.get("text") if isinstance(data, dict) else None
        if not text:
            return None
        try:
            response = nlu.analyze(
                text=text,
                features=Features(sentiment=SentimentOptions(), entities=EntitiesOptions())
            )
            data["sentiment"] = response["sentiment"]["document"]["label"]
            entities = response["entities"]
            top_entity = entities[0] if len(entities) > 0 else None
            data["entity"] = top_entity["text"] if top_entity is not None else ""
            data["entity_type"] = top_entity["type"] if top_entity is not None else ""
            return data
        except Exception as e:
            self.warn("Error from Watson service while enriching data: {}".format(e))
            return None

    def on_data(self, data):
        """Handle one raw JSON message from the stream; returning True keeps it open."""
        enriched = self.enrich(json.loads(data))
        if enriched is not None:
            # Keep only the CSV columns, applying any per-column transform.
            self.buffered_data.append({
                key: transforms[key](value) if key in transforms else value
                for key, value in iteritems(enriched)
                if key in fieldnames
            })
            self.flush_buffer_if_needed()
        return True

    def on_error(self, status):
        """Report a streaming error; returning False asks tweepy to disconnect."""
        print("An error occured while receiving streaming data: {}".format(status))
        return False

Pixiedust database opened successfully


In [4]:
def start_stream(queries):
    """Asynchronously start a new Twitter stream.

    queries: keywords passed to the stream's ``track`` filter.
    Returns the running tweepy ``Stream``; call ``disconnect()`` on it to stop.
    """
    stream = Stream(auth, RawTweetsListener())
    # `async` is a reserved keyword since Python 3.7, so `async=True` no longer
    # parses; tweepy 3.7 renamed the argument to `is_async`.
    stream.filter(track=queries, is_async=True)
    return stream

## Start a new Twitter stream filtered by the keyword "baseball"

In [5]:
# Keywords the Twitter stream should track
tracked_keywords = ["baseball"]
stream = start_stream(tracked_keywords)

In [6]:
# uncomment this line to stop the twitter stream
#stream.disconnect()

## Create a Spark Streaming DataFrame

In [7]:
# Streaming schema derived from field_metadata (every column nullable).
schema = StructType(
    [StructField(f["name"], f["type"], True) for f in field_metadata]
)
# The CSV files are written without a header row, so their columns are mapped
# positionally onto `schema`, i.e. in field_metadata order.
csv_sdf = (
    spark.readStream
    .csv(
        output_dir,
        schema=schema,
        # Tweet text can contain newlines inside quoted fields.
        multiLine=True,
        # Matches timestamps such as "Sat Apr 14 17:41:08 +0000 2018". Hours there
        # run 00-23, so the hour field must be HH; kk (clock hour 1-24) does not
        # match hour 00 under strict parsers.
        # NOTE(review): Spark 3's default parser rejects 'E' in parsing patterns;
        # spark.sql.legacy.timeParserPolicy=LEGACY may be needed there — confirm.
        dateFormat='EEE MMM dd HH:mm:ss Z y',
        ignoreTrailingWhiteSpace=True,
        ignoreLeadingWhiteSpace=True
    )
)
csv_sdf.isStreaming

True

In [8]:
# Check that the streaming DataFrame picked up the column names and types from `schema`
csv_sdf.printSchema()

root
 |-- created_at: date (nullable = true)
 |-- text: string (nullable = true)
 |-- source: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- entity_type: string (nullable = true)



## Create and Run Spark Structured Queries
1. Print to console for debugging
2. Write to Parquet files

In [9]:
# Debugging sink: print every new micro-batch to the console every 2 seconds.
console_writer = csv_sdf.writeStream.outputMode("append").format("console")
console_streaming_query = console_writer.trigger(processingTime='2 seconds').start()

In [10]:
# uncomment to stop the structured query
#console_streaming_query.stop()

In [11]:
# Persist every micro-batch as Parquet files under root_dir; Spark keeps the
# query's progress information in the checkpoint folder.
parquet_output_path = os.path.join(root_dir, "output_parquet")
parquet_checkpoint_path = os.path.join(root_dir, "output_chkpt")
tweet_streaming_query = (
    csv_sdf.writeStream
    .format("parquet")
    .option("path", parquet_output_path)
    .trigger(processingTime="2 seconds")
    .option("checkpointLocation", parquet_checkpoint_path)
    .start()
)

In [12]:
# uncomment to stop the structured query
#tweet_streaming_query.stop()

## Monitoring the Streaming Queries

In [13]:
# Structured streaming queries currently running in this Spark session
active_queries = spark.streams.active
print(active_queries)

[<pyspark.sql.streaming.StreamingQuery object at 0x135e4d978>, <pyspark.sql.streaming.StreamingQuery object at 0x135e4d4a8>]


In [14]:
import json

# Show the id and the latest progress report of each running structured query.
for running_query in spark.streams.active:
    print("-----------")
    print("id: {}".format(running_query.id))
    latest_progress = running_query.lastProgress
    print(json.dumps(latest_progress, indent=2, sort_keys=True))

-----------
id: fe2cf4e9-add1-49e9-ac70-44e1db9dc62a
{
  "batchId": 2,
  "durationMs": {
    "getOffset": 3,
    "triggerExecution": 4
  },
  "id": "fe2cf4e9-add1-49e9-ac70-44e1db9dc62a",
  "inputRowsPerSecond": 0.0,
  "name": null,
  "numInputRows": 0,
  "processedRowsPerSecond": 0.0,
  "runId": "221ef433-1b6a-4ad3-951f-da66eba6e1be",
  "sink": {
    "description": "FileSink[/Users/dtaieb/cdsdev/notebookdev/Pixiedust/book/Chapter7/output/output_parquet]"
  },
  "sources": [
    {
      "description": "FileStreamSource[file:/Users/dtaieb/cdsdev/notebookdev/Pixiedust/book/Chapter7/output/raw]",
      "endOffset": {
        "logOffset": 1
      },
      "inputRowsPerSecond": 0.0,
      "numInputRows": 0,
      "processedRowsPerSecond": 0.0,
      "startOffset": {
        "logOffset": 1
      }
    }
  ],
  "stateOperators": [],
  "timestamp": "2018-04-14T17:41:08.000Z"
}
-----------
id: caa738e6-934e-4bd1-b707-0df728a5f3fe
{
  "batchId": 2,
  "durationMs": {
    "getOffset": 2,
    "trig



## Create a batch DataFrame from the Parquet files

In [17]:
# Query the Parquet files produced by the streaming query as a regular batch DataFrame
parquet_files_dir = os.path.join(root_dir, "output_parquet")
parquet_batch_df = spark.sql("select * from parquet.`{}`".format(parquet_files_dir))

## Display the data using PixieDust

In [19]:
# Render the batch DataFrame with PixieDust's interactive display
display(parquet_batch_df)