# Stack Overflow NLP Analysis

Stack Overflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because of the variety of features tracked, including a variety of feedback metrics, it allows for some open-ended analysis of user behavior on the site.

Stack Exchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this data set. As a side note, there's also an online data explorer which allows you to query the data interactively.

## Accessing the data


The stats directory contains the data and there are three sub-folders, `allUsers`, `allPosts`, and `allVotes`, which contain Gzipped XML with the following format:

```
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```

The full schema is available as a text file as `stack_exchange_schema.txt`.

## Data input and parsing


Some rows are split across multiple lines; these can be discarded. Incorrectly formatted XML can also be ignored. It is enough to simply skip problematic rows, the loss of data will not significantly impact our results on this large data sets.

We will need to handle XML parsing using `lxml.etree` in Python. We are going to take several shortcuts to speed up and simplify our computations.  First, our parsing function only attempts to parse rows that start with `  <row` as these denote actual data entries. This should be done in Spark as the data is being read in from disk, without any pre-Spark processing. We are going to use this cleaned dataset for the rest of the notebook.

In [2]:
from pyspark import SparkContext
import os, time

def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)

sc = SparkContext("local[*]", "temp")

corpus = sc.textFile(localpath('stats-data/allPosts/'))
corpus_user = sc.textFile(localpath('stats-data/allUsers/*'))
corpus_votes = sc.textFile(localpath('stats-data/allVotes/*'))

In [3]:
import lxml.etree as ET

def xml_parser(row):
    if row[:6] == '  <row':
        try:
            tree = ET.fromstring(row)
            return 0
        except:
            return 1
    else:
        return 0

In [4]:
def clean_rows(row):
    if '<row' in row:
        try:
            tree = ET.fromstring(row)
            return True
        except:
            return False
    
corpus = corpus.filter(lambda x : clean_rows(x))
corpus_user = corpus_user.filter(lambda x : clean_rows(x))
corpus_votes = corpus_votes.filter(lambda x : clean_rows(x))

## Favorites and scores

We're interested in looking for useful patterns in the data. We're going to start by looking to see if there is a relationship between the number of times a post was favorited (the `FavoriteCount`) and the `Score`.  The score is the number of times the post was upvoted minus the number of times it was downvoted, so it is a measure of how much a post was liked.  We'd expect posts with a higher number of favorites to have better scores, since they're both measurements of how good the post is.

Let's aggregate posts by the number of favorites, and find the average score for the lowest 50 number of favorites.

In [5]:
def ZeroNone(x):
    if x: return int(x)
    else: return 0
    

def AggregateScore(x):
    score = ZeroNone(ET.fromstring(x).get('Score'))
    favorite = ZeroNone(ET.fromstring(x).get('FavoriteCount'))
    return (favorite, (score, 1))


favorite_score = corpus.map(lambda x : AggregateScore(x))\
                          .reduceByKey(lambda x,y : (x[0]+y[0], x[1]+y[1]))\
                           .map(lambda x: (x[0], x[1][0]/x[1][1]))\
                            .sortByKey()

In [6]:
favorite_score.collect()[:50]

