In [1]:
ls

[0m[01;34m__archive[0m/                     [01;32mREADME.md[0m*
[01;34m_docker[0m/                       Spark_overview.md
[01;34mimgs[0m/                          StreamingPlot.py
logs.txt                       tweet_feature_learning_SparkStreaming.ipynb
pyspark_installation_guide.md  TweetsListener.py


# Streaming Tweets Feature Learning & Visualization

### Xian Lai 

xlai4@fordham.edu   
Dec.2017

================================================================
![](imgs/tweet_feature_learning.gif)
![](imgs/logos.png)


#### As the diagram above shows, this project implements a pipeline that learns predictive features from streaming tweets and visualizes the result in real time.


- **Receive streaming tweets on master machine**:    
    With the TweetsListener.py script running in the background, a stream of tweets matching 3 tracks, "NBA", "NFL" and "MLB", is pulled through the Tweepy API. Each tweet in this stream is about one of these 3 tracks, and tweets are separated by the delimiter "><". 


- **Analyze tweets on distributed machines**:    
    This stream is directed into the Spark Streaming API through a TCP connection and distributed onto the cluster. Under the Spark Streaming API, the distributed stream is abstracted as a data type called DStream. A series of operations is then applied to this DStream in real time, transforming it into other DStreams that contain intermediate or final analysis results. 
    
    1. preprocess each tweet into a label and a list of clean words that contain only letters and digits.
    2. count the frequencies of words in all tweets and take the 5000 most frequent ones as features.
    3. encode the tweets from the last 15 seconds into a structured dataset using the features mentioned above.
    4. calculate the conditional probability of each feature word given each label, and the predictiveness of each feature word.
    
    
- **Visualize results on master machine**:   
    Finally, we select the tweets and features with higher predictiveness, collect each tweet's label, sum of predictiveness and 2 t-SNE features back onto the master machine, and visualize them as a scatter plot. This visualization can be used as an informal validation of the predictiveness defined above: if tweets with different labels are well separated in the scatter plot, then the features selected by this predictiveness measure are valid.
    
    1. keep only the 300 most predictive features and discard the other, less predictive features.
    2. calculate the sum of predictiveness of the words in each tweet.
    3. take the 60 tweets with the highest sum of predictiveness under each label.
    4. apply t-SNE to these (up to 180) data points to reduce the dimensionality from 300 to 2 for visualization.


#### The reasons for using Spark Streaming in this project are:
1. Spark Streaming handles the incoming stream in small batches. This makes it flexible for both incremental and from-scratch computations (in our case, the word counts are updated from the previous result, while everything else is recomputed from scratch every 5 seconds). 
2. Streaming computation and batch computation are well integrated, so the algorithm is scalable. 
3. We can easily apply other existing operations in the Spark framework, like machine learning and SQL querying, to the DStream. 
4. Although many other tools can handle streaming data on distributed machines, Spark's core abstraction, the RDD, can cache data in memory. This saves the I/O time of reading from external storage systems and makes iterative computation much more efficient.

In [2]:
from re import sub
from subprocess import Popen
from datetime import datetime
import numpy as np
from sklearn.manifold import TSNE
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import StreamingPlot as sp

## Global functions & variables:

In [3]:
def timeHeader():
    """Create a header containing starting time for each 
    round of analysis."""
    
    global t_0
    t_0 = datetime.now()
    
    return "\n------------------------- Time: " + \
           str(t_0) + " -------------------------\n"


def minMaxScale(arr, bounds=(0, 1)):
    """ Scale the given numeric sequence into given range.
    
    Inputs:
    -------
    arr: sequence to be scaled
    bounds: the lower and upper bounds as a tuple
    
    Output:
    -------
    scaled sequence as a 1-d numpy array
    """
    arr = np.asarray(arr, dtype=float)
      
    # calculate the range between 2 bounds
    # scale the array into 0-1 bounds; a constant array would divide
    # by zero, so it is mapped to the lower bound instead
    # scale the array into given bounds
    rng  = bounds[1] - bounds[0]
    span = arr.max() - arr.min()
    arr  = (arr - arr.min())/span if span > 0 else np.zeros_like(arr)
    arr  = (arr * rng) + bounds[0]
    
    return arr


# global variables in order:
# the tags(labels) for tweets
# the colors for each tag
# run the tweets listener in the background
# init a streaming plot object
# init an empty plotting data source
tags = ['nba', 'nfl', 'mlb']                                
cols = ['red', 'blue', 'green']                             
_    = Popen(["python", "TweetsListener.py"])               
plt  = sp.StreamingPlot(interval=5, width=900, height=900)  
pds  = {'x':[], 'y':[], 'color':[], 'tags':[], 'size':[], 'alpha':[]}  

## Receive tweets:

After we run TweetsListener.py as a background process, the incoming tweets are available at IP 172.17.0.2, port 5555.   
\*Here the IP address is the Docker container's IP, not the local machine's.

On the Spark end, the stream is directed into a data abstraction called a DStream, which is a sequence of RDDs. Each RDD holds the tweets that came in through the socket during the last batch interval, distributed across the machines in the cluster. When we apply operations to the DStream, we are actually operating on the underlying RDDs.

Because the incoming stream is messy, one line may contain several tweets, or one tweet may span several lines. When we receive tweets, we first split them by the delimiter so that each row in the RDD is a single tweet.

    example raw tweets:
	Here's every Tom Brady Postseason TD! #tbt #NFLPlayoffs https://t.co/2CIHBpz2OW...
	RT @ChargersRHenne: This guy seems like a class act.  I will root for him
	RT @NBA: Kyrie ready! #Celtics #NBALondon https://t.co/KgZVsREGUK...
	RT @NBA: The Second @NBAAllStar Voting Returns! https://t.co/urTwnGQNKl...
	...
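
The split-and-flatten step can be sketched in plain Python to see what happens to one batch of socket lines. This is a Spark-free illustration: `split_batch` and the sample lines are hypothetical, and in the notebook Spark's `map` and `flatMap` do this work on each RDD.

```python
def split_batch(lines, delimiter="><"):
    """Split every received line by the delimiter and flatten the
    result, mirroring .map(lambda x: x.split("><")).flatMap(lambda x: x)."""
    tweets = []
    for line in lines:
        # one line may hold several tweets glued together by "><"
        tweets.extend(line.split(delimiter))
    return tweets


# hypothetical lines as they might arrive on the socket
batch = ["RT @NBA: Kyrie ready!><RT @NBA: Voting returns!",
         "Here's every Tom Brady Postseason TD!"]
print(split_batch(batch))
# prints ['RT @NBA: Kyrie ready!', 'RT @NBA: Voting returns!', "Here's every Tom Brady Postseason TD!"]
```

A line that starts or ends with the delimiter yields an empty string; in the notebook such rows are dropped later, because an empty tweet contains no tag word.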


In [4]:
# set up spark configuration using local mode with all CPU's
# create a spark context with this configuration
# create a streaming context and set the batch interval to 5 seconds
# set log level for spark context
# set checkpoint directory for spark streaming context
# (checkpointing is required by updateStateByKey)
conf = SparkConf().setMaster("local[*]")
sc   = SparkContext(conf=conf)
ssc  = StreamingContext(sc, batchDuration=5)
sc.setLogLevel("ERROR")
ssc.checkpoint("checkpoint")

# create a DStream by listening to socket
# For each RDD in stream:
# - split the tweets by delimiter "><"
# - flatten the lists so that each row holds one tweet
# get n rows: (tweet)
rawTweets = ssc.socketTextStream('172.17.0.2', 5555)\
    .map(lambda x: x.split("><"))\
    .flatMap(lambda x: x)

def logRawTweets(rdd):
    """ Save the first 5 raw tweets in external text file."""
    global t_1
    with open("logs.txt", 'a') as file:
        file.write(timeHeader())
        file.write("rawTweet:\n\t{} raw tweets received from port:\n".format(rdd.count()))
        for row in rdd.take(5): 
            file.write('\t' + str(row[:80]) + '...\n')
        file.write("...\n")
        
        # calculate how much time spent on this step
        t_1 = datetime.now()
        file.write("time for receiving raw tweets: "+str(t_1 - t_0)+"\n")

rawTweets.foreachRDD(logRawTweets)

## Analyze tweets:
Next, we apply a series of transformations to the rawTweets DStream, each of which produces a new DStream.

![](imgs/DStreams.png)


### 1. preprocessing
In the raw stream, each tweet is a string consisting of letters, digits, punctuation marks and possibly other symbols. In this preprocessing step, we split the string into a list of words, strip symbols like punctuation marks and emojis, lower-case the words and assign a tag to each tweet.

    example cleaned tweets after preprocessing:
	tag:1, words:['rt', 'chargersrhenne', 'this', 'guy', ...],
	tag:0, words:['rt', 'debruynekev', 'amp', 'ilkayguendogan', ...],
	tag:0, words:['rt', 'commissioner', 'adam', 'silver', ...],
	tag:0, words:['rt', 'spurs', 'all', 'star', ...],
	tag:0, words:['nbaallstar', 'karlanthony', 'towns', 'nbavote', ...],
    ...

In [5]:
def assignTag(tweet):
    """ Assign a tag to the given tweet and remove the tag word from it.
    If a tweet has none of the tag words, None is returned so that it
    can be discarded. If a tweet has more than one tag word, the first
    match in the order NBA, NFL, MLB is used.
    """
    for i, tag in enumerate(tags):
        if tag in tweet:
            return (i, [word for word in tweet if word != tag])
    return None

        
# For each RDD in stream:
# - split the words
# - strip the symbols and keep only letters and digits
# - lower case the words
# - remove empty strings
# - assign tags for each tweet and remove tag words
# - discard tweets without a tag
# get n rows: (tag, [word_0, word_1, ..., word_m])
cleanTweets = rawTweets\
    .map(lambda x: x.split())\
    .map(lambda xs: [sub('[^A-Za-z0-9]+', '', x) for x in xs])\
    .map(lambda xs: [x.lower() for x in xs])\
    .map(lambda xs: [x for x in xs if x != ''])\
    .map(assignTag)\
    .filter(lambda x: x is not None)

    
def logCleanTweets(rdd):
    """ Save first 5 cleaned tweets and time spent in this step in log file."""
    global t_1
    with open("logs.txt", 'a') as file:
        file.write("\ncleanTweet:\n\tcleaned tweets after preprocessing:\n")
        for row in rdd.take(5):
            w = str(row[1][:4]).rstrip(']') + ', ...],\n'
            file.write('\ttag:'+str(row[0])+", words:" + w)
        file.write("...\n")
        
        file.write("time for cleaning tweets: "+str(datetime.now()-t_1)+"\n")
        t_1 = datetime.now()
    
cleanTweets.foreachRDD(logCleanTweets)
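
To check the cleaning rules without a running stream, the same per-tweet logic can be written as a plain function. This is a sketch: `clean_tweet` and `TAGS` are hypothetical names that mirror the `cleanTweets` chain and `assignTag`. Note that the tag check matches whole words only, so a hashtag like `#NBALondon` does not count as the tag `nba`.

```python
import re

TAGS = ['nba', 'nfl', 'mlb']  # hypothetical copy of the notebook's `tags`


def clean_tweet(tweet, tags=TAGS):
    """Clean one raw tweet and assign its tag.
    Returns (tag_index, words) or None when no tag word is present."""
    # split on whitespace, keep only letters and digits, lower-case
    words = [re.sub('[^A-Za-z0-9]+', '', w).lower() for w in tweet.split()]
    # drop tokens that were pure symbols (now empty strings)
    words = [w for w in words if w != '']
    # the first matching tag wins, in the order NBA, NFL, MLB
    for i, tag in enumerate(tags):
        if tag in words:
            return (i, [w for w in words if w != tag])
    return None  # no tag word: such tweets are filtered out


print(clean_tweet("RT @NBA: Kyrie ready! #Celtics #NBALondon"))
# prints (0, ['rt', 'kyrie', 'ready', 'celtics', 'nbalondon'])
```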

### 2. feature extraction
Next, we count the frequency of each word that has ever appeared in the streaming tweets and keep updating the counts as new data comes in.

    wordCount:
	('rt', 196)
	('the', 174)
	('in', 85)
	('for', 62)
	('to', 59)
    ...

In [6]:
def updateWordCount(newValues, oldCount):
    """ Update the count of one word with new data.
    
    Inputs:
    -------
    newValues: list of the 1's emitted for this word in this round
               (empty if the word did not appear in this round)
    oldCount: count of this word up to last round, or None if the
              word has not been seen before
    
    Output:
    -------
    count of this word up to this round.
    """
    return sum(newValues) + (oldCount or 0)

# For each RDD in stream:
# - remove the tag
# - flatten the list of words so each row has one word
# - add 1 count to each word
# - use the new word count to update old word count incrementally
# - sort the word and count tuple by the count in descending order
# get n rows: (word, count)
wordCount = cleanTweets\
    .map(lambda x: x[1])\
    .flatMap(lambda x: x)\
    .map(lambda x: (x, 1))\
    .updateStateByKey(updateWordCount)\
    .transform(lambda rdd: rdd.sortBy(lambda x: -x[1]))
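
How `updateStateByKey` carries the counts from one batch to the next can be simulated in plain Python. A sketch, assuming a dictionary stands in for Spark's checkpointed state; `update_word_count` and `run_batches` are hypothetical helpers.

```python
def update_word_count(new_values, old_count):
    """Same rule as updateWordCount: add this batch's 1's to the
    running count (None means the word has not been seen before)."""
    return sum(new_values) + (old_count or 0)


def run_batches(batches):
    """Simulate updateStateByKey over a list of batches, where each
    batch is a list of cleaned word lists (tags already removed)."""
    state = {}
    for batch in batches:
        # group the (word, 1) pairs of this batch by word
        grouped = {}
        for words in batch:
            for w in words:
                grouped.setdefault(w, []).append(1)
        # Spark also calls the update function for words with no new
        # values (with an empty list), which leaves their count
        # unchanged, so skipping them here gives the same result
        for w, ones in grouped.items():
            state[w] = update_word_count(ones, state.get(w))
    # sort by count, highest first, like the sortBy step
    return sorted(state.items(), key=lambda kv: -kv[1])


batches = [[['rt', 'the', 'game'], ['the', 'draft']],  # batch 1
           [['rt', 'the']]]                             # batch 2
print(run_batches(batches))
# prints [('the', 3), ('rt', 2), ('game', 1), ('draft', 1)]
```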

From the word counts, we take the 5000 most frequent words as features for the tweet dataset. Each feature value in the dataset is either 1 or 0, indicating whether the corresponding tweet contains this feature word.

In [7]:
# For each RDD in stream:
# - zip rows with index
# - filter the first 5000 rows
# - discard the index
# get 5000 rows: (word, count)
features = wordCount\
    .transform(lambda rdd: rdd.zipWithIndex())\
    .filter(lambda x: x[1] < 5000)\
    .map(lambda x: x[0])

# For each RDD in stream:
# - take only the word
# get 5000 rows: (word)
words = features.map(lambda x: x[0])

# For each RDD in stream:
# - take only the count
# get 5000 rows: (count)
counts = features.map(lambda x: x[1])


def logFeatureWords(rdd):
    """ Save first 5 word count and time spent in this step in log file."""
    global t_1
    with open("logs.txt", 'a') as file:
        file.write("\nfeatures:\n\twords used as features and their counts:\n")
        for row in rdd.take(num=5): file.write('\t' + str(row) + "\n")
        file.write("...\n")
        
        file.write("time for generating and extracting features:" \
                   + str(datetime.now()-t_1) + "\n")
        t_1 = datetime.now()
    
features.foreachRDD(logFeatureWords)
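
Taking the first k rows of the sorted counts amounts to picking the k most common words. A standard-library sketch, where `top_k_features` is a hypothetical name; ties between equal counts may be ordered differently than in Spark's distributed sort.

```python
from collections import Counter


def top_k_features(word_counts, k=5000):
    """Keep the k most frequent words, like sortBy + zipWithIndex +
    filter(index < k). Returns the feature words and their counts as
    two index-aligned lists, like the `words` and `counts` DStreams."""
    top = Counter(word_counts).most_common(k)
    words = [w for w, _ in top]
    counts = [c for _, c in top]
    return words, counts


words, counts = top_k_features({'rt': 196, 'the': 174, 'in': 85, 'for': 62}, k=2)
print(words, counts)  # prints ['rt', 'the'] [196, 174]
```

Keeping the two lists index-aligned matters, because later steps look up a feature's word and count by its position.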

### 3. encode dataset
With the feature words available, we can transform the tweets into a structured dataset for further analysis. Because only a limited number of tweets arrive in 5 seconds, we use a sliding window to combine tweets over a longer time range: every 5 seconds, we fetch all tweets from the last 15 seconds and combine them into one RDD.

	example encoded dataset:
	tag: 0, features: [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...],
	tag: 1, features: [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...],
	tag: 2, features: [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...],
	tag: 0, features: [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, ...],
	tag: 1, features: [0, 0, 1, 0, 0, 0, 1, 1, 1, 1, ...],
    ...
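
With a 5-second batch interval, `window(15, 5)` makes each windowed RDD the union of the 3 most recent batches, recomputed every batch. A plain-Python sketch of that behavior; `sliding_windows` is a hypothetical helper, and it assumes the window and slide lengths are multiples of the batch duration, as Spark requires.

```python
from collections import deque


def sliding_windows(batches, window=15, slide=5, batch_duration=5):
    """Yield the tweets covered by each window, mimicking
    DStream.window(window, slide) on a stream with the given batch
    duration. The first windows contain fewer batches."""
    assert window % batch_duration == 0 and slide % batch_duration == 0
    size = window // batch_duration   # batches per window
    step = slide // batch_duration    # batches between two windows
    recent = deque(maxlen=size)       # keeps only the newest `size` batches
    for i, batch in enumerate(batches, start=1):
        recent.append(batch)
        if i % step == 0:
            # one window = the union of the most recent batches
            yield [tweet for b in recent for tweet in b]


for w in sliding_windows([['a'], ['b'], ['c'], ['d']]):
    print(w)
# prints ['a'], then ['a', 'b'], then ['a', 'b', 'c'], then ['b', 'c', 'd']
```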

In [8]:
def encodeDataSet(rdd_a, rdd_b):
    """ Encode given tweets by given feature words.
    If a feature word appears in this tweet, the corresponding
    feature is 1, otherwise 0.
    """
    words_  = rdd_b.collect()
    rdd_new = rdd_a.mapValues(lambda x: \
        ([int(word in x) for word in words_], 1))
    return rdd_new

# For each RDD in stream:
# - get tweets in 15s window every 5s
# - encode the tweets using features and append value 1
#   onto each tweet to count the number of tweets later.
# get rows: (tag, ([1, 0, ..., 1], 1))
dataset = cleanTweets\
    .window(15, 5)\
    .transformWith(encodeDataSet, words)

def logDataset(rdd):
    """ Save first 10 rows in encoded dataset and time spent 
    in this step in log file."""
    global t_1
    with open("logs.txt", 'a') as file:
        file.write("\nDataset:\n\tencoded dataset:\n")
        for row in rdd.take(10):
            t = str(row[0])
            f = str(row[1][0][:10]).rstrip(']') + ', ...],\n'
            file.write("\ttag: " + t + ", features: " + f)
        file.write("...\n")
        
        file.write("time for encoding dataset:" + str(datetime.now()-t_1) + "\n")
        t_1 = datetime.now()
    
dataset.foreachRDD(logDataset)
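
For a single tweet, the encoding reduces to one membership test per feature word. A sketch with a hypothetical `encode_tweet` helper; converting the word list to a set first is an optional speed-up that the notebook does not use, and with 5000 features it avoids scanning the word list 5000 times per tweet.

```python
def encode_tweet(words, feature_words):
    """Binary encoding as in encodeDataSet: 1 if the feature word
    occurs in the tweet, else 0, in the order of feature_words."""
    present = set(words)  # O(1) lookups instead of list scans
    return [int(w in present) for w in feature_words]


features = ['rt', 'the', 'allstar', 'draft']
print(encode_tweet(['rt', 'the', 'allstar', 'voting'], features))  # prints [1, 1, 1, 0]
```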

### 4. calculate "predictiveness" for features

**p(feature|tag)**:   
Since all the feature values are either 0 or 1, we can get the probability of a feature conditioned on a tag by summing the values in this feature's column over the tweets with that tag and dividing the sum by the number of tweets with that tag. In the code below, 1 is added to each column sum before dividing, so that no conditional probability is zero.
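
The same computation on an in-memory matrix, as a NumPy sketch. `conditional_probs` is a hypothetical name; it assumes every tag has at least one tweet in the window (otherwise the division is by zero), and it follows the notebook in adding 1 to the numerator only, so a feature present in every tweet of a tag gets a value above 1.

```python
import numpy as np


def conditional_probs(X, y, n_tags=3):
    """p(feature | tag) for a binary matrix X (tweets x features) and
    tag indices y, adding 1 to each column sum before dividing."""
    X = np.asarray(X)
    y = np.asarray(y)
    cps = []
    for k in range(n_tags):
        rows = X[y == k]                               # tweets under tag k
        cps.append((rows.sum(axis=0) + 1) / len(rows))
    return np.vstack(cps)                              # shape (n_tags, n_features)


X = [[1, 0], [0, 0], [0, 0], [1, 1], [0, 1]]
y = [0, 0, 0, 1, 2]
cp = conditional_probs(X, y)
```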

**predictiveness**:  
Then we calculate the predictiveness of each feature from this dataset. The predictiveness of a feature quantifies how well it discriminates the label of a tweet from the other 2 labels.

1. If we have only 2 labels, it can be defined as the larger of the two ratios between the conditional probabilities of this feature given each label.

$$pdtn(label_1, label_2) = \max\left( \frac{p(word|label_1)}{p(word|label_2)}, \frac{p(word|label_2)}{p(word|label_1)}\right)$$

\*Note that this measure is symmetric. In other words, how much a feature can distinguish label 1 from label 2 is the same as how much it can distinguish label 2 from label 1.

2. When we have more than 2 labels, we take the average of the predictivenesses of all label pairs.

$$pdtn(label_1, label_2, label_3) = \frac{pdtn(label_1, label_2) + pdtn(label_1, label_3) + pdtn(label_2, label_3)}{3}$$

3. Finally, the predictiveness of a feature word is weighted by the count of this word. The more frequently a word appears, the more reliable its predictiveness is.

$$pdtn_{weighted} = pdtn \times count$$
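
The three steps can be written as one function that works for any number of labels by averaging over all label pairs. A sketch with a hypothetical `predictiveness` helper; unlike the notebook's `calcPDTN`, it does not round each ratio to 5 decimals. With 2 labels there is a single pair, so it reduces to step 1.

```python
from itertools import combinations


def predictiveness(cps, count=1):
    """Average, over every pair of labels, of the larger ratio between
    the two conditional probabilities, then weight by the word count.
    All cps must be strictly positive (the +1 smoothing ensures this)."""
    pairs = list(combinations(cps, 2))
    pdtn = sum(max(a / b, b / a) for a, b in pairs) / len(pairs)
    return pdtn * count


# ratios 2, 4 and 2 for the three label pairs: average 8/3
print(predictiveness([0.5, 0.25, 0.125]))
```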

    most predictive words:
	word        : (cp0, cp1, cp2, pdtn)
	don         : (0.15934065934065933, 0.0078125, 0.05555555555555555, 283.4985066666667)
	allstar     : (0.14835164835164835, 0.0078125, 0.05555555555555555, 249.3439)
	the         : (0.46153846153846156, 0.4765625, 0.3333333333333333, 244.9167533333333)
	alabama     : (0.005494505494505495, 0.140625, 0.05555555555555555, 216.67129)
	rt          : (0.5879120879120879, 0.625, 0.5555555555555556, 211.0108)
	fitzpatrick : (0.005494505494505495, 0.1328125, 0.05555555555555555, 195.5925333333333)
	voting      : (0.12637362637362637, 0.0078125, 0.05555555555555555, 187.45217333333335)
	minkah      : (0.005494505494505495, 0.125, 0.05555555555555555, 175.55554999999998)
	draft       : (0.016483516483516484, 0.171875, 0.1111111111111111, 149.7176)
	know        : (0.16483516483516483, 0.0234375, 0.05555555555555555, 131.95061333333334)
    ...

In [9]:
def elementWiseAdd(list_1, list_2):
    """ Add up 2 lists element-wisely"""
    return [a + b for a, b in zip(list_1, list_2)]

def calcPDTN(cps):
    """ Take in the conditional probabilities of one feature, ordered
    by tag, and calculate the predictiveness of this feature.
    """
    cp0, cp1, cp2 = cps
    f    = lambda a, b: round(max(a/b, b/a), 5)
    pdtn = (f(cp0, cp1) + f(cp0, cp2) + f(cp1, cp2))/3

    return (cp0, cp1, cp2, pdtn)


def weightByCount(rdd_a, rdd_b):
    """ Weight the predictiveness of features by their counts."""
    # collect the counts as a list
    # for each row in rdd:
    # weight the pdtn by corresponding count
    # get 5000 rows: (index, (cp0_m, cp1_m, cp2_m, weightedPDTN_m))
    cnts    = rdd_b.collect()
    rdd_new = rdd_a.map(lambda x: (x[0], (x[1][0], x[1][1], \
        x[1][2], x[1][3]*cnts[x[0]])))
    
    return rdd_new
        

# For each RDD in stream:
# - reduce the RDD by key so that it has only 3 rows, one per tag;
#   each row holds the tag, the counts of appearances of each
#   feature word in the tweets and the count of tweets
# get 3 rows corresponding to 3 tags: 
#   (tag, ([featCnt_0, featCnt_1, ..., featCnt_4999], tweetCnt))
# - divide the count of each feature word by the count of tweets
#   to get the conditional probability. We add 1 to each count
#   so that no probability is zero, which would cause a
#   division by zero in calcPDTN.
# get 3 rows: (tag, [cp_0, cp_1, ..., cp_4999])
# - pair each cp with its feature index, keep the tag next to the
#   cp, and flatten these 3 rows
# get 15000 rows: (m, [(tag, cp_m)])
# - reduce by key to combine cps for different tags, then sort the
#   cps by tag because reduceByKey does not guarantee any order
# get 5000 rows: (index, [cp0_m, cp1_m, cp2_m])
# - for each row, calculate predictiveness using conditional prob
# get 5000 rows: (index, (cp0_m, cp1_m, cp2_m, pdtn_m))
# - weight the pdtn by the count of the words
# get 5000 rows: (index, (cp0_m, cp1_m, cp2_m, weightedPDTN_m))
featureStats = dataset\
    .reduceByKey(lambda a, b: (elementWiseAdd(a[0], b[0]), a[1]+b[1]))\
    .mapValues(lambda x: [(count+1)/x[1] for count in x[0]])\
    .flatMap(lambda x: [(i, [(x[0], cp)]) for i, cp in enumerate(x[1])])\
    .reduceByKey(lambda a, b: a + b)\
    .mapValues(lambda x: [cp for _, cp in sorted(x)])\
    .map(lambda x: (x[0], calcPDTN(x[1])))\
    .transformWith(weightByCount, counts)
    
def zipWithWords(rdd_a, rdd_b):
    """ Replace the feature index with word.
    """
    words_ = rdd_b.collect()
    return rdd_a.map(lambda x: (words_[x[0]], x[1]))
    
def logFeatureStats(rdd):
    """ Save top 10 most predictive words and their stats to log file."""
    global t_1
    rdd_new = rdd.sortBy(lambda x: -x[1][3])
    with open("logs.txt", 'a') as file:
        file.write("\nmost predictive words:\n\tword : (cp0, cp1, cp2, pdtn)\n")
        for row in rdd_new.take(10): 
            file.write("\t" + row[0] + " : " + str(row[1]) + "\n")
        file.write("...\n")
        
        file.write("time for calculating predictiveness:" + str(datetime.now()-t_1) + "\n")
        t_1 = datetime.now()
    
featureStats.transformWith(zipWithWords, words)\
    .foreachRDD(logFeatureStats)

## Visualize predictive features

To validate whether this feature predictiveness makes sense, we take the 300 most predictive features out of 5000. Under each label, up to 60 tweets with the highest sum of predictiveness are selected for a 2-d scatter plot. 

In the plot, each circle represents a tweet. The color encodes the tweet's label: NBA tweets are red, NFL tweets are blue and MLB tweets are green. The size and alpha of each circle encode the sum of predictivenesses of the words in the tweet. The x and y values are 2 dimensions learned from the 300 selected features using t-distributed stochastic neighbor embedding (t-SNE).

\*All the numeric attributes are rescaled with a min-max scaler to produce a more readable result.

    plotting data source:
	Total number of tweets:138
	Number of NBA tweets; NFL tweets; MLB tweets : [60, 60, 18]
	x     : [ 0.17929186  0.18399966  0.63295108  0.17661807...]
	y     : [ 0.62392987  0.66881742  0.69876889  0.36454208...]
	color : ['red', 'red', 'red', 'red'...]
	tags  : ['nba', 'nba', 'nba', 'nba'...]
	size  : [0.0042378419396584864, 0.0042378419396584864, 0.0042378419396584864, 0.0042378419396584864...]
	alpha : [ 0.42378419  0.42378419  0.42378419  0.42378419...]

If the tweets are distinguishable and the feature predictiveness really captures the information in the tweets, circles of different colors will be well separated in the plot.

### 1. Select highly predictive features and tweets

In [10]:
def fetchFeatsTweets(rdd_a, rdd_b):
    """ Given predictiveness of each feature word, fetch the 300 most
    predictive features and the 60 most predictable tweets from the
    encoded dataset. Called once per tag, so 180 tweets in total.
    """
    # For each RDD:
    # - take the index and pdtn of features
    # get 5000 rows: (index, pdtn_i)
    # - sort by pdtn
    # - discard the pdtn and take the first 300 indices
    featIndex = rdd_b\
        .map(lambda x: (x[0], x[1][3]))\
        .sortBy(lambda x: -x[1])\
        .map(lambda x: x[0])\
        .take(300)
        
    # collect the predictiveness of each feature word keyed by index
    # (rows of rdd_b are not guaranteed to be in index order), and
    # line the values up with the selected features
    pdtns    = dict(rdd_b.map(lambda x: (x[0], x[1][3])).collect())
    selPDTNs = [pdtns[i] for i in featIndex]
    sumPDTN  = lambda feats: sum(f*p for f, p in zip(feats, selPDTNs))
    
    # for each row in rdd:
    # has n rows: (tag, ([feat_0, ..., feat_4999], 1))
    # - keep only highly predictive features
    # get n rows: (tag, [feat_0, ..., feat_299])
    # - calculate the sum of pdtn
    # get n rows: (tag, [feat_0, ..., feat_299], sumPDTN)
    # - sort by sumPDTN and keep the first 60 rows
    # get up to 60 rows: (tag, [feat_0, ..., feat_299], sumPDTN)
    rdd_new = rdd_a\
        .map(lambda x: (x[0], [x[1][0][i] for i in featIndex]))\
        .map(lambda x: (x[0], x[1], sumPDTN(x[1])))\
        .sortBy(lambda x: -x[2])\
        .zipWithIndex()\
        .filter(lambda x: x[1] < 60)\
        .map(lambda x: x[0])
    
    return rdd_new

# for each tag, filter dataset DStream with corresponding tag and
# fetch highly predictive features and predictable tweets.
ds_0 = dataset.filter(lambda x: x[0] == 0)\
    .transformWith(fetchFeatsTweets, featureStats)
ds_1 = dataset.filter(lambda x: x[0] == 1)\
    .transformWith(fetchFeatsTweets, featureStats)
ds_2 = dataset.filter(lambda x: x[0] == 2)\
    .transformWith(fetchFeatsTweets, featureStats)
      
# concatenate 3 DStreams together
# get up to 180 rows: (tag, [feat_0, ..., feat_299], sumPDTN)
selectedTweets = ds_0.union(ds_1).union(ds_2)


def logSelectedTweets(rdd):
    """ Save first 5 selected tweets and features to log file."""
    global t_1
    cnt = rdd.count()
    with open("logs.txt", 'a') as file:
        file.write("\nselected {} tweets and 300 features for plotting:\n".format(cnt))
        for row in rdd.take(5): 
            feats = str(row[1][:20]).rstrip(']') + ", ...],"
            file.write("\t" + str(row[0]) + ", " + feats + str(row[2]) + "\n")
        file.write("...\n")
        
        file.write("time for filtering tweets and features:" + str(datetime.now()-t_1) + "\n")
        t_1 = datetime.now()
    
selectedTweets.foreachRDD(logSelectedTweets)
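
The selection logic is easier to check on a small in-memory matrix. A NumPy sketch with a hypothetical `select_tweets` helper: it follows the steps of `fetchFeatsTweets` but handles all tags at once, and it indexes the predictiveness vector with the same feature indices as the columns, so every weight lines up with its feature.

```python
import numpy as np


def select_tweets(X, y, pdtn, n_feats=300, n_per_tag=60, n_tags=3):
    """Keep the n_feats most predictive features, score each tweet by
    the sum of predictiveness of the selected words it contains, and
    keep the n_per_tag highest-scoring tweets under each tag."""
    X, y, pdtn = np.asarray(X), np.asarray(y), np.asarray(pdtn)
    feat_idx = np.argsort(-pdtn)[:n_feats]   # most predictive first
    X_sel = X[:, feat_idx]                   # tweets x n_feats
    scores = X_sel @ pdtn[feat_idx]          # weights aligned with columns
    keep = []
    for k in range(n_tags):
        rows = np.flatnonzero(y == k)
        # highest score first, at most n_per_tag tweets of this tag
        keep.extend(rows[np.argsort(-scores[rows])][:n_per_tag])
    keep = np.array(keep, dtype=int)
    return X_sel[keep], y[keep], scores[keep]


X = [[1, 0, 1], [0, 1, 0], [1, 1, 0], [0, 0, 1]]
y = [0, 0, 1, 1]
pdtn = [5.0, 1.0, 3.0]
Xs, ys, sc = select_tweets(X, y, pdtn, n_feats=2, n_per_tag=1)
```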

### 2. collect plotting data source and dimensionality reduction

Since our dataset is sparse and binary, we use the non-linear method t-SNE to learn a 2-d embedding of the tweets, which gives the x and y values for each tweet.

In [11]:
def collectPDS(rdd):
    """ collect the tag, feature values and predictiveness sum 
    and fit-transform the features with TSNE.
    """
    global tsne_0, tsne_1, color, label, alpha, t_1
    
    # map tag indices to colors and tag names
    # collect data to master machine
    pData = rdd.map(lambda x: (cols[x[0]], tags[x[0]], x[2], x[1]))
    pData = pData.collect()
    
    # reduce the dimensionality to 2 using t-SNE
    # scale the learned features to range [0, 1]
    array  = np.array([x[3] for x in pData])
    if len(array.shape) == 1: 
        array = array.reshape(1, -1)
    embed  = TSNE(n_components=2).fit_transform(array)
    tsne_0 = minMaxScale(embed[:, 0])
    tsne_1 = minMaxScale(embed[:, 1])
    
    color = [x[0] for x in pData]
    label = [x[1] for x in pData]
    alpha = [x[2] for x in pData]
    
    with open("logs.txt", 'a') as file:
        file.write("\ntime for TSNE learning:" + str(datetime.now()-t_1) + "\n")
    t_1 = datetime.now()
    
selectedTweets.foreachRDD(collectPDS)


def printPDS(pds):
    """ Save plotting data source to log file."""
    global t_1
    lens = [len([x for x in pds['color'] if x == i]) for i in cols]
    
    with open("logs.txt", 'a') as file:
        file.write("\nplotting data source:\n")
        file.write("\tTotal number of tweets:" + str(len(pds['x'])) + "\n")
        file.write("\tNumber of NBA tweets; NFL tweets; MLB tweets : " + str(lens) + "\n")
        for item in pds.items(): 
            file.write("\t" + item[0] + " : " + str(item[1][:4]).rstrip(']') + "...]\n")
        file.write("...\n")
        
        file.write("time for collecting plotting data source:" + str(datetime.now()-t_1) + "\n")
        t_1 = datetime.now()
        file.write("\nTiming for this round: {}\n".format(t_1 - t_0))

        
def updatePDS(rdd):
    """ update the dictionary containing data for plotting.
    """
    global alpha
    alpha = minMaxScale(alpha, (0.2, 0.8))
    sizes = [i/100 for i in alpha]
    
    global pds
    pds['x']     = tsne_0
    pds['y']     = tsne_1
    pds['color'] = color
    pds['tags']  = label
    pds['alpha'] = alpha
    pds['size']  = sizes
    
    printPDS(pds)
     
selectedTweets.foreachRDD(updatePDS)

With the computation pipeline set up, we start the Spark Streaming context and show the plot.

In [12]:
ssc.start()
plt.start(pds)
