In [73]:
import codecs, re, json, os, time
from pyspark import SparkContext, SparkConf
from pyspark.mllib.fpm import FPGrowth
from pyspark.sql import SQLContext, Row
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

<h1>Create Spark and SQL context:</h1>

In [74]:
conf = SparkConf().setAppName("Text Classifier")
# if not sc:
sc = SparkContext(conf=conf)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Text Classifier, master=local[*]) created by __init__ at <ipython-input-2-eea46be77db3>:3 

In [75]:
sqlContext = SQLContext(sc)

<h1>Load Configuration File</h1>

In [76]:
def load_config(config_file):
    """
    Load collection configuration file.
    """
    with open(config_file) as data_file:    
        config_data = json.load(data_file)
    return config_data

<h1>Parse Tweets to tweet_id and tweet_text</h1>

In [77]:
def parse_tweet(line):
    """
    Parses a tweet record having the following format collectionId-tweetId<\t>tweetString
    """
    fields = line.strip().split("\t")
    if len(fields) == 2:
        # The following regex just strips of an URL (not just http), any punctuations, 
        # or Any non alphanumeric characters
        # http://goo.gl/J8ZxDT
        text = re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)"," ",fields[1]).strip()
        # remove terms <= 2 characters
        text = ' '.join(filter(lambda x: len(x) > 2, text.split(" ")))
        # return tuple of (collectionId-tweetId, text)
        return (fields[0], text)

<h1>Load tweets from file into DataFrame:</h1>

In [78]:
def Load_tweets(collection_id):
    tweets_file = os.path.join(base_dir , data_dir , "z_" + collection_id)
    print("Loading " + tweets_file) 
    if not os.path.isdir(tweets_file):
        print(tweets_file + " folder doesn't exist.")
        return False
    tweets = sc.textFile(tweets_file) \
              .map(parse_tweet) \
              .filter(lambda x: x is not None) \
              .map(lambda x: Row(id=x[0], text=x[1])) \
              .toDF() \
              .cache()
    return tweets

<h1>Tokenize and remove stop words from tweet text:</h1>

In [79]:
def preprocess_tweets(tweets):
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    tweets = tokenizer.transform(tweets)
    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    tweets = remover.transform(tweets)
    return tweets

<h1>Save unique tokens:</h1>

In [80]:
# Frequent pattern mining expect each row to have a unique set of tokens
def save_unique_token(tweets):
    tweets = (tweets
      .rdd
      .map(lambda x : (x.id, x.text, list(set(filter(None, x.filtered)))))
      .toDF()
      .withColumnRenamed("_1","id")
      .withColumnRenamed("_2","text")
      .withColumnRenamed("_3","filtered")).cache()
    return tweets

<h2>Run Frequent Pattern Mining algorithm and save to output file:</h2>

In [81]:
def run_FPM(tweets, collection):
    model = FPGrowth.train(tweets.select("filtered").rdd.map(lambda x: x[0]), minSupport=0.02)
    result = sorted(model.freqItemsets().collect(), reverse=True)
    # sort the result in reverse order
    sorted_result = sorted(result, key=lambda item: int(item.freq), reverse=True)

    # save output to file
    with codecs.open(FP_dir + time.strftime("%Y%m%d-%H%M%S") + '_'
                            + collection["Id"] + '_' 
                            + collection["name"] + '.txt', 'w',encoding='utf-8') as file:
        for item in sorted_result:
            file.write("%s %s\n" % (item.freq, ' '.join(item.items)))

<h1>Global Variables</h1>

In [82]:
base_dir = "/home/hshahin/Spring2016_IR_Project/data/"
data_dir = "small_data"
predictions_dir = os.path.join(base_dir , data_dir, "predictions")
FP_dir = base_dir + "FPGrowth/"
config_file = "collections_config.json"
config_data = load_config(os.path.join(base_dir , config_file))

<h1>Phase I: Run FPM to all data sets</h1>

In [83]:
for x in config_data["collections"]:
    tweets = Load_tweets(x["Id"])
    if tweets:
        tweets = preprocess_tweets(tweets)
        tweets = save_unique_token(tweets)
        run_FPM(tweets, x)

Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_602
Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_541
Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_668
Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_700
Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_686
Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_694
Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_532
/home/hshahin/Spring2016_IR_Project/data/small_data/z_532 folder doesn't exist.


<h1>Manually choose frequent patterns and write them in the configuration file.</h1>
<h1>Reload the configuration file:</h1>

In [84]:
# get FP from config file
config_data = load_config(os.path.join(base_dir , config_file))

<h1>Create training data of positive and negative samples as DataFrame</h1>