[(0, 2.3398827696988396),
 (1, 2.7334613999279624),
 (2, 4.481914893617021),
 (3, 6.350249584026622),
 (4, 7.656934306569343),
 (5, 8.941888619854721),
 (6, 11.263779527559056),
 (7, 12.916666666666666),
 (8, 13.345864661654135),
 (9, 15.754237288135593),
 (10, 17.0),
 (11, 17.52542372881356),
 (12, 18.793650793650794),
 (13, 20.083333333333332),
 (14, 23.58823529411765),
 (15, 22.594594594594593),
 (16, 25.48148148148148),
 (17, 26.333333333333332),
 (18, 25.814814814814813),
 (19, 25.944444444444443),
 (20, 29.636363636363637),
 (21, 35.333333333333336),
 (22, 32.46153846153846),
 (23, 31.76923076923077),
 (24, 29.142857142857142),
 (25, 38.55555555555556),
 (26, 38.25),
 (27, 39.55555555555556),
 (28, 34.166666666666664),
 (29, 45.75),
 (30, 43.4),
 (31, 40.875),
 (32, 33.0),
 (33, 36.833333333333336),
 (34, 41.0),
 (35, 41.0),
 (36, 53.25),
 (37, 54.5),
 (38, 63.6),
 (39, 40.0),
 (40, 39.666666666666664),
 (41, 51.0),
 (42, 52.0),
 (44, 76.0),
 (45, 64.0),
 (47, 90.5),
 (48, 51.5),

## Answer percentage


We can investigate the correlation between a user's reputation and the type of their posts. For 100 users with the highest reputation, let's single out posts which are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*.

This can be done with either rdds or dataframes. Since we used rdd in the previous task, let's try dataframes.

In [7]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

sqlContext = SQLContext(sc)

def q_count(row):
    #this function counts the questions
    tree = ET.fromstring(row)
    if tree.attrib.get('PostTypeId') == '1' and tree.attrib.get('OwnerUserId'):
        return (int(tree.attrib.get('OwnerUserId')), 1)
    else:
        return -1000
    
def a_count(row):
    #this function counts the answers
    tree = ET.fromstring(row)
    if tree.attrib.get('PostTypeId') == '2' and tree.attrib.get('OwnerUserId'):
        return (int(tree.attrib.get('OwnerUserId')), 1)
    else:
        return -1000 

def user_rep(row):
    tree = ET.fromstring(row)
    if tree.attrib.get('Reputation') and tree.attrib.get('Id'):
        return (int(tree.attrib.get('Id')), int(tree.attrib.get('Reputation')))
    else:
        return -1000

#We have 3 tables, first one is the count of questions, second one 
#is the count of answers and the last one has reputations per id
reps_per_user = corpus_user.map(lambda x: user_rep(x)).filter(lambda x: x != -1000)\
                            .toDF(['rep_id','reputation'])#.sort(col("reputation").desc()).limit(100)

q_count_id = corpus.map(lambda x: q_count(x)).filter(lambda x: x != -1000).reduceByKey(lambda x, y: x + y)\
                    .reduceByKey(lambda x, y: x + y).toDF(['q_id','question_count'])

a_count_id = corpus.map(lambda x: a_count(x)).filter(lambda x: x != -1000)\
                    .reduceByKey(lambda x, y: x + y).toDF(['a_id','answer_count'])


In [8]:
#Let's join these tables
first_join = q_count_id.join(a_count_id, q_count_id.q_id == a_count_id.a_id,'full')
first_join = first_join.drop('a_id')
full_join = first_join.join(reps_per_user, first_join.q_id == reps_per_user.rep_id,'full')
full_join = full_join.drop('q_id')
full_join = full_join.na.fill(0)

In [9]:
#Let's calculate the fraction of (answers / (answers + questions))
full_join = full_join.sort(col("reputation").desc())
percentage_df = full_join.withColumn('frac', full_join.answer_count / (full_join.answer_count + full_join.question_count))
percentage_df.show(100)

+--------------+------------+------+----------+--------------------+
|question_count|answer_count|rep_id|reputation|                frac|
+--------------+------------+------+----------+--------------------+
|             4|        1206|   919|    100976|   0.996694214876033|
|             9|        2227|   805|     92624|  0.9959749552772809|
|            31|        1543|   686|     47334|  0.9803049555273189|
|             7|         856|  7290|     46907|  0.9918887601390498|
|             8|         430|   930|     32283|  0.9817351598173516|
|             0|           0|  4505|     27599|                null|
|             5|         549|  4253|     25406|  0.9909747292418772|
|            75|         418|   183|     23610|   0.847870182555781|
|            12|         953| 11032|     23102|  0.9875647668393782|
|            18|         552| 28746|     22706|   0.968421052631579|
|             8|         382|   887|     20315|  0.9794871794871794|
|             8|         287|   15

## First question

We'd expect the first **question** a user asks to be indicative of their future behavior.  We'll dig more into that further in the notebook, but for now let's see the relationship between reputation and how long it took each person to ask their first question.

For each user that asked a question, let's find the difference between when their account was created (`CreationDate` for the User) and when they asked their first question (`CreationDate` for their first question).  We will return this time difference in days (round down, so 2.7 days counts as 2 days) for the 100 users with the highest reputation.

In [10]:
from datetime import datetime

def user_dates(row):
    tree = ET.fromstring(row)
    if tree.attrib.get('OwnerUserId') and tree.attrib.get('CreationDate') and tree.attrib.get('PostTypeId') == '2':
        return (tree.attrib.get('OwnerUserId'), 
                datetime.strptime(tree.attrib.get('CreationDate')[:-4], '%Y-%m-%dT%H:%M:%S'))
    else:
        return 0

min_date_per_user = corpus.map(lambda x: user_dates(x)).filter(lambda x: x != 0)\
                        .reduceByKey(lambda x, y: min(x,y)).toDF(['id','first_q_date'])

In [11]:
first_q_date = min_date_per_user.join(percentage_df, min_date_per_user.id == percentage_df.rep_id,'left')
columns_to_drop = ['id', 'frac']
first_q_date = first_q_date.drop(*columns_to_drop)

In [12]:
def user_creation(row):
    tree = ET.fromstring(row)
    if tree.attrib.get('Id') and tree.attrib.get('CreationDate'):
        return (tree.attrib.get('Id'), 
                datetime.strptime(tree.attrib.get('CreationDate')[:-4], '%Y-%m-%dT%H:%M:%S'))
    else:
        return 0
    
creation_date_per_user = corpus_user.map(lambda x: user_creation(x)).filter(lambda x: x != 0)\
                                    .toDF(['id','creation_date'])

In [13]:
#We have two tables together with the initial table we constructed in the previous section.
#First one is the ids and the creation dates table and the second one is the first question date table.
soln_table = first_q_date.join(creation_date_per_user, first_q_date.rep_id == creation_date_per_user.id, 'left')
soln_table = soln_table.drop('id')

In [14]:
from pyspark.sql.functions import unix_timestamp
#Let's add a column that is the time between the creation time and the first question time
soln_table = soln_table.withColumn('first_q_dif', (unix_timestamp('first_q_date') - unix_timestamp('creation_date'))/86400)
soln_table = soln_table.sort(col("reputation").desc())
soln_table.show()

+-------------------+--------------+------------+------+----------+-------------------+--------------------+
|       first_q_date|question_count|answer_count|rep_id|reputation|      creation_date|         first_q_dif|
+-------------------+--------------+------------+------+----------+-------------------+--------------------+
|2010-08-13 15:48:29|             4|        1206|   919|    100976|2010-08-13 15:29:47|0.012986111111111111|
|2010-08-09 00:37:45|             9|        2227|   805|     92624|2010-08-07 08:40:07|  1.6650231481481481|
|2010-08-03 19:51:33|            31|        1543|   686|     47334|2010-08-03 19:42:40|0.006168981481481482|
|2011-11-10 04:08:06|             7|         856|  7290|     46907|2011-11-09 04:43:15|  0.9755902777777777|
|2010-08-19 08:50:44|             8|         430|   930|     32283|2010-08-13 20:50:47|   5.499965277777778|
|2011-05-08 02:49:16|             0|           0|  4505|     27599|2011-05-07 13:44:25|  0.5450347222222223|
|2011-04-20 12:59:0

## Identify veterans


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.

For each group separately, we can average the score, views, number of answers, and number of favorites of the users' **first question**. Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". 

In [15]:
def user_creation(row):
    tree = ET.fromstring(row)
    if tree.attrib.get('Id') and tree.attrib.get('CreationDate'):
        return (tree.attrib.get('Id'), 
                datetime.strptime(tree.attrib.get('CreationDate')[:-4], '%Y-%m-%dT%H:%M:%S'))
    else:
        return 0

creation_date_dict = {key: value for key, value in corpus_user.map(lambda x : user_creation(x)).collect()}

In [16]:
def vet_brief(row):
    #this function identifies whether the user is a veteran or a brief user
    tree = ET.fromstring(row)
    if tree.attrib.get('OwnerUserId') in creation_date_dict.keys():
        if (datetime.strptime(tree.attrib.get('CreationDate')[:-4], '%Y-%m-%dT%H:%M:%S') - \
        creation_date_dict[tree.attrib.get('OwnerUserId')]).total_seconds() / 86400 >= 100  \
        and (datetime.strptime(tree.attrib.get('CreationDate')[:-4], '%Y-%m-%dT%H:%M:%S') - \
        creation_date_dict[tree.attrib.get('OwnerUserId')]).total_seconds() / 86400 <= 150 :
            
            return (tree.attrib.get('OwnerUserId'), 1)
        
        else:
            
            return (tree.attrib.get('OwnerUserId'), 0)
    else:
        return 0

vet_brief_dict = {key: value for key, value in corpus.map(lambda x: vet_brief(x)).filter(lambda x: x != 0)
                  .reduceByKey(lambda x, y: max(x,y)).collect()}

In [17]:
def all_variables(row):
    tree = ET.fromstring(row)
    if tree.attrib.get('OwnerUserId') in vet_brief_dict.keys() and tree.attrib.get('PostTypeId') == '1':
        return (tree.attrib.get('OwnerUserId'), 
                (datetime.strptime(tree.attrib.get('CreationDate')[:-4], '%Y-%m-%dT%H:%M:%S'),
                int(tree.attrib.get('Score') or 0), int(tree.attrib.get('ViewCount') or 0), 
                int(tree.attrib.get('FavoriteCount') or 0),int(tree.attrib.get('AnswerCount') or 0),1))

    else:
        return 0

def variable_count(tple):
    if vet_brief_dict[tple[0]] == 1:
        return ('veteran',tple[1][1:])
    else:
        return ('brief',tple[1][1:])

 
all_variables_dict = corpus.map(lambda x: all_variables(x)).filter(lambda x: x != 0)\
                        .reduceByKey(lambda x,y: x if(x[0] < y[0]) else y).map(lambda x: variable_count(x))\
                        .reduceByKey(lambda x,y: ((x[0]+y[0]),x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4])).collect()

