Predict classification tags for a body of text using a one-vs-all strategy: one binary classifier is trained per tag. The final output is a file, classification.pkl, that contains one tuple for each of the top 100 tags in the training data set: `("some tag name", [prediction_values])`, where the list holds one predicted probability per test case.
* Uses PySpark and Word2Vec

In [None]:
from pyspark import SparkContext
# Start a local Spark context ("local[*]" master) named "pyspark_df" and
# report which Spark version is running.
sc = SparkContext(master="local[*]", appName="pyspark_df")
print(sc.version)

In [None]:
import numpy as np
import random
import toolz
import time
from lxml import etree
import xml.etree.ElementTree as ET
import mwparserfromhell
import os
import re
from pyspark.ml.feature import Word2Vec
from pyspark.mllib.linalg import Vector, Vectors
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import UserDefinedFunction as udf
from pyspark.sql.types import DoubleType
from datetime import datetime, date, time
from pyspark.sql import SQLContext
# SQLContext bound to the SparkContext `sc`; provides createDataFrame().
sqlContext = SQLContext(sc)

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
import pickle
import pandas as pd

In [None]:
def xml_encode(line):
    """Parse one line of text as a single XML element.

    Args:
        line: Text expected to contain one XML element
            (for example a ``<row ... />`` line).

    Returns:
        The parsed ``Element``, or ``False`` when the line cannot be
        encoded as UTF-8 or is not well-formed XML.  ``False`` is the
        failure marker callers filter on.
    """
    try:
        return ET.fromstring(line.encode('utf-8'))
    except (ET.ParseError, UnicodeError):
        # Swallow only the expected "bad input line" failures.  A bare
        # ``except`` would also hide KeyboardInterrupt, SystemExit and
        # genuine programming errors.
        return False
    
    
def tf_filter(x):
    """Return ``False`` if ``x`` compares equal to ``False``, else ``True``.

    The comparison is by equality, not identity, so values such as ``0``
    that compare equal to ``False`` are rejected as well.
    """
    return not (x == False)  # noqa: E712 - equality semantics are intended
    

In [None]:
# Read the training data, parse every "<row" line as XML, drop lines that
# failed to parse, and keep (Body, PostTypeId, Tags) for posts whose
# PostTypeId is '1' (presumably question posts - confirm against the dump
# schema).
train_posts = (
    sc.textFile("train_dir")
    .filter(lambda raw: raw.strip().startswith('<row'))
    .map(xml_encode)
    .filter(tf_filter)
    .map(lambda row: (row.get("Body"), row.get("PostTypeId"), row.get("Tags")))
    .filter(lambda fields: fields[1] == '1')
)

In [None]:
# Count how often each tag occurs in the training posts, most frequent first.
all_tags = (
    train_posts
    # A post without a Tags attribute yields None (or an empty string).
    # Drop those *before* str(): str(None) is the text "None", which would
    # otherwise be counted as a real tag and could never be filtered out by
    # a later `!= None` check on the key.
    .filter(lambda post: post[2])
    .flatMap(lambda post: str(post[2]).strip("<").strip(">").split("><"))
    .map(lambda tag: (tag, 1))
    .aggregateByKey(0, lambda x, y: x + y, lambda x, y: x + y)
    # Swap to (count, tag) so sortByKey orders by frequency.
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)

# Fetch a few more than 100 rows, presumably so that ties in count can be
# ordered deterministically (count descending, then tag name) before the
# list is cut down to the top 100.
top_tags = all_tags.take(110)
top_tags_df = pd.DataFrame.from_records(top_tags, columns=["count", "tag"])
top_tags_df = top_tags_df.sort_values(by=['count', 'tag'], ascending=[False, True])
top_tags = top_tags_df.tag.values[0:100]
print(top_tags_df)

In [None]:
# Read the test data with the same steps used for the training data: parse
# every "<row" line as XML, drop lines that failed to parse, and keep
# (Body, PostTypeId, Tags) for posts whose PostTypeId is '1'.
test_posts = (
    sc.textFile("test_dir")
    .filter(lambda raw: raw.strip().startswith('<row'))
    .map(xml_encode)
    .filter(tf_filter)
    .map(lambda row: (row.get("Body"), row.get("PostTypeId"), row.get("Tags")))
    .filter(lambda fields: fields[1] == '1')
)

In [None]:
def _body_and_tags(post):
    """Map a (Body, PostTypeId, Tags) tuple to (Body, list of tag names).

    The Tags value is stringified, its surrounding "<" / ">" are stripped
    and the remainder is split on "><".
    """
    tag_names = str(post[2]).strip("<").strip(">").split("><")
    return (post[0], tag_names)


train_posts = train_posts.map(_body_and_tags)
test_posts = test_posts.map(_body_and_tags)

# Show one sample record from each data set as a sanity check.
print(train_posts.take(1))
print(test_posts.take(1))

In [None]:
def tag_mapper(tag, tags):
    """Return 1 when ``tag`` is contained in ``tags``, otherwise 0."""
    return 1 if tag in tags else 0

In [None]:
# Train one binary (one-vs-all) logistic-regression model per top tag and
# collect, for every test post, the predicted probability of that tag.
# The result is pickled as a list of (tag, [probability, ...]) tuples.

# The feature stages and the estimator carry no per-tag state, so they are
# built once instead of once per tag.
tokenizer = Tokenizer(inputCol="body", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
logreg = LogisticRegression(maxIter=25, regParam=0.03)

# The test features do not depend on the tag either: prediction only reads
# the "features" column, so the test DataFrame is defined once, without a
# label column.
testing = sqlContext.createDataFrame(
    test_posts.map(lambda post: (post[0],)), ["body"])
test_tokens = tokenizer.transform(testing)
test_hashes = hashingTF.transform(test_tokens)

# Results are appended in tag order.  The previous code pre-sized this list
# with len(test_posts), but an RDD does not support len(), so that line
# raised TypeError before any model was trained.
tag_prediction_array = []

for tag in top_tags:
    print(tag)

    # `tag=tag` binds this iteration's value into the closure, so it cannot
    # be affected by late binding if Spark serializes the function lazily.
    # The label is a float because Spark ML releases before 2.0 require a
    # DoubleType label column (NOTE(review): confirm against the Spark
    # version in use); a float label is accepted by later releases too.
    one_label_train_posts = train_posts.map(
        lambda post, tag=tag: (post[0], float(tag_mapper(tag, post[1]))))
    training = sqlContext.createDataFrame(one_label_train_posts, ["body", "label"])

    tokens = tokenizer.transform(training)
    hashes = hashingTF.transform(tokens)
    model = logreg.fit(hashes)

    prediction = model.transform(test_hashes)
    probs = prediction.select("probability").collect()
    # Each collected row holds the class-probability vector; index 1 is the
    # probability of label 1, i.e. that the post carries this tag.
    tag_probs = [p[0][1] for p in probs]
    tag_prediction_array.append((tag, tag_probs))

# The context manager closes the file even if pickling fails.
with open("classification.pkl", "wb") as output:
    pickle.dump(tag_prediction_array, output)