In [106]:
import codecs, re, json
from pyspark import SparkContext, SparkConf
from pyspark.mllib.fpm import FPGrowth
from pyspark.sql import SQLContext, Row
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover

<h1>Create Spark and SQL context:</h1>

In [107]:
conf = SparkConf().setAppName("Text Classifier")
if not sc:
    sc = SparkContext(conf=conf)

In [108]:
sqlContext = SQLContext(sc)

<h1>Parse Tweets to tweet_id and tweet_text:</h1>

In [55]:
def parseTweet(line):
    """
    Parses a tweet record having the following format collectionId-tweetId<\t>tweetString
    """
    fields = line.strip().split("\t")
    if len(fields) == 2:
        # The following regex just strips of an URL (not just http), any punctuations, 
        # User Names or Any non alphanumeric characters
        # http://goo.gl/J8ZxDT
        text = re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)"," ",fields[1]).strip()
        # remove terms <= 2 characters
        text = ' '.join(filter(lambda x: len(x) > 2, text.split(" ")))
        # return tuple of (collectionId-tweetId, text)
        return (fields[0], text)

In [56]:
def loadConfig(config_file):
    """
    Load collection configuration file.
    """
    with open(config_file) as data_file:    
        config_data = json.load(data_file)
    return config_data

In [57]:
base_dir = "~/spring2016_ir_project/data/"
FP_dir = base_dir + "FPGrowth/"
config_file = "collections_config.json"
config_data = loadConfig(base_dir + config_file)

<h1>Load tweets from file into DataFrame:</h1>

In [58]:
collection_id = config_data["collections"][0]["Id"]
tweets_file = base_dir + "small_data/z_" + collection_id
tweets = sc.textFile(tweets_file) \
          .map(parseTweet) \
          .filter(lambda x: x is not None) \
          .map(lambda x: Row(id=x[0], text=x[1])) \
          .toDF() \
          .cache()

<h1>Tokenize and remove stop words from tweet text:</h1>

In [60]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tweets = tokenizer.transform(tweets)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tweets = remover.transform(tweets)

<h1>Save unique tokens:</h1>

In [61]:
tweets = (tweets
  .rdd
  .map(lambda x : (x.id, x.text, list(set(filter(None, x.filtered)))))
  .toDF()
  .withColumnRenamed("_1","id")
  .withColumnRenamed("_2","text")
  .withColumnRenamed("_3","filtered")).cache()

<h2>Run Frequent Pattern Mining algorithm and save to output file:</h2>

In [64]:
model = FPGrowth.train(tweets.select("filtered").rdd.map(lambda x: x[0]), minSupport=0.02)
result = sorted(model.freqItemsets().collect(), reverse=True)
# sort the result in reverse order
sorted_result = sorted(result, key=lambda item: int(item.freq), reverse=True)

# save output to file
with codecs.open(FP_dir + collection_id + '.txt', 'w',encoding='utf-8') as file:
    for item in sorted_result:
        file.write("%s %s\n" % (item.freq, ' '.join(item.items)))

<h1>Manually choose frequent patterns and write it in the configuration file.</h1>
<h1>Reload the configuration file:</h1>

In [65]:
# get FP from config file
config_data = loadConfig(base_dir + config_file)
freq_patterns = config_data["collections"][0]["FP"]

<h1>Create positive and negative samples as DataFrame</h1>

In [66]:
# Tweets contains the frequent pattern terms will be considered as positive samples

positive_tweets = (tweets
  .rdd
  .filter(lambda x: set(freq_patterns).issubset(x.filtered))
  .map(lambda x : (x[0], x[1], x[2], 1.0))
  .toDF()
  .withColumnRenamed("_1","id")
  .withColumnRenamed("_2","text")
  .withColumnRenamed("_3","filtered")
  .withColumnRenamed("_4","label"))

# calculate a fraction of positive samples to extract equivalent number of negative samples
positive_fraction = float(positive_tweets.count()) / tweets.count()

# Negative samples will be randomly selected from non_positive samples
negative_tweets = (tweets
  .rdd
  .filter(lambda x: not set(freq_patterns).issubset(x[2]))                   
  .sample(False, positive_fraction, 12345)
  .map(lambda x : (x[0], x[1], x[2], 0.0))
  .toDF()
  .withColumnRenamed("_1","id")
  .withColumnRenamed("_2","text")
  .withColumnRenamed("_3","filtered")
  .withColumnRenamed("_4","label"))

<h1>Create training data by joining positive and negative samples:</h1>

In [110]:
training_data = positive_tweets.unionAll(negative_tweets)

<h1> Train LogisticRegression Classifier:</h1>

In [111]:
# Configure an ML pipeline, which consists of the following stages: hashingTF, idf, and lr.
hashingTF = HashingTF(inputCol="filtered", outputCol="TF_features")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
pipeline1 = Pipeline(stages=[hashingTF, idf])

# Fit the pipeline1 to training documents.
model1 = pipeline1.fit(training_data)

# TODO: more hyperparameter tuning is required
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline2 = Pipeline(stages=[model1, lr])

# Fit the pipeline2 to training documents.
model2 = pipeline2.fit(training_data)

<h1>Evaluating LogisticRegression model on training data:</h1>

In [112]:
training_prediction = model2.transform(training_data)
selected = training_prediction.select("label", "prediction").rdd.map(lambda x: (x[0], x[1]))
training_error = selected.filter(lambda (label, prediction): label != prediction).count() / float(tweets.count())
print("Training Error = " + str(training_error))

Training Error = 7.55988508975e-05


<h1> TODO: More classifier to be added:</h1>

<h1>Prepare Test Data:</h1>

In [71]:
testing_data = (tweets
                .rdd
                .map(lambda x: Row(id=x[0], filtered=x[2]))
                .toDF())

In [114]:
# Make predictions on test documents and print columns of interest.
prediction = model2.transform(testing_data)
selected = prediction.select("id", "prediction")
selected.take(1)

[Row(id=u'602-581046908535353344', prediction=0.0)]

<h1>prediction DataFrame contians "prediction" column to be filled in Hbase</h1>