In [18]:
identify_veterans = {}
for el in all_variables_dict:
    if el[0] == 'veteran':
        identify_veterans['vet_score'] = float(el[1][0])/el[1][4]
        identify_veterans['vet_views'] = float(el[1][1])/el[1][4]
        identify_veterans['vet_answers'] = float(el[1][3])/el[1][4]
        identify_veterans['vet_favorites'] = float(el[1][2])/el[1][4]
    else:
        identify_veterans['brief_score'] = float(el[1][0])/el[1][4]
        identify_veterans['brief_views'] = float(el[1][1])/el[1][4]
        identify_veterans['brief_answers'] = float(el[1][3])/el[1][4]
        identify_veterans['brief_favorites'] = float(el[1][2])/el[1][4]

identify_veterans

{'vet_score': 3.5434543454345433,
 'vet_views': 926.3982398239824,
 'vet_answers': 1.2981298129812981,
 'vet_favorites': 1.300880088008801,
 'brief_score': 2.1008600836584104,
 'brief_views': 553.5200921182497,
 'brief_answers': 0.9707195563284298,
 'brief_favorites': 0.5758800582788927}

## Word2vec


Word2Vec is an alternative approach for vectorizing text data. The vectorized representations of words in the vocabulary tend to be useful for predicting other words in the document, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".

