<div><img src="https://www.ibm.com/blogs/bluemix/wp-content/uploads/2017/02/NLU.png" width="270" height="270" align="right">

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/5/51/IBM_logo.svg/640px-IBM_logo.svg.png" width="90" height="90" align="right" style="margin:0px 25px"></div>

# Extract Insights from Social Media with Watson Developer Cloud and Data Science Experience

___________

## Table of Contents

1.  [Load Required Libraries](#loadlibraries)
2.  [Load Data from DashDB](#loaddata)
3.  [Exploratory Data Analysis](#exploredata)
4.  [Take a Sample of Data](#takesample)
5.  [Read Credentials for NLU, Personality Insights, and Twitter](#getcredentials)
6.  [Enrich Data with Watson NLU](#enrichnlu)
7.  [Visualizing Sentiment and Keywords](#sentiment)
8.  [Enrich Data with Watson Personality Insights](#enrichpi)
9.  [Spark Machine Learning for User Segmentation](#sparkml)
10. [Visualizing User Segmentation](#clusters)

__________________

<a id="loadlibraries"> </a>
## Step 1: Load Required Libraries

**watson-developer-cloud** is the Python SDK for Watson Developer Cloud services: https://github.com/watson-developer-cloud/python-sdk

**tweepy** is a Python library for accessing the Twitter API: http://www.tweepy.org/

**wordcloud** is a Python library for generating Word Clouds: https://github.com/amueller/word_cloud/

**plotly** is a Python library for making plots and charts: https://pypi.python.org/pypi/plotly

In [None]:
!pip install --upgrade watson-developer-cloud
!pip install tweepy==3.3.0
!pip install plotly --upgrade
!pip install wordcloud

________________

<a id="loaddata"> </a>
## Step 2: Load data from dashDB

The first step is to load the data. This notebook assumes you already have tweets in a dashDB database. For more details on how to collect relevant tweets into a dashDB database, check out [this](https://github.com/joe4k/twitterstreams) GitHub repository.

If you're already familiar with creating data connections, you can skip this step.

1. Click on **Data Services** tab and select **Connections**.
2. Click **Create new connection** (+ sign)
3. Provide a connection name (dashdbsoftdrinktweets) and select **Data Service** for **Service Category**.
4. Select the dashDB service instance where the tweets are stored.
5. Select **BLUDB** database and press **Create**.

Next, you need to make the data connection you created accessible to your project.
* Navigate to your project.
* Click the **Find and Add Data** icon (top right).
* Click the **Connections** tab.
* Select the `dashdbsoftdrinktweets` connection.
* Click **Apply** ==> This makes that data connection accessible to your project.

Lastly, you need to add code to your notebook to read the data from dashDB.
* Create a new cell in your notebook.
* Click the **Find and Add Data** icon.
* Click the **Connections** tab.
* Find the `dashdbsoftdrinktweets` connection and click **Insert to code** under it.

This adds code to the notebook that loads data from the dashDB database.

Note that you select a specific table to load. In our case, we load **DSX_CLOUDANT_SOFTDRINK_TWEETS**.

In [None]:
# The code was removed by DSX for sharing.

In [None]:
# copy data into sodaTweetsDF dataframe for processing
sodaTweetsDF = data_df_1

___________

<a id="exploredata"> </a>
## Step 3: Exploratory Data Analysis

In [None]:
# Return top 2 rows of Spark DataFrame
sodaTweetsDF.limit(2).toPandas()

In [None]:
# Print the schema of the loaded data
sodaTweetsDF.printSchema()

In [None]:
## Drop unneeded columns
sodaTweetsDF = sodaTweetsDF.drop('_ID','_REV')

Extract day from the CREATED_AT field. This is useful to plot tweet trends over time.

In [None]:
import datetime
from datetime import date
from dateutil import parser

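def getDay(created_at):
    # Parse the timestamp and keep only the calendar date.
    # No print here: a UDF runs once per row, so printing would flood the executor logs.
    day = parser.parse(str(created_at))
    return day.date()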

# Add a field for the day the tweet was created (ignoring hour/minute/second)
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

udfGetDay = udf(getDay, DateType())

sodaTweetsDF = sodaTweetsDF.withColumn('DAY',udfGetDay('CREATED_AT'))

# Verify added field is as expected
sodaTweetsDF.select("DAY").limit(5).toPandas()
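
To see what the day extraction does outside of Spark, here is a minimal sketch that uses only the standard library. It swaps `dateutil` for `datetime.strptime`, and it assumes that `CREATED_AT` follows Twitter's classic timestamp layout; both the format string and the `get_day` name are assumptions for illustration, so adjust them to match your table.

```python
from datetime import datetime, date

# Assumed timestamp layout (Twitter's classic format); change it to match CREATED_AT
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"

def get_day(created_at):
    """Return the calendar date of a timestamp string, or None if it cannot be parsed."""
    try:
        return datetime.strptime(created_at, TWITTER_FORMAT).date()
    except (TypeError, ValueError):
        # Missing or malformed timestamps become None instead of failing the whole job
        return None

print(get_day("Wed Feb 15 20:31:05 +0000 2017"))
print(get_day("not a date"))
```

Returning `None` for unparseable input is a design choice for this sketch: in a UDF, one bad row would otherwise fail the whole Spark job.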

____________

<a id="takesample"> </a>
## Step 4: Take a Sample
For purposes of this tutorial, we will work with a small sample of the data. In practice, you would run the analysis on a large data set to capture as many insights as possible, but a small sample is enough to illustrate the approach.

We also want to keep the number of API calls within the limits of the free plan of [Watson Developer Cloud](https://www.ibm.com/watson/developercloud/) services so users can run through the notebook successfully.

In [None]:
## Take a sample of the data
## Limit to 1000 records as Watson NLU allows 1000 free calls per day
import random

num_records = sodaTweetsDF.count()
#sample_num_records = 950
sample_num_records = 50
fraction = float(sample_num_records)/float(num_records)

seed = random.randint(1, 100)
print 'Number of records: ', num_records, ' Sample size: ', sample_num_records, ' Fraction: ', fraction, ' Seed: ', seed
sodaTweetsSampleDF = sodaTweetsDF.sample(False, fraction, seed)

print 'Number of records to send to NLU:', sodaTweetsSampleDF.count()

In [None]:
## Take a sample of the data
## Limit to 1000 records as Watson NLU allows 1000 free calls per day
## Use sampleByKey to get equal distribution of samples per day

num_records = sodaTweetsDF.count()
sample_num_records = 950
fraction = float(sample_num_records)/float(num_records)

fractionList = sodaTweetsDF.rdd.map(lambda x: x['DAY']).distinct().map(lambda x: (x,fraction)).collectAsMap()

keybyday = sodaTweetsDF.rdd.keyBy(lambda x: x['DAY'])

# Returns RDD with length of 2, first col is the key (day) and second col is the original row for the key
# Take only the actual data (column 1)
sodaTweetsDFrdd = keybyday.sampleByKey(False,fractionList).map(lambda x: x[1])
sodaTweetsSampleDF = spark.createDataFrame(sodaTweetsDFrdd,sodaTweetsDF.schema)

print 'Number of records to send to NLU:', sodaTweetsSampleDF.count()
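
Both sampling cells above keep each row with a fixed probability, so the sample size is only approximately the requested number. The following pure-Python sketch mimics that behavior on made-up rows; the helper names `bernoulli_sample` and `sample_by_key` are hypothetical and only loosely mirror Spark's `sample` and `sampleByKey`.

```python
import random
from collections import defaultdict

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

def sample_by_key(rows, key_fn, fractions, seed):
    """Keep each row with the probability assigned to its key."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fractions[key_fn(row)]]

# Made-up data: 3 days with 400 tweets each
days = ("d1", "d2", "d3")
rows = [{"DAY": day, "ID": i} for day in days for i in range(400)]

fraction = 300 / float(len(rows))            # aim for about 300 rows
fractions = {day: fraction for day in days}  # same fraction for every day

simple = bernoulli_sample(rows, fraction, seed=42)
sample = sample_by_key(rows, lambda r: r["DAY"], fractions, seed=42)

per_day = defaultdict(int)
for row in sample:
    per_day[row["DAY"]] += 1
print(len(simple), len(sample), dict(per_day))
```

Because the result is random, leave some headroom below the API limit, as the notebook does by asking for 950 records rather than 1000.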

In [None]:
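from pyspark.sql import functions as F

# Count the sampled tweets per day to check the distribution
tmp = sodaTweetsSampleDF.groupBy('DAY')\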
                              .agg(F.count('ID')\
                              .alias('NUM_TWEETS_PER_DAY'))
tmp.show()

____________

<a id="getcredentials"> </a>
## Step 5: Read Credentials for NLU, Personality Insights, and Twitter
Upload a JSON file (for example, sample_creds.json) that holds the credentials for NLU, Personality Insights, and Twitter to your Object Storage instance.

* Click the **Find and Add Data** icon.
* Click the **Files** tab.
* Either drop your file in the box or click **browse** to select the credentials file from your local disk.
* After selecting the file, press **Open** to upload it to Object Storage.

Here is the format for the credentials file:
```
{
	"nlu_username": "YOUR NLU username",
	"nlu_password": "YOUR NLU password",
	"nlu_version": "NLU version",
	"twitter_consumer_key": "YOUR Twitter App consumer key",
	"twitter_consumer_secret": "YOUR Twitter App consumer secret",
	"twitter_access_token": "YOUR Twitter App access token",
	"twitter_access_token_secret": "YOUR Twitter App access token secret",
	"pi_username": "YOUR Personality Insights username",
	"pi_password": "YOUR Personality Insights password",
	"pi_version": "Personality Insights version"
}
```
* Once uploaded, you should see the sample_creds.json file under Files tab under Find and Add Data column.
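
Before uploading, it can help to check that the file contains every key the notebook reads later. This is a minimal sketch using only the standard library; the `missing_credentials` helper is hypothetical and the sample content is made up.

```python
import json

# Keys taken from the credentials file format shown above
REQUIRED_KEYS = [
    "nlu_username", "nlu_password", "nlu_version",
    "twitter_consumer_key", "twitter_consumer_secret",
    "twitter_access_token", "twitter_access_token_secret",
    "pi_username", "pi_password", "pi_version",
]

def missing_credentials(raw_json):
    """Return the required keys that are absent or empty in the credentials JSON."""
    creds = json.loads(raw_json)
    return [key for key in REQUIRED_KEYS if not creds.get(key)]

# Made-up, incomplete file content for illustration
sample = '{"nlu_username": "user", "nlu_password": "secret", "nlu_version": "some-version"}'
print(missing_credentials(sample))
```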

Next, read the sample_creds.json credentials file and set the credentials for NLU, Personality Insights, and Twitter.
* Create a new cell in your notebook.
* If not open, click the **Find and Add Data** icon.
* Click the **Files** tab.
* Find the ```sample_creds.json``` file and click **Insert to code** under the file name 
* Choose **Insert Credentials** ==> this inserts code into your notebook for setting the credentials to read that file from Object Storage.

In [None]:
# The code was removed by DSX for sharing.

In [None]:
from io import BytesIO  
import requests  
import json 

def get_data(credentials):
    """Return the parsed JSON content of a file stored in
    Bluemix Object Storage V3."""

    # Request an auth token from the identity service
    url1 = ''.join(['https://identity.open.softlayer.com', '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
            'password': {'user': {'name': credentials['username'],'domain': {'id': credentials['domain_id']},
            'password': credentials['password']}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()

    # Find the public Dallas object-store endpoint and build the URL of the file
    for e1 in resp1_body['token']['catalog']:
        if e1['type'] == 'object-store':
            for e2 in e1['endpoints']:
                if e2['interface'] == 'public' and e2['region'] == 'dallas':
                    url2 = ''.join([e2['url'],'/', credentials['container'], '/', credentials['filename']])

    # Download the file using the token returned in the response headers
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.get(url=url2, headers=headers2)
    return json.loads(resp2.content)


In [None]:
# Note that we need to reference the credentials object returned by Insert to code (in this example, it is credentials_4)
credentials_json = get_data(credentials_4)
print 'Credentials for NLU'
print 'NLU username: ', credentials_json['nlu_username']
print 'NLU password: ', credentials_json['nlu_password']
print 'NLU version: ', credentials_json['nlu_version']

print 'Credentials for Twitter '
print 'Twitter consumer key: ', credentials_json['twitter_consumer_key']
print 'Twitter consumer secret: ', credentials_json['twitter_consumer_secret']
print 'Twitter access token: ', credentials_json['twitter_access_token']
print 'Twitter access token secret: ', credentials_json['twitter_access_token_secret']

print 'Credentials for Personality Insights'
print 'Personality Insights username: ', credentials_json['pi_username']
print 'Personality Insights password: ', credentials_json['pi_password']
print 'Personality Insights version: ', credentials_json['pi_version']

__________

<a id="enrichnlu"> </a>
## Step 6: Enrich Data with [Watson Natural Language Understanding (NLU)](https://www.ibm.com/watson/developercloud/natural-language-understanding.html)

Watson NLU allows us to extract sentiment and keywords from text.  In our case, the text will be user tweets.

In [None]:
import watson_developer_cloud
import watson_developer_cloud.natural_language_understanding.features.v1 as features

## Define credentials for NLU service
nlu_username=credentials_json['nlu_username']
nlu_password=credentials_json['nlu_password']
nlu_version=credentials_json['nlu_version']
nlu = watson_developer_cloud.NaturalLanguageUnderstandingV1(version = nlu_version,
                                                            username = nlu_username,
                                                            password = nlu_password)

## Send text to NLU and extract Sentiment and Keywords
## Make sure text is utf-8 encoded
def enrichNLU(text):
    utf8text = text.encode("utf-8")
    try:
        result = nlu.analyze(text = utf8text, features = [features.Sentiment(),features.Keywords()])
        sentiment = result['sentiment']['document']['score']
        sentiment_label = result['sentiment']['document']['label']
        keywords = list(result['keywords'])
    except Exception:
        result = None
        sentiment = 0.0
        sentiment_label = None
        keywords = None
    return sentiment, sentiment_label, keywords
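
The parsing half of `enrichNLU` can be tried without calling the service. The sketch below reads a hand-written dictionary shaped like the fields `enrichNLU` accesses; the values are made up, and `parse_nlu_result` is a hypothetical helper, not part of the Watson SDK.

```python
def parse_nlu_result(result):
    """Return (score, label, keywords) from an NLU-style response dict."""
    try:
        document = result["sentiment"]["document"]
        return document["score"], document["label"], list(result["keywords"])
    except (KeyError, TypeError):
        # Same fallback values as enrichNLU above
        return 0.0, None, None

# Hand-written response in the shape enrichNLU reads; values are made up
fake_result = {
    "sentiment": {"document": {"score": -0.42, "label": "negative"}},
    "keywords": [{"text": "flat soda", "relevance": 0.91}],
}
print(parse_nlu_result(fake_result))
print(parse_nlu_result(None))
```

Note that a failed call yields a score of 0.0 with no label. Later cells filter on `SENTIMENT_LABEL` being non-null, which is what keeps these placeholder scores out of the sentiment counts.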

Since we are now working with a smaller data set, we'll convert our dataframe to a Pandas dataframe and enrich the tweets with the results of Watson NLU.

In [None]:
# Create a Pandas frame and augment with Sentiment analysis and Keywords using Watson NLU
sodaTweetsSamplePandasDF = sodaTweetsSampleDF.toPandas()

**This next cell will send the records to the Watson API, so be aware of the number of calls that are being made.  If you are on the free plan which is limited to 1000 API calls per day, then you will only be able to run this cell once.  After that the server will respond with a notice indicating you have reached the maximum number of API calls for the day.**  Also, this could take 60+ seconds so some patience is encouraged. 

In [None]:
## This calls the enrichNLU function which accesses the Watson NLU API
sodaTweetsSamplePandasDF['SENTIMENT'],sodaTweetsSamplePandasDF['SENTIMENT_LABEL'],\
sodaTweetsSamplePandasDF['KEYWORDS'] = zip(*sodaTweetsSamplePandasDF['TEXT_CLEAN'].map(enrichNLU))
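
The `zip(*...)` idiom in the cell above turns a sequence of per-row tuples into one sequence per column. Here is a minimal sketch with plain lists; `fake_enrich` is a hypothetical stand-in for `enrichNLU` so the example runs without API calls.

```python
def fake_enrich(text):
    """Hypothetical stand-in for enrichNLU: returns (score, label, keywords)."""
    label = "positive" if "love" in text else "neutral"
    score = 0.8 if label == "positive" else 0.0
    return score, label, text.split()

texts = ["love this soda", "just a soda"]
results = [fake_enrich(t) for t in texts]   # one tuple per row
scores, labels, keywords = zip(*results)    # one tuple per column

print(scores)
print(labels)
print(keywords)
```

One caveat: with an empty input there is nothing to unzip, so the three-way unpacking raises a `ValueError`. Check that the sample is non-empty before running the enrichment.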

In [None]:
# View the top ten records to verify the Sentiment and Keywords enrichments were applied as expected
print 'Rows x Columns for sodaTweetsSamplePandasDF:', sodaTweetsSamplePandasDF.shape
sodaTweetsSamplePandasDF[:10]

In [None]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import ArrayType

schema = sodaTweetsSampleDF.schema
schema1 = StructType([
    StructField("relevance", FloatType(), True),
    StructField("text", StringType(), True),                            
])

keywordschema = StructType.fromJson(schema1.jsonValue())
added_fields = [StructField("SENTIMENT", FloatType(), True),StructField("SENTIMENT_LABEL",StringType(),True),\
                StructField("KEYWORDS",ArrayType(keywordschema),True)] 

newfields = StructType(schema.fields + added_fields)

In [None]:
## Push the enriched data back out to Spark to continue working within the Spark API
#enrichedSodaTweetsDF = spark.createDataFrame(sodaTweetsSamplePandasDF)
enrichedSodaTweetsDF = spark.createDataFrame(sodaTweetsSamplePandasDF,newfields)

# Print the schema of the Spark DataFrame to verify we have all the expected fields
enrichedSodaTweetsDF.printSchema()

____________

<a id="sentiment"> </a>
## Step 7: Visualizing Sentiment and Keywords

Tweet trends, sentiment, and keywords give a brand manager a view into consumers' perceptions regarding the brand. These insights can be very useful to the brand manager. 

In this step we will visualize some of the data we received from NLU.  Let's look at tweet trends and sentiment for the **Coke** and **Pepsi** brands.  In addition, let's show the main keywords tweeted for both brands.

In [None]:
# Separate tweets by brand
# To do so, check which brand the tweet mentions and add a column to record it
brandList =['coke','pepsi']
def addBrand(text):
    for brand in brandList:
        if brand in text.lower():
            return brand
    return None
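
One property of `addBrand` worth knowing: a tweet that mentions both brands is assigned to whichever brand comes first in `brandList`. The sketch below reproduces that behavior in plain Python and shows a variant that returns every match; `all_brands` is a hypothetical alternative and is not used elsewhere in this notebook.

```python
brand_list = ["coke", "pepsi"]

def first_brand(text):
    """Return the first brand from brand_list found in text, or None."""
    lowered = (text or "").lower()
    for brand in brand_list:
        if brand in lowered:
            return brand
    return None

def all_brands(text):
    """Variant (not in the notebook): return every brand mentioned."""
    lowered = (text or "").lower()
    return [brand for brand in brand_list if brand in lowered]

print(first_brand("Pepsi or Coke?"))
print(all_brands("Pepsi or Coke?"))
print(first_brand("I prefer water"))
```

The `(text or "")` guard is an addition in this sketch so that a missing tweet text returns `None` instead of raising an error.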

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

udfAddBrand = udf(addBrand, StringType())

# For purposes of separating tweets by brand, we need to run it against original TEXT and not the TEXT_CLEAN
# This is because in several cases, the brand is referenced with a handle
enrichedBrandsDF = enrichedSodaTweetsDF.withColumn('BRAND',udfAddBrand('TEXT'))

# view top records to verify brand column extracted as expected
enrichedBrandsDF.limit(2).toPandas()


To visualize the difference in sentiment between brands, we'll create a separate dataframe for each brand.


In [None]:
from pyspark.sql.functions import col

## Create one DF for Coke, one for Pepsi
cokeTweetsDF = enrichedBrandsDF.where(col('BRAND') == 'coke')
pepsiTweetsDF = enrichedBrandsDF.where(col('BRAND') == 'pepsi')

In [None]:
cokeTweetsDF.head(2)

Now, find the number of tweets with different sentiment labels (Positive, Negative, Neutral) for both Coke and Pepsi.

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

## First for Coke
cokeTweetsDF = cokeTweetsDF.where(col('SENTIMENT_LABEL').isNotNull())
cokeSentimentDF = cokeTweetsDF.groupBy('SENTIMENT_LABEL')\
                              .agg(F.count('ID')\
                              .alias('NUM_TWEETS'))

## Take a look
cokeSentimentDF.show()

In [None]:
## Now for Pepsi
pepsiTweetsDF = pepsiTweetsDF.where(col('SENTIMENT_LABEL').isNotNull())
pepsiSentimentDF = pepsiTweetsDF.groupBy('SENTIMENT_LABEL')\
                                .agg(F.count('ID')\
                                .alias('NUM_TWEETS'))

pepsiSentimentDF.show()

#### Compare Sentiment Between Coke and Pepsi

In [None]:
## Pull the aggregated data back to the driver and convert to Pandas dataframes for plotting
cokeSentimentDF = cokeSentimentDF.toPandas()
pepsiSentimentDF = pepsiSentimentDF.toPandas()

import matplotlib.pyplot as plt

# Plot sentiment
%matplotlib inline
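plot1_labels = cokeSentimentDF['SENTIMENT_LABEL']
plot1_values = cokeSentimentDF['NUM_TWEETS']

plot2_labels = pepsiSentimentDF['SENTIMENT_LABEL']
plot2_values = pepsiSentimentDF['NUM_TWEETS']

# Pick each slice's color from its label; groupBy does not guarantee row order
sentiment_colors = {'positive': 'green', 'neutral': 'gray', 'negative': 'red'}
plot1_colors = [sentiment_colors.get(label, 'blue') for label in plot1_labels]
plot2_colors = [sentiment_colors.get(label, 'blue') for label in plot2_labels]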

fig, axes = plt.subplots(nrows = 1, ncols = 2, figsize = (23, 10))
axes[0].pie(plot1_values,  labels = plot1_labels, colors = plot1_colors, autopct = '%1.1f%%')
axes[0].set_title('Percentage of Sentiment Values in all Coke Tweets')
axes[0].set_aspect('equal')
axes[0].legend(loc = "upper right", labels=plot1_labels)

axes[1].pie(plot2_values,  labels = plot2_labels, colors = plot2_colors, autopct = '%1.1f%%')
axes[1].set_title('Percentage of Sentiment Values in all Pepsi Tweets')
axes[1].set_aspect('equal')
axes[1].legend(loc = "upper right", labels = plot2_labels)
fig.subplots_adjust(hspace = 1)
plt.show()

#### View Sentiment for Coke Over Time

In [None]:
## Get aggregated sentiment for each day
cokeOverTimeDF = cokeTweetsDF\
                    .groupBy('DAY', 'SENTIMENT_LABEL')\
                    .agg(F.count('TEXT_CLEAN').alias('NUM_TWEETS'))\
                    .orderBy('DAY', ascending = True)

## Get total tweets each day
cokeTweetsPerDayDF = cokeOverTimeDF.groupBy('DAY')\
                        .agg(F.sum('NUM_TWEETS').alias('NUM_TWEETS'))\
                        .orderBy('DAY', ascending = True)
        
## Convert back to Pandas
cokeOverTimeDF = cokeOverTimeDF.toPandas()
cokeTweetsPerDayDF = cokeTweetsPerDayDF.toPandas()

## Identify rows with positive sentiment for each day
positiveIndex = cokeOverTimeDF['SENTIMENT_LABEL'] == 'positive'
positiveTweetsDF = cokeOverTimeDF[positiveIndex]

## Identify rows with negative sentiment for each day
negativeIndex = cokeOverTimeDF['SENTIMENT_LABEL'] == 'negative'
negativeTweetsDF = cokeOverTimeDF[negativeIndex]

## Check results
positiveTweetsDF[:2]

In [None]:
## Define values for matplotlib
x = cokeTweetsPerDayDF['DAY']
y = cokeTweetsPerDayDF['NUM_TWEETS']
py = positiveTweetsDF['NUM_TWEETS']
ny = negativeTweetsDF['NUM_TWEETS']

fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(20, 10))
axes[0].plot(range(len(y)), y, linewidth=2)
axes[0].set_xticks(x.index.tolist())
axes[0].set_xticklabels([date.strftime("%Y-%m-%d") for date in x])
axes[0].margins(0)
axes[0].set_xlabel('Date/Time')
axes[0].set_ylabel('Num of Tweets')
axes[0].set_title('Number of Tweets Over Time - ALL TWEETS')
axes[0].set_xlim(0, len(y))
axes[0].legend(loc="upper right", labels=['All Tweets'])

axes[1].plot(range(len(y)), y, linewidth=2, color='blue')
axes[1].plot(range(len(py)), py, linewidth=2, color='green')
axes[1].plot(range(len(ny)), ny, linewidth=2, color='red')
axes[1].set_xticks(x.index.tolist())
axes[1].set_xticklabels([date for date in x])
axes[1].margins(0)
axes[1].set_xlabel('Date/Time')
axes[1].set_ylabel('Num of Tweets')
axes[1].set_title('Number of Tweets Over Time - All, Positive and Negative')
axes[1].set_xlim(0, len(y))
axes[1].legend(loc="upper right", labels=['All Tweets', 'Positive', 'Negative'])

axes[2].plot(range(len(py)), py, linewidth=2, color='green')
axes[2].plot(range(len(ny)), ny, linewidth=2, color='red')
axes[2].set_xticks(x.index.tolist())
axes[2].set_xticklabels([date for date in x])
axes[2].margins(0)
axes[2].set_xlabel('Date')
axes[2].set_ylabel('Num of Tweets')
axes[2].set_title('Number of Tweets Over Time - Positive and Negative')
axes[2].set_xlim(0, len(y))
axes[2].legend(loc="upper right", labels=['Positive', 'Negative'])

## Rotate x-axes for legibility.
for ax in fig.axes:
    plt.sca(ax)
    plt.xticks(rotation = 45)

fig.subplots_adjust(hspace=1)
plt.show()
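
One thing to watch in plots like these: if a day has no positive (or no negative) tweets, that series has fewer rows than there are days, and plotting it by position shifts the later points onto the wrong dates. A minimal sketch of aligning each series to the full list of days, using made-up rows in the shape of `cokeOverTimeDF` and a hypothetical `align_counts` helper:

```python
def align_counts(days, rows, label):
    """Return one count per day for `label`, using 0 where a day has no rows."""
    counts = {row["DAY"]: row["NUM_TWEETS"]
              for row in rows if row["SENTIMENT_LABEL"] == label}
    return [counts.get(day, 0) for day in days]

# Made-up aggregated rows: no positive tweets on the 2nd, no negative on the 3rd
rows = [
    {"DAY": "2017-02-01", "SENTIMENT_LABEL": "positive", "NUM_TWEETS": 5},
    {"DAY": "2017-02-01", "SENTIMENT_LABEL": "negative", "NUM_TWEETS": 2},
    {"DAY": "2017-02-02", "SENTIMENT_LABEL": "negative", "NUM_TWEETS": 4},
    {"DAY": "2017-02-03", "SENTIMENT_LABEL": "positive", "NUM_TWEETS": 1},
]
days = sorted({row["DAY"] for row in rows})

positive = align_counts(days, rows, "positive")
negative = align_counts(days, rows, "negative")
print(days)
print(positive, negative)
```

With the series aligned this way, every list has one entry per day and can be plotted against the same x positions.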

#### Top Keywords
Extract and display the top keywords in the tweets referring to Coke and Pepsi.

In [None]:
from pyspark.sql.functions import explode

# Explode keywords
cokeKeywordsDF = cokeTweetsDF.select(explode('KEYWORDS').alias('TOPKEYWORDS'))
pepsiKeywordsDF = pepsiTweetsDF.select(explode('KEYWORDS').alias('TOPKEYWORDS'))

cokeTopKeywordsDF = cokeKeywordsDF.select('TOPKEYWORDS').rdd.map(lambda row: row[0]).toDF()
pepsiTopKeywordsDF = pepsiKeywordsDF.select('TOPKEYWORDS').rdd.map(lambda row: row[0]).toDF()


In [None]:
# UDF to filter profanity words
profanityList = ['fuck', 'cunt', 'eh', 'twat', 'shit', 'beep', 'shouty twat']
def filter_profanity(word):
    if word in profanityList:
        return None
    if "http" in word:
        return None
    return word

# UDF to return lower case of word
def toLowerCase(word):
    return word.lower()

In [None]:
# Process extracted keywords to filter profanity and change to lower case
udfLowerCase = udf(toLowerCase, StringType())
cokeTopKeywordsDF = cokeTopKeywordsDF.withColumn('TOPKEYWORDS',udfLowerCase('text'))
pepsiTopKeywordsDF = pepsiTopKeywordsDF.withColumn('TOPKEYWORDS',udfLowerCase('text'))

udfFilterProfanity = udf(filter_profanity, StringType())
cokeTopKeywordsDF = cokeTopKeywordsDF.withColumn('TOPKEYWORDS',udfFilterProfanity('TOPKEYWORDS'))
pepsiTopKeywordsDF = pepsiTopKeywordsDF.withColumn('TOPKEYWORDS',udfFilterProfanity('TOPKEYWORDS'))


In [None]:
# Group by TOPKEYWORDS and compute the average relevance per keyword and the number of tweets for each keyword
cokeKwdsNumDF = cokeTopKeywordsDF.groupBy('TOPKEYWORDS').agg(F.count('TOPKEYWORDS').alias('KWDSNUMTWEETS'))
pepsiKwdsNumDF = pepsiTopKeywordsDF.groupBy('TOPKEYWORDS').agg(F.count('TOPKEYWORDS').alias('KWDSNUMTWEETS'))

cokeKwdsRelDF = cokeTopKeywordsDF.groupBy('TOPKEYWORDS').agg(F.avg('relevance').alias('KWDSAVGRELEVANCE'))
pepsiKwdsRelDF = pepsiTopKeywordsDF.groupBy('TOPKEYWORDS').agg(F.avg('relevance').alias('KWDSAVGRELEVANCE'))

In [None]:
# join dataframes into one
cokeTweetsKeywordsDF = cokeKwdsNumDF.join(cokeKwdsRelDF,'TOPKEYWORDS','outer')
pepsiTweetsKeywordsDF = pepsiKwdsNumDF.join(pepsiKwdsRelDF,'TOPKEYWORDS','outer')

# Define keyword score as product of number of tweets expressing that keyword and average relevance
cokeTweetsKeywordsDF = cokeTweetsKeywordsDF.withColumn('KEYWORD_SCORE',cokeTweetsKeywordsDF.KWDSNUMTWEETS * cokeTweetsKeywordsDF.KWDSAVGRELEVANCE)
pepsiTweetsKeywordsDF = pepsiTweetsKeywordsDF.withColumn('KEYWORD_SCORE',pepsiTweetsKeywordsDF.KWDSNUMTWEETS * pepsiTweetsKeywordsDF.KWDSAVGRELEVANCE)

# Sort dataframe in descending order of KEYWORD_SCORE
cokeTweetsKeywordsDF = cokeTweetsKeywordsDF.orderBy('KEYWORD_SCORE',ascending=False)
pepsiTweetsKeywordsDF = pepsiTweetsKeywordsDF.orderBy('KEYWORD_SCORE',ascending=False)

# Remove None keywords
cokeTweetsKeywordsDF = cokeTweetsKeywordsDF.where(col('TOPKEYWORDS').isNotNull())
pepsiTweetsKeywordsDF = pepsiTweetsKeywordsDF.where(col('TOPKEYWORDS').isNotNull())

# Remove coke from coke list and pepsi from pepsi list
# Note we want to keep pepsi in the coke list and coke in the pepsi list
cokeTweetsKeywordsDF = cokeTweetsKeywordsDF.where(col('TOPKEYWORDS') != "coke")
pepsiTweetsKeywordsDF = pepsiTweetsKeywordsDF.where(col('TOPKEYWORDS') != "pepsi")

In [None]:
print "Coke Top Keywords"
cokeTweetsKeywordsDF.show()
#vv = cokeTopKeywordsDF.groupBy('text').agg(F.count('text') > 1)
#vv = cokeTopKeywordsDF.where(col('text') == "kiwi")


In [None]:
print "Pepsi Top Keywords"
pepsiTweetsKeywordsDF.show()

In [None]:
cokeTweetsKeywordsPandas = cokeTweetsKeywordsDF.toPandas()
pepsiTweetsKeywordsPandas = pepsiTweetsKeywordsDF.toPandas()

In [None]:
from wordcloud import WordCloud

# Process Pandas DataFrame in the right format to leverage wordcloud.py for plotting
# See documentation: https://github.com/amueller/word_cloud/blob/master/wordcloud/wordcloud.py 
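def prepForWordCloud(pandasDF,n):
    # Map each of the top n keywords to its score. Use positional access and
    # cap n at the number of rows so a short DataFrame does not raise an error.
    kwdList = pandasDF['TOPKEYWORDS']
    sizeList = pandasDF['KEYWORD_SCORE']
    kwdSize = {}
    for i in range(min(n, len(pandasDF))):
        kwd=kwdList.iloc[i]
        size=sizeList.iloc[i]
        kwdSize[kwd] = size
    return kwdSize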

In [None]:
%matplotlib inline
maxWords = len(cokeTweetsKeywordsPandas)
nWords = 20

#Generate the word clouds. relative_scaling controls how strongly word frequency affects font size.
#See documentation: https://github.com/amueller/word_cloud/blob/master/wordcloud/wordcloud.py
cokeKwdFreq = prepForWordCloud(cokeTweetsKeywordsPandas,nWords)
cokeWordCloud = WordCloud(max_words=maxWords,relative_scaling=0,normalize_plurals=False).generate_from_frequencies(cokeKwdFreq)

pepsiKwdFreq = prepForWordCloud(pepsiTweetsKeywordsPandas,nWords)
pepsiWordCloud = WordCloud(max_words=nWords,relative_scaling=0,normalize_plurals=False).generate_from_frequencies(pepsiKwdFreq)

fig, ax = plt.subplots(nrows = 1, ncols = 2, figsize = (23, 10))

# Set titles for images
ax[0].set_title('Top Keywords for Coke')
ax[1].set_title('Top Keywords for Pepsi')

# Plot word clouds
ax[0].imshow(cokeWordCloud)
ax[1].imshow(pepsiWordCloud)

# turn off axes and ticks on both subplots
ax[0].axis("off")
ax[1].axis("off")

plt.show()

_____________

<a id="enrichpi"> </a>
## Step 8: Enrich Data with [Watson Personality Insights](https://www.ibm.com/watson/developercloud/personality-insights.html)

In this tutorial, we will create personality profiles for a sample of 100 users, as that is the limit of what we can run with the free plan for Watson [Personality Insights](https://www.ibm.com/watson/developercloud/personality-insights.html) (PI).

In practice, you can run personality profiles for all users, or only for a select subset, such as the users with the most negative tweets or those with the most followers or posts.

In [None]:
## Aggregate users by sentiment
cokeUserSentimentDF = cokeTweetsDF\
                        .groupBy('USER_SCREEN_NAME', 'SENTIMENT_LABEL','SENTIMENT',\
                                 'USER_FAVOURITES_COUNT','USER_STATUSES_COUNT',
                                 'USER_FOLLOWERS_COUNT','USER_FRIENDS_COUNT')\
                        .count()\
                        .orderBy('SENTIMENT', ascending = True)

## Get negative and positive tweeting users
negativeTweetersDF = cokeUserSentimentDF.where(col('SENTIMENT_LABEL') == "negative")
positiveTweetersDF = cokeUserSentimentDF.where(col('SENTIMENT_LABEL') == "positive")  

# Take a random sample of roughly 95 users to stay under the 100-user limit
num_users = cokeUserSentimentDF.count()
sample_num_users = 95
usrfraction = float(sample_num_users)/float(num_users)
usrseed = random.randint(1, 100)

## Report the number of unique users tweeting about Coke and the sample settings
print 'Number of unique users tweeting about Coke: ', cokeTweetsDF.select('USER_SCREEN_NAME').distinct().count()
print 'Number of negative tweeting folks: ', negativeTweetersDF.count()
print 'Number of positive tweeting folks: ', positiveTweetersDF.count()
print 'Sample size: ', sample_num_users, ' Fraction: ', usrfraction, ' Seed: ', usrseed

sodaUserSampleDF = cokeUserSentimentDF.sample(False, usrfraction, usrseed)

print 'Records in sodaUserSampleDF: ', sodaUserSampleDF.count()

In this step, we collect enough tweets for each unique user and run those tweets through Personality Insights to extract the Big 5 personality traits, also known as OCEAN (Openness, Conscientiousness, Extraversion, Agreeableness, and Neuroticism).

In [None]:
# The code was removed by DSX for sharing.

Now we'll define several functions to programmatically paint a portrait of each user and their personality.

1. `getTweets()` - to collect tweets from a given user ID
2. `getPersonality()` - to call Personality Insights on the user's tweets
3. `extractOCEANtraits()` - to gather the OCEAN scores from each user's PI data
4. `getPItraits()` - to put it all together and return a list of traits for each user

In [None]:
## Collect tweets for a given user
def getTweets(username):
    try:
        # Fetch up to 100 recent statuses (retweets included) and keep only the text
        tweet_collection = api.user_timeline(screen_name = username, count = 100, include_rts = True)
        tweets = [status.text for status in tweet_collection]
    except Exception:
        tweets = None
    return tweets

## Call Personality Insights on the tweets for the user
def getPersonality(tweets):
    # get tweets by user
    if tweets is None:
        profile = None
    else:
        tweets_content = ' '.join(tweets)
        # UTF-8 encoding
        twt = tweets_content.encode('utf-8')
        # call PI to get personality profile
        try:
            profile = personality_insights.profile(twt, content_type = 'text/plain', content_language = None,
                                           accept ='application/json', accept_language = None, raw_scores = False,
                                           consumption_preferences = False, csv_headers = False)
        except Exception:
            profile = None
    
    return profile


## Extract OCEAN percentiles from PI data
def extractOCEANtraits(profile):
    if profile is None:
        openness = None
        conscientiousness = None
        extraversion = None
        agreeableness = None
        neuroticism = None
    else:
        personality = profile['personality']
        openness = personality[0]['percentile']
        conscientiousness = personality[1]['percentile']
        extraversion = personality[2]['percentile']
        agreeableness = personality[3]['percentile']
        neuroticism = personality[4]['percentile']
    
    return openness, conscientiousness, extraversion, agreeableness, neuroticism

## Combine function calls for a user
def getPItraits(user, verbose = False):
    if verbose:
        print 'Getting tweets for user: ', user
    try:
        # get tweets by user
        tweets = getTweets(user)
        # run PI profile on extracted tweets
        profile = getPersonality(tweets)
        # extract OCEAN traits
        return extractOCEANtraits(profile)
    except Exception:
        # Return one None per trait so callers can still unpack five values
        return None, None, None, None, None
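
`extractOCEANtraits()` reads the five traits by their position in the `personality` list. A slightly more defensive variant looks them up by identifier instead. The sketch below assumes each entry carries a `trait_id` such as `big5_openness`; treat those identifiers as an assumption and verify them against an actual response before relying on them. The profile here is hand-written with made-up percentiles.

```python
# Assumed trait identifiers; verify against a real Personality Insights response
OCEAN_TRAIT_IDS = [
    "big5_openness", "big5_conscientiousness", "big5_extraversion",
    "big5_agreeableness", "big5_neuroticism",
]

def extract_ocean_by_id(profile):
    """Return the five OCEAN percentiles, looked up by trait_id rather than position."""
    if profile is None:
        return (None,) * 5
    by_id = {trait.get("trait_id"): trait.get("percentile")
             for trait in profile.get("personality", [])}
    return tuple(by_id.get(trait_id) for trait_id in OCEAN_TRAIT_IDS)

# Hand-written profile, deliberately incomplete and out of order
fake_profile = {"personality": [
    {"trait_id": "big5_neuroticism", "percentile": 0.2},
    {"trait_id": "big5_openness", "percentile": 0.9},
]}
print(extract_ocean_by_id(fake_profile))
print(extract_ocean_by_id(None))
```

Missing traits come back as `None`, so rows with incomplete profiles can still be dropped later with `na.drop()`.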

Now we'll extract Personality Insights traits for the users sampled earlier in this step, then push the data back to Spark for machine learning.  **This cell will also send data to the API, so be mindful of the number of calls you are making and be patient for the results.**

In [None]:
## Convert to Pandas to use the Watson PI API
sodaUserSampleDF = sodaUserSampleDF.toPandas()
sodaUserSampleDF['OPENNESS'], sodaUserSampleDF['CONSCIENTIOUSNESS'],\
sodaUserSampleDF['EXTRAVERSION'], sodaUserSampleDF['AGREEABLENESS'],\
sodaUserSampleDF['NEUROTICISM'] = zip(*sodaUserSampleDF['USER_SCREEN_NAME'].map(getPItraits))

# Convert back to Spark Dataframe from Pandas dataframe
userPersonalityDF = spark.createDataFrame(sodaUserSampleDF)

## Drop rows without any PI enrichment
userPersonalityDF = userPersonalityDF.na.drop()

## Check row count and schema
print('Number of records in userPersonalityDF: %d' % userPersonalityDF.count())
userPersonalityDF.printSchema()

userPersonalityDF.limit(2).toPandas()
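
The `zip(*...)` idiom used in the cell above can be easy to misread, so here is a minimal, plain-Python sketch of what it does. `fake_traits` and its scores are made up for illustration and stand in for `getPItraits`; no API calls are made. The key assumption is that every user yields exactly five values, even when no profile is available, otherwise the unpacking fails.

```python
# Hypothetical stand-in for getPItraits: returns five percentiles per user,
# or five Nones when no profile is available (all numbers are made up).
def fake_traits(user):
    scores = {
        'alice': (0.91, 0.42, 0.55, 0.73, 0.18),
        'bob': (0.35, 0.80, 0.22, 0.61, 0.47),
    }
    return scores.get(user, (None,) * 5)

users = ['alice', 'bob', 'carol']
rows = [fake_traits(u) for u in users]

# zip(*rows) transposes the per-user tuples into one tuple per trait,
# which is what lets us assign five new columns in a single statement.
openness, conscientiousness, extraversion, agreeableness, neuroticism = zip(*rows)
```

Each resulting tuple has one entry per user, in the original order, so it lines up with the rows of the dataframe it is assigned to.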

________________

<a id="sparkml"> </a>
## Step 9: Spark Machine Learning for User Segmentation 
[Spark MLlib](https://spark.apache.org/docs/latest/ml-guide.html) includes a rich set of machine learning algorithms for extracting insights from data. Data usually has to be reshaped before these algorithms can be applied. In this step, we prepare the data for the [Kmeans](https://spark.apache.org/docs/latest/mllib-clustering.html#k-means) clustering algorithm: we normalize the **USER_FOLLOWERS_COUNT** and **USER_STATUSES_COUNT** fields and assemble the relevant features into a Vector to be used for clustering.

We run Kmeans clustering using two different feature sets.
* **FeatureSet 1** (no Personality Traits): (SENTIMENT, USER_FOLLOWERS_COUNT, USER_STATUSES_COUNT)
* **FeatureSet 2** (with Personality Traits): (SENTIMENT, USER_FOLLOWERS_COUNT, USER_STATUSES_COUNT, OPENNESS, CONSCIENTIOUSNESS, EXTRAVERSION, AGREEABLENESS, NEUROTICISM)

First, we'll wrap the follower and status counts in vectors, because MinMaxScaler operates on vector columns.  Then we can run [MinMaxScaler](https://spark.apache.org/docs/2.1.0/ml-features.html#minmaxscaler) to normalize the counts to a range between 0 and 1.  Kmeans groups points by Euclidean distance, so without this rescaling the raw counts would swamp the sentiment and personality scores, which live on a much smaller scale.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

## Convert back to Spark Dataframe from Pandas dataframe
userPersonalityDF = spark.createDataFrame(sodaUserSampleDF)

## Drop rows without any PI enrichment
userPersonalityDF = userPersonalityDF.na.drop()

## Define columns to be converted to vectors
followersVector = VectorAssembler(
  inputCols=["USER_FOLLOWERS_COUNT"], outputCol="USER_FOLLOWERS_COUNT_VECTOR")

statusesVector = VectorAssembler(
  inputCols=["USER_STATUSES_COUNT"], outputCol="USER_STATUSES_COUNT_VECTOR")

## Define our input and output columns for MinMaxScaler
followersScaler = MinMaxScaler(inputCol="USER_FOLLOWERS_COUNT_VECTOR", outputCol="USER_FOLLOWERS_COUNT_SCALED")
statusesScaler = MinMaxScaler(inputCol="USER_STATUSES_COUNT_VECTOR", outputCol="USER_STATUSES_COUNT_SCALED")

## Invoke the VectorAssembler transformations and select desired columns
userPersonalityDF = followersVector.transform(userPersonalityDF)
userPersonalityDF = statusesVector.transform(userPersonalityDF)

## Fit MinMaxScalerModel on our vectors and rescale
followersScalerModel = followersScaler.fit(userPersonalityDF)
statusesScalerModel = statusesScaler.fit(userPersonalityDF)

userPersonalityDF = followersScalerModel.transform(userPersonalityDF)
userPersonalityDF = statusesScalerModel.transform(userPersonalityDF)

## User-defined function to convert from vector back to float
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

udfVecToFloat = udf(ith_, DoubleType())

## Add column for floating point scaled followers
userPersonalityDF = userPersonalityDF.withColumn("SCALED_FOLLOWERS", udfVecToFloat('USER_FOLLOWERS_COUNT_SCALED', lit(0)))\
                                     .withColumn("SCALED_STATUSES", udfVecToFloat('USER_STATUSES_COUNT_SCALED', lit(0)))\
                                     .select("USER_SCREEN_NAME", "SENTIMENT_LABEL", "SENTIMENT", "SCALED_FOLLOWERS", \
                                            "SCALED_STATUSES", "OPENNESS", "CONSCIENTIOUSNESS", "EXTRAVERSION", \
                                            "AGREEABLENESS", "NEUROTICISM")
        
## Lastly, we'll have to add columns for features both with and without Personality Insights
assemblerWithPI = VectorAssembler(
    inputCols = ['SENTIMENT','SCALED_FOLLOWERS','SCALED_STATUSES','OPENNESS','CONSCIENTIOUSNESS','EXTRAVERSION',\
               'AGREEABLENESS','NEUROTICISM'],
    outputCol = "PI_ENRICHED_FEATURES")

assemblerWithoutPI = VectorAssembler(
    inputCols = ['SENTIMENT', 'SCALED_FOLLOWERS', 'SCALED_STATUSES'],
    outputCol = "BASE_FEATURES")


userPersonalityDF = assemblerWithPI.transform(userPersonalityDF)
userPersonalityDF = assemblerWithoutPI.transform(userPersonalityDF)

## View transformed DF
userPersonalityDF.limit(5).toPandas()
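
To make the rescaling concrete, here is a small plain-Python sketch of min-max normalization. It is not Spark's implementation; `min_max_scale` and the follower counts are made up for illustration. The handling of a constant column (mapping to the midpoint of the target range) follows the convention described in the Spark documentation as we understand it.

```python
def min_max_scale(values, lo=0.0, hi=1.0):
    """Rescale values linearly so the minimum maps to lo and the maximum to hi."""
    v_min, v_max = min(values), max(values)
    if v_max == v_min:
        # Constant column: map every value to the midpoint of the target range
        return [0.5 * (lo + hi) for _ in values]
    span = float(v_max - v_min)
    return [lo + (v - v_min) / span * (hi - lo) for v in values]

followers = [120, 4500, 98000, 310]   # made-up follower counts
scaled = min_max_scale(followers)
```

The smallest count maps to 0, the largest to 1, and everything else lands in between, so a single celebrity account no longer dominates the distance calculation on its own.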

#### Kmeans Clustering 

Now that we have prepared the data, we can run Kmeans to assign cluster labels to each user.

In [None]:
from pyspark.ml.clustering import KMeans

## Define model parameters and set the seed
baseKMeans = KMeans(featuresCol = "BASE_FEATURES", predictionCol = "BASE_PREDICTIONS").setK(5).setSeed(206)
piKMeans = KMeans(featuresCol = "PI_ENRICHED_FEATURES", predictionCol = "PI_PREDICTIONS").setK(5).setSeed(206)

## Fit model on our feature vectors
baseClustersFit = baseKMeans.fit(userPersonalityDF.select("BASE_FEATURES"))
enrichedClustersFit = piKMeans.fit(userPersonalityDF.select("PI_ENRICHED_FEATURES"))

## Get the cluster IDs for each user
userPersonalityDF = baseClustersFit.transform(userPersonalityDF)
userPersonalityDF = enrichedClustersFit.transform(userPersonalityDF)

## Check our work
userPersonalityDF.limit(5).toPandas()
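
If you'd like to see what the clustering step is doing under the hood, the following is a minimal plain-Python sketch of the classic Kmeans loop (assign each point to its nearest centroid, then move each centroid to the mean of its members). It is an illustration only: the function names and the four toy points are made up, and it uses a naive initialization (the first `k` points), whereas Spark uses a more careful initialization scheme.

```python
def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))

def nearest(point, centroids):
    """Index of the centroid closest to point."""
    return min(range(len(centroids)),
               key=lambda i: squared_distance(point, centroids[i]))

def kmeans(points, k, iterations=20):
    # Naive, deterministic initialization: use the first k points
    centroids = [list(p) for p in points[:k]]
    labels = []
    for _ in range(iterations):
        # Assignment step: label each point with its nearest centroid
        labels = [nearest(p, centroids) for p in points]
        # Update step: move each centroid to the mean of its members
        for c in range(k):
            members = [p for p, label in zip(points, labels) if label == c]
            if members:
                centroids[c] = [sum(dim) / float(len(members)) for dim in zip(*members)]
    return labels, centroids

# Toy data: two obvious groups, near (0, 0) and near (1, 1)
points = [(0.0, 0.1), (0.9, 1.0), (0.1, 0.0), (1.0, 0.9)]
labels, centroids = kmeans(points, k=2)
```

Because the result depends on where the centroids start, different seeds can give different clusters, which is why the cell above fixes the seed.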

____________

<a id="clusters"> </a>
## Step 10: Visualization of User Clusters

Ok great - we have successfully clustered our users both with and without enrichment data from Watson Personality Insights.  Now we can visualize users through their cluster identity.
 
In this step, we visualize the different user clusters obtained with and without using Personality traits. This visualization illustrates the differences between the two clustering approaches and shows how Watson [Personality Insights](https://www.ibm.com/watson/developercloud/personality-insights.html) can help provide finer segmentation of users.

Given this segmentation, brand managers and marketing teams can craft different messaging to target the various user segments to improve brand adoption in the marketplace.

In [None]:
## Imports used below (harmless if already loaded earlier in the notebook)
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

## Aggregate users in each cluster and convert to Pandas for plotting
baseClusterAggDF = userPersonalityDF.groupBy('BASE_PREDICTIONS').agg(F.count('USER_SCREEN_NAME').alias('NUM_USERS')).toPandas()
enrichedClusterAggDF = userPersonalityDF.groupBy('PI_PREDICTIONS').agg(F.count('USER_SCREEN_NAME').alias('NUM_USERS')).toPandas()

%matplotlib inline
# Code courtesy of 
#pandas_softdrink_tweets_grouped_by_sentiment=df_softdrink_tweets_grouped_by_sentiment.toPandas()
#pandas_softdrink_tweets_grouped_by_sentiment.count()
plot1_labels = baseClusterAggDF['BASE_PREDICTIONS']
plot1_values = baseClusterAggDF['NUM_USERS']
plot1_colors = ['green', 'gray', 'red', 'blue', 'yellow']
plot2_labels = enrichedClusterAggDF['PI_PREDICTIONS']
plot2_values = enrichedClusterAggDF['NUM_USERS']
plot2_colors = ['green', 'gray', 'red', 'blue', 'yellow']

fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(23, 10))
axes[0].pie(plot1_values,  labels=plot1_labels, colors=plot1_colors, autopct='%1.1f%%')
axes[0].set_title('Cluster distribution without Personality traits')
axes[0].set_aspect('equal')
axes[0].legend(loc="upper right", labels=plot1_labels)
axes[1].pie(plot2_values,  labels=plot2_labels, colors=plot2_colors, autopct='%1.1f%%')
axes[1].set_title('Cluster distribution with Personality traits')
axes[1].set_aspect('equal')
axes[1].legend(loc="upper right", labels=plot2_labels)
fig.subplots_adjust(hspace=1)
plt.show()
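
The pie charts boil down to counting users per cluster and turning the counts into percentages. As a rough plain-Python equivalent of the `groupBy` / `count` aggregation and the percentage labels, consider this sketch; the cluster IDs are made up for illustration.

```python
from collections import Counter

labels = [0, 2, 1, 0, 0, 2, 1, 0]   # made-up cluster IDs, one per user

# Count users per cluster, then express each count as a share of all users
counts = Counter(labels)
total = float(len(labels))
shares = {cluster: 100.0 * n / total for cluster, n in counts.items()}
```

Here cluster 0 holds half of the users and clusters 1 and 2 hold a quarter each, which is the kind of breakdown each pie chart displays.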

#### Scatter Plots of Clusters using Principal Components Analysis (PCA)

Typically one would visualize clusters by plotting some aggregate measure of the data, then filling in the data points with different colors based on cluster ID.  In the absence of aggregate metrics, however, we can use Principal Components Analysis to compress our data set down to two dimensions.  Once we've performed PCA we can then plot the values of the two components on the X and Y axis to form a scatterplot.  

Let's try that now.

In [None]:
from pyspark.ml.feature import PCA

## Get the first two principal components for features with and without enrichment from Personality Insights
pcaBase = PCA(k = 2, inputCol = "BASE_FEATURES", outputCol = "pcaFeaturesBase")
pcaEnriched = PCA(k = 2, inputCol = "PI_ENRICHED_FEATURES", outputCol = "pcaFeaturesEnriched")

## Fit the model to our data
pcaBaseModel = pcaBase.fit(userPersonalityDF)
pcaEnrichedModel = pcaEnriched.fit(userPersonalityDF)

## Transform the data to get our principal components
userPersonalityDF = pcaBaseModel.transform(userPersonalityDF)
userPersonalityDF = pcaEnrichedModel.transform(userPersonalityDF)
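
For intuition about what PCA computes, here is a small textbook-style sketch in plain Python: center the data, build the covariance matrix, and find its leading directions with power iteration. All function names and the toy data are made up for illustration, and this is not how Spark implements it; details such as centering and the sign of each component may differ from Spark's output. The sketch also assumes the data has at least `k` directions with non-zero variance.

```python
import math

def center(rows):
    """Subtract the column means from every row."""
    n = float(len(rows))
    means = [sum(col) / n for col in zip(*rows)]
    return [[x - m for x, m in zip(row, means)] for row in rows]

def covariance(centered):
    """Sample covariance matrix of already-centered rows."""
    n, d = len(centered), len(centered[0])
    return [[sum(r[i] * r[j] for r in centered) / (n - 1.0) for j in range(d)]
            for i in range(d)]

def mat_vec(matrix, v):
    return [sum(a * b for a, b in zip(row, v)) for row in matrix]

def top_component(matrix, steps=200):
    """Dominant eigenvector via power iteration (unit length)."""
    d = len(matrix)
    v = [float(i + 1) for i in range(d)]          # arbitrary starting direction
    norm = math.sqrt(sum(x * x for x in v))
    v = [x / norm for x in v]
    for _ in range(steps):
        w = mat_vec(matrix, v)
        norm = math.sqrt(sum(x * x for x in w))
        if norm == 0.0:
            break                                  # no variance left to find
        v = [x / norm for x in w]
    return v

def pca(rows, k=2):
    """Project rows onto their first k principal components."""
    centered = center(rows)
    cov = covariance(centered)
    d = len(cov)
    components = []
    for _ in range(k):
        v = top_component(cov)
        eigenvalue = sum(a * b for a, b in zip(v, mat_vec(cov, v)))
        components.append(v)
        # Deflation: remove the direction just found before looking for the next
        cov = [[cov[i][j] - eigenvalue * v[i] * v[j] for j in range(d)]
               for i in range(d)]
    return [[sum(x * c for x, c in zip(row, comp)) for comp in components]
            for row in centered]

# Toy data lying roughly along the diagonal
rows = [(2.0, 1.9), (1.0, 1.2), (0.0, -0.1), (-1.0, -1.1), (-2.0, -1.9)]
scores = pca(rows, k=2)
```

The first score of each row captures position along the diagonal, where nearly all the variation lies, and the second captures the small leftover scatter around it.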

We have the principal components, but before we can plot them we have to convert them from a feature vector back to individual columns.  We'll accomplish this using a lambda function to build a RowRDD, then convert back to a DataFrame.

In [None]:
from pyspark.sql import Row

## Split the features vector into columns with the rdd-based API, then convert to DF and reorder columns
pcaDF = userPersonalityDF.select("pcaFeaturesBase", "pcaFeaturesEnriched", "USER_SCREEN_NAME", "BASE_PREDICTIONS", "PI_PREDICTIONS")\
                         .rdd.map(lambda x: Row(**{'PC1_BASE': float(x[0][0]), 
                                                   'PC2_BASE': float(x[0][1]),
                                                   'PC1_ENRICHED': float(x[1][0]),
                                                   'PC2_ENRICHED': float(x[1][1]),
                                                   'USER_SCREEN_NAME': str(x[2]),
                                                   'BASE_PREDICTIONS': int(x[3]),
                                                   'PI_PREDICTIONS': int(x[4])}))\
                         .toDF()\
                         .select("USER_SCREEN_NAME", "PC1_BASE", "PC2_BASE", "BASE_PREDICTIONS",\
                                 "PC1_ENRICHED", "PC2_ENRICHED", "PI_PREDICTIONS")
pcaDF.limit(5).toPandas()

Now that the dimensionality of our dataset has been reduced to 2, we can view the components on a typical scatter plot.  Let's see how the clusters look from this perspective.

In [None]:
import plotly 
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go
plotly.offline.init_notebook_mode() 

pcaDF = pcaDF.toPandas()

## For Base Features
data = [go.Scatter(x = pcaDF.PC2_BASE, 
                   y = pcaDF.PC1_BASE, 
                   mode = 'markers',
                   name = 'BASE_PREDICTIONS',
                   marker = dict(color = pcaDF.BASE_PREDICTIONS, size = 12),
                   text = pcaDF.BASE_PREDICTIONS
                                 )
       ]

plotly.offline.iplot(data)

What is interesting to note about the clustering without Personality Insights enrichment is that it appears to have a very clear stratification from -1 to 1.  These clusters appear to be somewhat intuitively grouped together.  What about when we run the same algorithm on the PI-enriched data?

In [None]:
## For Enriched Features
data = [go.Scatter(x = pcaDF.PC2_ENRICHED, 
                   y = pcaDF.PC1_ENRICHED, 
                   mode = 'markers',
                   name = 'Clusters with PI',
                   marker = dict(color = pcaDF.PI_PREDICTIONS, size = 12),
                   text = pcaDF.PI_PREDICTIONS
                                 )
       ]

plotly.offline.iplot(data)

Not quite the same intuitive results.  Perhaps we should iterate over this and try a different clustering algorithm some time.

**At this point you may be wondering what the utility of all this work might be.**  Well, now that we've enriched the user data with Watson APIs and grouped them into clusters, we can track these clusters over time to see how they respond to various metrics.  This could be purchase history, click patterns, or the response to different ad campaigns.  As we build up this data set we'll be able to classify new users into the group they fall into, resulting in a stronger user segmentation model over time.
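
Classifying a new user against an existing Kmeans model amounts to finding the nearest cluster center. The sketch below shows that idea in plain Python; `assign_cluster`, the centroids, and the new user's features are all made up for illustration. In Spark, a fitted model's `transform` performs this assignment for you, and `clusterCenters()` exposes the centers.

```python
def assign_cluster(features, centroids):
    """Return the index of the centroid nearest to features (squared Euclidean)."""
    def sq_dist(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))
    return min(range(len(centroids)), key=lambda i: sq_dist(features, centroids[i]))

# Made-up centroids over (SENTIMENT, SCALED_FOLLOWERS, SCALED_STATUSES)
centroids = [
    (-0.6, 0.05, 0.10),
    (0.0, 0.50, 0.40),
    (0.7, 0.10, 0.20),
]

# A hypothetical new user with positive sentiment and few followers
new_user = (0.65, 0.08, 0.25)
cluster = assign_cluster(new_user, centroids)
```

The new user lands in the third cluster (index 2), the one with positive sentiment and a small following. The new user's features must be scaled the same way as the training data for the comparison to be meaningful.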

_______

## Conclusion

We've accomplished quite a lot in this notebook, so let's take a moment to review.

First we loaded our tweet data from dashDB.  Then we enriched it with several Watson APIs - Natural Language Understanding and Personality Insights.  Using Spark and its elegant API we shaped our data, explored it visually, and prepared it for machine learning.  Our approach was to discover user segmentation by assigning cluster IDs to each data point.  Finally, we plotted the results of our clustering with the aid of principal components analysis.  

While more work remains to be done - no one said data science was easy - we have seen how we can seamlessly move between Watson APIs, Spark, and Python using the Data Science Experience. 



_______


<div><br><img src="https://upload.wikimedia.org/wikipedia/commons/thumb/5/51/IBM_logo.svg/640px-IBM_logo.svg.png" width = 200 height = 200>
</div><br>