In [85]:
def create_training_data(tweets, freq_patterns):
    # Tweets contains the frequent pattern terms will be considered as positive samples
    positive_tweets = (tweets
      .rdd
      .filter(lambda x: set(freq_patterns).issubset(x.filtered))
      .map(lambda x : (x[0], x[1], x[2], 1.0))
      .toDF()
      .withColumnRenamed("_1","id")
      .withColumnRenamed("_2","text")
      .withColumnRenamed("_3","filtered")
      .withColumnRenamed("_4","label"))

    # calculate a fraction of positive samples to extract equivalent number of negative samples
    positive_fraction = float(positive_tweets.count()) / tweets.count()

    # Negative samples will be randomly selected from non_positive samples
    negative_tweets = (tweets
      .rdd
      .filter(lambda x: not set(freq_patterns).issubset(x[2]))                   
      .sample(False, positive_fraction, 12345)
      .map(lambda x : (x[0], x[1], x[2], 0.0))
      .toDF()
      .withColumnRenamed("_1","id")
      .withColumnRenamed("_2","text")
      .withColumnRenamed("_3","filtered")
      .withColumnRenamed("_4","label"))
    training_data = positive_tweets.unionAll(negative_tweets)
    return training_data

<h1> Train LogisticRegression Classifier:</h1>

In [86]:
def train_lg(training_data):
    # Configure an ML pipeline, which consists of the following stages: hashingTF, idf, and lr.
    hashingTF = HashingTF(inputCol="filtered", outputCol="TF_features")
    idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
    pipeline1 = Pipeline(stages=[hashingTF, idf])

    # Fit the pipeline1 to training documents.
    model1 = pipeline1.fit(training_data)

    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    pipeline2 = Pipeline(stages=[model1, lr])

    paramGrid = ParamGridBuilder() \
        .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
        .addGrid(lr.regParam, [0.1, 0.01]) \
        .build()

    crossval = CrossValidator(estimator=pipeline2,
                              estimatorParamMaps=paramGrid,
                              evaluator=BinaryClassificationEvaluator(),
                              numFolds=2)  # use 3+ folds in practice

    # Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(training)
    return cvModel

<h1>Evaluating LogisticRegression model on training data:</h1>

In [87]:
def get_training_score_lg(lg_model, training_data):
    training_prediction = lg_model.transform(training_data)
    selected = training_prediction.select("label", "prediction").rdd.map(lambda x: (x[0], x[1]))
    training_error = selected.filter(lambda (label, prediction): label != prediction).count() / float(tweets.count())
    print("Training Error = " + str(training_error))

<h1>Prepare testing data:</h1>

In [88]:
def create_testing_data(tweets):
    testing_data = (tweets
                    .rdd
                    .map(lambda x: Row(id=x[0], filtered=x[2]))
                    .toDF())
    return testing_data

<h1>Prediction Fucntion</h1>

In [89]:
def lg_prediction(lg_model, testing_data, collection):
    # Perfom predictions on test documents and save columns of interest to a file.
    prediction = lg_model.transform(testing_data)
    selected = prediction.select("id", "prediction")
    prediction_path = os.path.join(predictions_dir , time.strftime("%Y%m%d-%H%M%S") + '_'
                            + collection["Id"] + '_' 
                            + collection["name"])
    print(prediction_path)
    def saveData(data):
        with open(prediction_path, 'a') as f:
            f.write(data.id+"\t"+str(data.prediction)+"\n")
    selected.foreach(saveData)    
    # selected.rdd.saveAsTextFile(prediction_path)

<h1>Phase II: Train classifier and perform prediction</h1>

In [90]:
for x in config_data["collections"]:
    tweets = Load_tweets(x["Id"])
    if tweets:
        freq_patterns = x["FP"]
        tweets = preprocess_tweets(tweets)
        training_data = create_training_data(tweets, freq_patterns)
        lg_model = train_lg(training_data)
        get_training_score_lg(lg_model, training_data)
        testing_data = create_testing_data(tweets)
        lg_prediction(lg_model, testing_data, x)

Loading /home/hshahin/Spring2016_IR_Project/data/small_data/z_602


Py4JJavaError: An error occurred while calling o6133.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 589.0 failed 1 times, most recent failure: Lost task 1.0 in stage 589.0 (TID 1434, localhost): java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 185624
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
	at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
	at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
	at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
	at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1554)
	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1553)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
	at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1553)
	at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 185624
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
	at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
	at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
	at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
	at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


<h1>prediction DataFrame contians "prediction" column to be filled in Hbase</h1>

In [None]:
# Make predictions on test documents and print columns of interest.
prediction = model2.transform(testing_data)
selected = prediction.select("id", "prediction")
def saveData(data):
    with open(base_dir+'FPGrowth/700_classified.txt', 'a') as f:
        f.write(data.id+"\t"+str(data.prediction)+"\n")
selected.foreach(saveData)