Let's see how good a Word2Vec model we can train using the tags of each Stack Exchange post as documents (this uses the full data set). We can use the implementation of Word2Vec from Spark ML (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).


In [19]:
def get_tags(row):    
    tree = ET.fromstring(row)
    if tree.attrib.get('Tags'):
        tag = tree.attrib.get('Tags')
        return tag

corpus_tags = corpus.map(lambda x : get_tags(x)).filter(lambda x: x != None)\
                    .map(lambda x: (x.replace('>', ' ').replace('<', ' ').split(), 1)).toDF(['text', 'score'])

In [20]:
from pyspark.ml.feature import Word2Vec

w2v = Word2Vec(inputCol="text", outputCol="vectors", vectorSize=100, minCount=10, seed=42)
model = w2v.fit(corpus_tags)
result = model.transform(corpus_tags)
vectors = model.getVectors().rdd.map(lambda x: (x.word, x.vector))

In [21]:
word2vec_rdd = model.findSynonyms('ggplot2', 25).rdd.take(25)

word2vec = []
for i,el in enumerate(word2vec_rdd):
    word2vec.append((el.word,el.similarity))

word2vec

[('dataframe', 0.838861882686615),
 ('data-visualization', 0.8181682825088501),
 ('latent-class', 0.8178013563156128),
 ('nls', 0.7952163815498352),
 ('traminer', 0.7847820520401001),
 ('package', 0.772851288318634),
 ('gam', 0.7597593069076538),
 ('plm', 0.7597370743751526),
 ('mlogit', 0.756221354007721),
 ('survival', 0.7530490159988403),
 ('glmnet', 0.7521149516105652),
 ('lm', 0.7484762668609619),
 ('marginal-effect', 0.7238023281097412),
 ('quantile-regression', 0.7158777117729187),
 ('dlm', 0.7119066119194031),
 ('loess', 0.7080047130584717),
 ('constrained-regression', 0.6954183578491211),
 ('vecm', 0.6798256039619446),
 ('scatterplot', 0.6721137762069702),
 ('propensity-scores', 0.636638879776001),
 ('stepwise-regression', 0.6130045056343079),
 ('growth-mixture-model', 0.6100594401359558),
 ('interpolation', 0.6031655073165894),
 ('correlated-predictors', 0.5945377945899963),
 ('latex', 0.590451717376709)]

## Classification


We'd like to see if we can predict the tags of a question from its body text. Instead of predicting specific tags, we will instead try to predict if a question contains one of the top ten most common tags. To do this, the dataset is separated out as a train and a test set.

This will involve two steps: first, find the ten most common tags for questions in the training data set (the tags have been removed from the test set). Then train a model to predict from the text of the question (the `Body` attribute) if it has one of those ten tags in it - we need to process the question text with NLP techniques such as splitting the text into tokens.

Since we can't reliably pickle Spark models, instead we will return a list of the predictions, sorted by the question's `Id`.

In [22]:
sc.stop()
def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)


sc = SparkContext("local[*]", "temp")

classification_train = sc.textFile(localpath('stats-data/train/'))
classification_test = sc.textFile(localpath('stats-data/test/'))

In [23]:
def clean_rows(row):
    if '<row' in row:
        try:
            tree = ET.fromstring(row)
            return True
        except:
            return False
    
rows = classification_train.filter(lambda x : clean_rows(x))
rows_test = classification_test.filter(lambda x : clean_rows(x))

In [24]:
def get_tags(row):    
    tree = ET.fromstring(row)
    if tree.attrib.get('Tags'):
        tag = tree.attrib.get('Tags')
        return tag
    
tags = rows.map(lambda x : get_tags(x)).filter(lambda x: x != None)\
            .flatMap(lambda x: x.replace('>', ' ').replace('<', ' ').split()).map(lambda x: (x,1))\
            .reduceByKey(lambda x, y: x + y).collect()


In [25]:
tags_sorted = sorted(tags, key=lambda x: x[1], reverse=True)[:10]
top_ten_tags = [x[0] for x in tags_sorted]
top_ten_tags

['r',
 'regression',
 'time-series',
 'machine-learning',
 'probability',
 'hypothesis-testing',
 'distributions',
 'self-study',
 'logistic',
 'correlation']

In [26]:
def get_id_body_label(row):
    tree = ET.fromstring(row)
    if tree.attrib.get('Body') and tree.attrib.get('Tags'):
        row_tags = tree.attrib.get('Tags').replace('>', ' ').replace('<', ' ').split()
        for el in row_tags:
            if el in top_ten_tags:
                return (int(tree.attrib.get('Id')), tree.attrib.get('Body'), 1)

        return (int(tree.attrib.get('Id')), tree.attrib.get('Body'), 0)

id_body_label = rows.map(lambda x : get_id_body_label(x)).filter(lambda x: x != None).collect()
id_body_label_test = rows_test.map(lambda x : get_id_body_label(x)).filter(lambda x: x != None).collect()

body = [x[1] for x in id_body_label]
labels = [x[2] for x in id_body_label]
body_test = [x[1] for x in id_body_label_test]
labels_test = [x[2] for x in id_body_label_test]

In [27]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import RidgeClassifier
from sklearn.feature_extraction.text import TfidfVectorizer

normalized_est = Pipeline([
    ('tfidfvec', TfidfVectorizer()),
    ('regressor',RidgeClassifier())
])
normalized_est.fit(body, labels)
normalized_est.predict(body_test)

array([0, 1, 0, ..., 0, 0, 0])