In [1]:
import seaborn as sns
sns.set()

# Spark Project


Stack Overflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because it tracks many features, including several feedback metrics, it allows for some open-ended analysis of user behavior on the site.

Stack Exchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this data set. 



## Workflow


## Accessing the data


The data is available on S3 (`s3://dataincubator-course/spark-stack-data`). There are three sub-folders, `allUsers`, `allPosts`, and `allVotes`, which contain Gzipped XML with the following format:

``` xml
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```
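
To make the row format concrete, here is a minimal sketch of pulling the attributes out of one such row. It uses the standard library's `xml.etree.ElementTree` rather than the `lxml` parser used later in this notebook, and `sample_row` is a shortened, made-up row in the same shape as the one above.

```python
import xml.etree.ElementTree as ET

# A shortened, hypothetical row with the same attribute layout as the sample.
sample_row = (
    '<row Body="&lt;p&gt;Valid pages help.&lt;/p&gt;&#10;" CommentCount="0" '
    'CreationDate="2008-10-12T20:26:29.397" Id="195995" '
    'OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />'
)

element = ET.fromstring(sample_row)
attributes = dict(element.attrib)  # every field arrives as a string

print(element.tag)                 # the tag name of the element
print(repr(attributes["Body"]))    # entities such as &lt; are decoded by the parser
print(int(attributes["Score"]))    # numeric fields need an explicit cast
```

Every attribute value comes back as a string, which is why the numeric columns are cast explicitly further down.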

Data from the much smaller `stats.stackexchange.com` is available in the same format on S3 (`s3://dataincubator-course/spark-stats-data`). This site, Cross-Validated, will be used below in some instances to avoid working with the full data set for every question.

The full schema is available as a text file:

In [3]:
!aws s3 cp s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt .

download: s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt to ./stack_exchange_schema.txt


I can either get the data by running the appropriate S3 commands in the terminal, or by running this block for the smaller stats data set:

In [4]:
!mkdir -p spark-stats-data
!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data
!aws s3 sync --exclude '*' --include 'posts*zip' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data

And to get the much larger full data set (be warned, this can take 20 or more minutes, so you may want to run it in the terminal to avoid locking up the notebook):

In [5]:
!mkdir -p spark-stack-data
!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stack-data/ ./spark-stack-data

## Data input and parsing


## Bad XML


First, I want to create an RDD of Post objects where each Post is a valid row of XML from the Cross-Validated (stats.stackexchange.com) `allPosts` data set.

I am going to take several shortcuts to speed up and simplify the computations. First, my parsing function should only attempt to parse rows that start with `<row`, as these denote actual data entries. This should be done in Spark as the data is being read in from disk, without any pre-Spark processing.

The goal is to return the total number of XML rows that started with `<row` and were subsequently **rejected** during processing. Note that the text is Unicode and contains non-ASCII characters, so it may need to be re-encoded to UTF-8 (depending on the XML parser).
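
Before running this in Spark, the filtering logic can be sketched on a handful of lines in plain Python. This is only an illustration under stated assumptions: the `lines` list is made up, and the standard library parser stands in for `lxml`, which may accept or reject slightly different inputs.

```python
import xml.etree.ElementTree as ET

def is_row(line):
    """True for lines that carry a data entry."""
    return line.strip().startswith("<row")

def is_bad_xml(line):
    """True when the line cannot be parsed as a standalone XML element."""
    try:
        ET.fromstring(line.strip())
        return False
    except ET.ParseError:
        return True

# Hypothetical input: a header, a wrapper tag, two good rows, two broken rows.
lines = [
    '<?xml version="1.0" encoding="UTF-8"?>',
    "<parent>",
    '  <row Id="1" Score="3" />',
    '  <row Id="2" Score="5" Title="caf\u00e9" />',  # non-ASCII text is fine
    '  <row Id="3" Score="1"',                       # truncated, never closed
    '  <row Id="4" Score=7 />',                      # attribute value not quoted
    "</parent>",
]

rows = [line for line in lines if is_row(line)]
rejected = [line for line in rows if is_bad_xml(line)]
print(len(rows), len(rejected))
```

The same two predicates can be passed to `filter` on an RDD, so the count of rejected rows is computed while the data is read.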

In [6]:
!ls spark-stats-data/allUsers/

part-00000.xml.gz  part-00001.xml.gz


In [7]:
from lxml import etree
from pyspark import SparkContext
from pyspark import SQLContext

sc = SparkContext("local[*]", "temp")
# Use a distinct name so the SQLContext class itself is not shadowed.
sqlContext = SQLContext(sc)

In [8]:
tree = etree.parse('spark-stack-data/allPosts/part-00000.xml.gz')
root = tree.getroot()
firstrow = root[0]  # indexing replaces the deprecated getchildren()
firstrow.get('Score')

'322'

In [9]:
post_lines = sc.textFile('spark-stats-data/allPosts/')

In [10]:
User_lines = sc.textFile('spark-stats-data/allUsers/')

In [11]:
User_lines.take(10)

['<?xml version="1.0" encoding="UTF-8"?>',
 '<parent>',
 '  <row AccountId="5872878" CreationDate="2015-03-02T18:42:20.510" DisplayName="Lars Reeker" DownVotes="0" Id="70185" LastAccessDate="2015-03-02T18:42:20.510" ProfileImageUrl="https://lh3.googleusercontent.com/-Y7GNsydm-mc/AAAAAAAAAAI/AAAAAAAADq8/15o5t99O5IU/photo.jpg" Reputation="1" UpVotes="0" Views="0" />',
 '  ',
 '  <row AccountId="5872995" CreationDate="2015-03-02T19:04:13.380" DisplayName="Vra" DownVotes="0" Id="70186" LastAccessDate="2015-03-06T15:45:57.590" Reputation="6" UpVotes="0" Views="1" />',
 '  ',
 '  <row AboutMe="" AccountId="5873177" CreationDate="2015-03-02T19:40:16.420" DisplayName="Aroona" DownVotes="0" Id="70187" LastAccessDate="2015-03-02T19:40:16.420" ProfileImageUrl="https://www.gravatar.com/avatar/e0e90702da3203e069f0a7d957ee7ea6?s=128&amp;d=identicon&amp;r=PG&amp;f=1" Reputation="1" UpVotes="0" Views="0" WebsiteUrl="" />',
 '  ',
 '  <row AccountId="5873184" CreationDate="2015-03-02T19:46:45.400" Disp

In [11]:
def isRow(line):
    return line.strip()[:4] == '<row'

In [12]:
def isBadXML(s):
    try:
        etree.fromstring(s.strip())
        return False
    except Exception:
        return True

In [14]:
post_lines.filter(isRow).filter(isBadXML).count()

781

In [15]:
post_lines.take(10)

['<?xml version="1.0" encoding="UTF-8"?>',
 '<parent>',
 '  <row AnswerCount="0" Body="&lt;p&gt;I\'m developing an application in which users can create \'sections\' (à la subreddit in reddit), in which items/posts can be created and voted with a thumbs-up/down system.&lt;/p&gt;&#10;&#10;&lt;p&gt;&lt;a href=&quot;http://www.evanmiller.org/how-not-to-sort-by-average-rating.html&quot; rel=&quot;nofollow&quot;&gt;A great article&lt;/a&gt; guided me on how to sort these votes so that an item with a 100% positive response but with few votes won\'t get ranked over one with hundreds of votes and an acceptance of 80%. The article describes it pretty well.&lt;/p&gt;&#10;&#10;&lt;p&gt;However, I\'d like to discard the lowest-ranked items and this is where it gets tricky:&lt;/p&gt;&#10;&#10;&lt;ul&gt;&#10;&lt;li&gt;How could I know the minimum number of votes in order to discard it?&lt;/li&gt;&#10;&lt;li&gt;What is the score\'s threshold required to discard the item?&lt;/li&gt;&#10;&lt;/ul&gt;&#1

## Favorites and scores

I am interested in looking for useful patterns in the data.  If I look at the Post data again (the smaller set, `stats.stackexchange.com`), I see that many things about each post are recorded.  I'm going to start by looking to see if there is a relationship between the number of times a post was favorited (the `FavoriteCount`) and the `Score`.  The score is the number of times the post was upvoted minus the number of times it was downvoted, so it is a measure of how much a post was liked.  I'd expect posts with a higher number of favorites to have better scores, since they're both measurements of how good the post is.

Let's aggregate posts by the number of favorites, and find the average score for each number of favorites. Let's do this for the lowest 50 numbers of favorites.

**Checkpoints**

- Total score across all posts: 299469
- Mean of first 50 favorite counts (averaging the keys themselves): 24.76
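
The aggregation itself is small enough to sketch in plain Python before expressing it with DataFrames. The `posts` list below is made up for illustration, and treating a missing favorite count as zero is the same convention the Spark code in this section applies.

```python
from collections import defaultdict

# Hypothetical (FavoriteCount, Score) pairs; None stands for a missing attribute.
posts = [(None, 1), (0, 3), (1, 4), (1, 2), (2, 10), (None, -1)]

totals = defaultdict(lambda: [0, 0])  # favorite count -> [score sum, post count]
for favorites, score in posts:
    key = 0 if favorites is None else favorites
    totals[key][0] += score
    totals[key][1] += 1

# Average score per favorite count, keeping the 50 lowest favorite counts.
favorite_score = sorted(
    (favorites, score_sum / count)
    for favorites, (score_sum, count) in totals.items()
)[:50]
print(favorite_score)
```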

In [13]:
def line_parser(s):
    try:
        tree = etree.fromstring(s.strip())
        return dict(tree.attrib)
    except Exception:
        return None

In [14]:
from collections import namedtuple

In [18]:
post_df = post_lines.filter(isRow) \
                .filter(lambda line: not isBadXML(line)) \
                .map(line_parser) \
                .toDF(sampleRatio = 0.0001)

In [19]:
def int_null_to_zero(s):
    if s is None:
        return 0
    else:
        return int(s)

In [20]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType, IntegerType, ArrayType

In [21]:
post_df = post_df.withColumn('FavoriteCount', 
                              F.when(F.col('FavoriteCount').isNull(), 0)\
                               .otherwise(F.col('FavoriteCount').cast(IntegerType())))\
                 .withColumn('Score',
                            F.when(F.col('Score').isNull(), 0)\
                             .otherwise(F.col('Score').cast(IntegerType()))).cache()

In [22]:
post_df = post_df.withColumn('Id', 
                              F.when(F.col('Id').isNull(), 0)\
                               .otherwise(F.col('Id').cast(IntegerType())))

In [23]:
post_df

DataFrame[Body: string, CommentCount: string, CreationDate: string, Id: int, LastActivityDate: string, OwnerUserId: string, ParentId: string, PostTypeId: string, Score: int, AnswerCount: string, Tags: string, Title: string, ViewCount: string, AcceptedAnswerId: string, FavoriteCount: int, LastEditDate: string, LastEditorUserId: string]

In [24]:
ans = post_df.groupby('FavoriteCount')\
             .agg(F.avg('Score').alias('AvgScore'))\
             .sort('FavoriteCount')

In [32]:
xzz = ans.toPandas()

In [45]:
favorite_score = [(int(item[0]), item[1]) for item in ans.toPandas().to_numpy()]

In [46]:
favorite_score[:5]

[(0, 2.3398827696988396),
 (1, 2.7334613999279624),
 (2, 4.481914893617021),
 (3, 6.350249584026622),
 (4, 7.656934306569343)]

## Answer percentage


Next, I investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, I single out posts that are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*.

#### Checkpoints

* Total questions: 52,060
* Total answers: 55,304
* Top 99 users' average reputation: 11893.464646464647
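
As a small worked example of the ratio, the sketch below counts questions and answers per user in plain Python. The `posts` list is invented; the only thing taken from the data is that `PostTypeId` `"1"` marks a question and `"2"` an answer, as the Spark code in this section also assumes.

```python
from collections import Counter

# Hypothetical (OwnerUserId, PostTypeId) pairs; other post types are ignored.
posts = [
    ("919", "2"), ("919", "2"), ("919", "1"),
    ("805", "1"), ("805", "2"), ("805", "6"),
]

counts = Counter((owner, kind) for owner, kind in posts if kind in ("1", "2"))

def answer_ratio(owner):
    """answers / (answers + questions) for a user with at least one such post."""
    answers = counts[(owner, "2")]
    questions = counts[(owner, "1")]
    return answers / (answers + questions)

for owner in ("919", "805"):
    print(owner, answer_ratio(owner))
```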

In [25]:
User_df = User_lines.filter(isRow) \
                .filter(lambda line: not isBadXML(line)) \
                .map(line_parser) \
                .toDF(sampleRatio = 0.0001)

In [26]:
User_df = User_df.withColumn('Reputation', 
                              F.when(F.col('Reputation').isNull(), 0)\
                               .otherwise(F.col('Reputation').cast(IntegerType())))

In [27]:
User_df = User_df.withColumn('Id', 
                              F.when(F.col('Id').isNull(), 0)\
                               .otherwise(F.col('Id').cast(IntegerType())))

In [28]:
User_df

DataFrame[AccountId: string, CreationDate: string, DisplayName: string, DownVotes: string, Id: int, LastAccessDate: string, Reputation: int, UpVotes: string, Views: string]

In [33]:
ans_user = User_df.sort('Reputation')

In [34]:
xcc = ans_user.toPandas()

In [35]:
xc = ans_user.toPandas().to_numpy()

In [36]:
listov = xc[-100:]

In [37]:
len(xc)

50320

In [38]:
top_reputations = [(int(item[4]), item[6]) for item in ans_user.toPandas().to_numpy()]

In [51]:
top_reputations[-100:]

[(8373, 3720),
 (11852, 3732),
 (1679, 3747),
 (1108, 3805),
 (13138, 3821),
 (17908, 3957),
 (3601, 3958),
 (4862, 3971),
 (22228, 4065),
 (1005, 4080),
 (35989, 4092),
 (2074, 4127),
 (52554, 4147),
 (11887, 4149),
 (223, 4192),
 (2860, 4204),
 (1307, 4238),
 (8413, 4438),
 (5862, 4656),
 (8076, 4795),
 (307, 4934),
 (14188, 5042),
 (264, 5085),
 (8507, 5315),
 (13047, 5398),
 (334, 5444),
 (22311, 5500),
 (25, 5661),
 (364, 5739),
 (253, 5762),
 (25433, 5775),
 (795, 5849),
 (1934, 5967),
 (11981, 5970),
 (6029, 6040),
 (44269, 6127),
 (2126, 6145),
 (36041, 6149),
 (8402, 6208),
 (2669, 6352),
 (279, 6367),
 (442, 6588),
 (196, 6682),
 (4257, 6694),
 (21054, 6716),
 (1909, 6814),
 (7250, 6888),
 (8, 6948),
 (5, 6962),
 (401, 7116),
 (1352, 7552),
 (26338, 7608),
 (10849, 7725),
 (32036, 7729),
 (23853, 7765),
 (1764, 7971),
 (28666, 8013),
 (251, 8221),
 (4376, 8629),
 (3019, 8794),
 (8013, 9047),
 (3382, 9294),
 (1036, 9530),
 (1739, 9619),
 (7071, 10045),
 (4598, 10383),
 (7224, 

In [49]:
User_df.columns

['AboutMe',
 'AccountId',
 'Age',
 'CreationDate',
 'DisplayName',
 'DownVotes',
 'Id',
 'LastAccessDate',
 'Location',
 'Reputation',
 'UpVotes',
 'Views',
 'WebsiteUrl',
 'ProfileImageUrl']

In [42]:
post_df.columns

['Body',
 'CommentCount',
 'CreationDate',
 'Id',
 'LastActivityDate',
 'OwnerUserId',
 'ParentId',
 'PostTypeId',
 'Score',
 'AcceptedAnswerId',
 'AnswerCount',
 'FavoriteCount',
 'LastEditDate',
 'LastEditorUserId',
 'Tags',
 'Title',
 'ViewCount',
 'CommunityOwnedDate']

In [56]:
import pandas as pd

In [60]:
pandas_post = post_df.toPandas()

In [125]:
pandas_post['Id'].astype(int)

0         10893
1         10894
2         10897
3         10899
4         10900
          ...  
108736    73928
108737    73929
108738    73930
108739    73931
108740    73932
Name: Id, Length: 108741, dtype: int64

In [120]:
len(pandas_post)

108741

In [166]:
pandas_post[pandas_post['Id'] == 7071]

Unnamed: 0,Body,CommentCount,CreationDate,Id,LastActivityDate,OwnerUserId,ParentId,PostTypeId,Score,AcceptedAnswerId,AnswerCount,FavoriteCount,LastEditDate,LastEditorUserId,Tags,Title,ViewCount,CommunityOwnedDate
26072,<p>A white noise process is one with a mean ze...,5,2011-02-10T22:23:25.730,7071,2011-02-10T22:23:25.730,449,7070,2,10,,,0,,,,,,


In [61]:
pandas_user = User_df.toPandas()

In [163]:
pandas_post['Id'].describe()

count    108741.000000
mean      68685.006796
std       41416.350973
min           1.000000
25%       31215.000000
50%       67914.000000
75%      104838.000000
max      140829.000000
Name: Id, dtype: float64

In [160]:
pandas_user['Id'].describe()

count    50320.000000
mean     35805.445767
std      20938.418199
min         -1.000000
25%      17110.750000
50%      36342.500000
75%      54826.250000
max      70649.000000
Name: Id, dtype: float64

In [158]:
pandas_user.columns

Index(['AccountId', 'CreationDate', 'DisplayName', 'DownVotes', 'Id',
       'LastAccessDate', 'Reputation', 'UpVotes', 'Views', 'ProfileImageUrl'],
      dtype='object')

In [157]:
pandas_user.head()

Unnamed: 0,AccountId,CreationDate,DisplayName,DownVotes,Id,LastAccessDate,Reputation,UpVotes,Views,ProfileImageUrl
0,5872878,2015-03-02T18:42:20.510,Lars Reeker,0,70185,2015-03-02T18:42:20.510,1,0,0,https://lh3.googleusercontent.com/-Y7GNsydm-mc...
1,5872995,2015-03-02T19:04:13.380,Vra,0,70186,2015-03-06T15:45:57.590,6,0,1,
2,5873177,2015-03-02T19:40:16.420,Aroona,0,70187,2015-03-02T19:40:16.420,1,0,0,https://www.gravatar.com/avatar/e0e90702da3203...
3,5873184,2015-03-02T19:46:45.400,Yazeed,0,70188,2015-03-02T19:46:45.400,1,0,0,https://www.gravatar.com/avatar/f5e666cb769dfb...
4,228681,2015-03-02T19:56:37.233,Taimur,0,70189,2015-03-03T09:26:04.020,101,0,0,http://i.stack.imgur.com/PhYFp.jpg?s=128&g=1


In [128]:
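# Caution: this matches a post's Id against a user's Id, not the post's
# OwnerUserId, so the user columns attached to each post belong to an
# unrelated user. The Spark join further down uses OwnerUserId instead.
dfx = pandas_post.merge(pandas_user, how='left', on = 'Id')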

In [129]:
len(dfx)

108741

In [88]:
colom = ['Body','PostTypeId']
df[colom]

Unnamed: 0,Body,PostTypeId
0,<p>I am trained in frequentist statistics (eco...,2
1,<p>I have been a member of CrossValidated for ...,6
2,"<p>If I understand the question, couldn't you ...",2
3,<p>I have $Y$ measurements per several Subject...,1
4,<p>$y_i=\beta_0+\beta_1x_i+\varepsilon $ is a ...,1
...,...,...
39866,<p>I'm having trouble to fit and simulate a <c...,1
39867,<p>This may well be a textbook question - I am...,1
39868,<p>I have a time series count data by customer...,1
39869,<p>I am attempting to monitor the performance ...,1


In [131]:
dff = dfx.sort_values(by='Reputation', ascending=False)

In [132]:
type(dfff['PostTypeId'])

pandas.core.series.Series

In [133]:
cols = ['Id', 'Reputation', 'PostTypeId']

In [134]:
dfff = dff[cols] 

In [135]:
dffff = dfff.fillna(0)

In [136]:
dffff

Unnamed: 0,Id,Reputation,PostTypeId
20564,919,100976.0,2
20459,805,92624.0,1
20348,686,47334.0,2
26270,7290,46907.0,2
20573,930,32283.0,2
...,...,...,...
108736,73928,0.0,2
108737,73929,0.0,1
108738,73930,0.0,1
108739,73931,0.0,1


In [137]:
dffff['questions'] = [1 if x == '1' else 0 for x in dffff['PostTypeId']]

In [138]:
dffff

Unnamed: 0,Id,Reputation,PostTypeId,questions
20564,919,100976.0,2,0
20459,805,92624.0,1,1
20348,686,47334.0,2,0
26270,7290,46907.0,2,0
20573,930,32283.0,2,0
...,...,...,...,...
108736,73928,0.0,2,0
108737,73929,0.0,1,1
108738,73930,0.0,1,1
108739,73931,0.0,1,1


In [139]:
dffff['answers'] = [1 if x == '2' else 0 for x in dffff['PostTypeId']]

In [140]:
dffff

Unnamed: 0,Id,Reputation,PostTypeId,questions,answers
20564,919,100976.0,2,0,1
20459,805,92624.0,1,1,0
20348,686,47334.0,2,0,1
26270,7290,46907.0,2,0,1
20573,930,32283.0,2,0,1
...,...,...,...,...,...
108736,73928,0.0,2,0,1
108737,73929,0.0,1,1,0
108738,73930,0.0,1,1,0
108739,73931,0.0,1,1,0


In [153]:
dffff[dffff['Id'] == 7071]

Unnamed: 0,Id,Reputation,PostTypeId,questions,answers
26072,7071,10045.0,2,0,1


In [149]:
# Select several columns with a list; the tuple form triggers a deprecation warning.
x = dffff.groupby(by=['Id', 'Reputation'])[['questions', 'answers']].sum()

In [151]:
x.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,questions,answers
Id,Reputation,Unnamed: 2_level_1,Unnamed: 3_level_1
1,0.0,1,0
2,101.0,1,0
3,101.0,1,0
4,101.0,1,0
5,6962.0,0,1


In [152]:
x.describe(include='all')

Unnamed: 0,questions,answers
count,108741.0,108741.0
mean,0.478752,0.508585
std,0.499551,0.499929
min,0.0,0.0
25%,0.0,0.0
50%,0.0,1.0
75%,1.0,1.0
max,1.0,1.0


In [29]:
user_lines = sc.textFile('spark-stats-data/allUsers/')

In [30]:
user_df = user_lines.filter(isRow) \
                    .filter(lambda line : not isBadXML(line)) \
                    .map(line_parser)\
                    .toDF(sampleRatio=0.001)

In [31]:
user_df

DataFrame[AccountId: string, CreationDate: string, DisplayName: string, DownVotes: string, Id: string, LastAccessDate: string, ProfileImageUrl: string, Reputation: string, UpVotes: string, Views: string, AboutMe: string, Age: string, Location: string, WebsiteUrl: string]

In [32]:
cnd_count = lambda cond : F.sum(F.when(cond, 1).otherwise(0))

joined = user_df.withColumn("Reputation", user_df.Reputation.cast('int'))\
                .join(post_df,
                      user_df.Id == post_df.OwnerUserId)\
                .groupBy(post_df.OwnerUserId)\
                .agg((cnd_count(post_df["PostTypeId"] == '2')/
                     (cnd_count(post_df["PostTypeId"] == '2')+
                       cnd_count(post_df["PostTypeId"] == '1'))).alias("ans_ratio"),
                     F.min("Reputation").alias("Reputation"))\
                .sort('Reputation', ascending=False)\
                .withColumn('OwnerUserId', F.col('OwnerUserId').cast('int'))\
                .limit(99).toPandas()

In [33]:
answer_percentage = list(zip(list(joined['OwnerUserId'])[:99],list(joined['ans_ratio'])[:99]))

In [34]:
answer_percentage.append((-1, joined['ans_ratio'].mean()))

## First question

I'd expect the first **question** a user asks to be indicative of their future behavior.  I'll dig more into that in the next problem, but for now let's see the relationship between reputation and how long it took each person to ask their first question.

For each user that asked a question, let's find the difference between when their account was created (`CreationDate` for the User) and when they asked their first question (`CreationDate` for their first question). Let's return this time difference in days (round down, so 2.7 days counts as 2 days) for the 100 users with the highest reputation, in the form

`(UserId, Days)`

**Checkpoints**
- Users that asked a question: 23134
- Average number of days (round each user's days, then average): 30.1074258
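
The day difference with rounding down can be checked on individual timestamps with the standard library. This is a sketch rather than the Spark computation used below; the function name `days_until` is made up, and the timestamps are illustrative values in the `CreationDate` format.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"

def days_until(account_created, first_question):
    """Whole days between account creation and first question, rounded down."""
    created = datetime.strptime(account_created, TIMESTAMP_FORMAT)
    asked = datetime.strptime(first_question, TIMESTAMP_FORMAT)
    return (asked - created).days  # timedelta.days drops the partial day

print(days_until("2010-08-13T15:29:47.140", "2010-08-17T13:10:29.173"))
print(days_until("2011-11-09T04:43:15.613", "2011-11-09T05:51:55.263"))
```

A gap of three days and about 22 hours counts as 3 days, and a question asked an hour after sign-up counts as 0.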

In [176]:
tr = dfx['CreationDate_y'][0].split("T")[0]

In [178]:
from datetime import datetime
datetime.strptime(tr,'%Y-%m-%d').date()

datetime.date(2012, 4, 26)

In [183]:
dfx['CreationsDate'] = [datetime.strptime(x.split('T')[0], '%Y-%m-%d').date() for x in dfx['CreationDate_x']]

In [186]:
dfx

Unnamed: 0,Body,CommentCount,CreationDate_x,Id,LastActivityDate,OwnerUserId,ParentId,PostTypeId,Score,AcceptedAnswerId,...,AccountId,CreationDate_y,DisplayName,DownVotes,LastAccessDate,Reputation,UpVotes,Views,ProfileImageUrl,CreationsDate
0,<p>I'm developing an application in which user...,0,2011-05-16T17:39:58.170,10893,2011-06-02T22:10:16.550,4638,,1,2,,...,87574,2012-04-26T09:47:40.967,Ben J,0,2014-04-14T09:20:59.630,106.0,0,7,,2011-05-16
1,<p>Perhaps you could extend the idea of using ...,5,2011-05-16T18:27:42.933,10894,2011-05-16T18:27:42.933,279,10882,2,2,,...,416858,2012-04-26T10:38:14.877,Peter Lewis,0,2012-07-12T18:33:32.253,1.0,0,0,,2011-05-16
2,<p>I would like to compute the probability dis...,7,2011-05-17T16:06:19.143,10897,2011-05-18T16:41:17.197,4642,,1,6,,...,44643,2012-04-26T14:02:18.303,Goz,0,2013-06-14T10:32:37.827,108.0,1,1,,2011-05-17
3,<p>Let $\{X_i\}$ be the locations of the cuts....,2,2011-05-17T16:17:19.443,10899,2011-05-17T16:17:19.443,4637,10897,2,0,,...,1438027,2012-04-26T14:47:29.747,user10899,0,2012-07-20T02:47:58.513,11.0,0,3,,2011-05-17
4,<p>I have to generate random numbers for my al...,1,2011-05-17T17:13:30.487,10900,2012-07-13T07:12:05.503,4319,,1,3,,...,1438525,2012-04-26T17:41:34.567,user1359363,0,2014-04-23T18:47:05.450,11.0,0,3,,2011-05-17
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
108736,<p>This is an expected result. The matrix $\Pi...,0,2013-10-28T08:29:21.737,73928,2013-10-28T08:29:21.737,2116,46688,2,0,,...,,,,,,,,,,2013-10-28
108737,<p>I have a set of independent data and depend...,5,2013-10-28T08:36:36.887,73929,2013-10-28T08:40:26.223,31986,,1,2,,...,,,,,,,,,,2013-10-28
108738,<p>The stochastic process $(X_t)_{t\in T}$ is ...,0,2013-10-28T09:44:43.930,73930,2013-10-28T10:06:38.203,30496,,1,2,,...,,,,,,,,,,2013-10-28
108739,"<p>I have a very particular question, I have s...",1,2013-10-28T10:00:02.867,73931,2015-03-02T06:33:34.697,28322,,1,2,,...,,,,,,,,,,2013-10-28


In [36]:
user_df = user_df.withColumn('UserCreationDate', F.to_timestamp('CreationDate'))

In [37]:
post_df.agg((cnd_count(post_df.PostTypeId == '1'))).show()

+-------------------------------------------------+
|sum(CASE WHEN (PostTypeId = 1) THEN 1 ELSE 0 END)|
+-------------------------------------------------+
|                                            52060|
+-------------------------------------------------+



In [38]:
post_df_firstq = post_df.withColumn('PostCreationDate', F.to_timestamp('CreationDate'))\
                        .groupBy('OwnerUserId', 'PostTypeId')\
                        .agg((F.min('PostCreationDate')).alias('FirstPostDate')).alias('grouped')\
                        .filter(F.col('grouped.PostTypeId')=='1')


In [39]:
joined2= post_df_firstq.join(user_df,
                             user_df.Id == post_df_firstq.OwnerUserId)

ans2 = joined2.withColumn('DiffinSec', F.col('FirstPostDate').cast('long') - F.col('UserCreationDate').cast('long'))\
               .withColumn('DiffinDay', F.floor(F.col('DiffinSec')/3600/24))\
               .withColumn("Reputation", user_df.Reputation.cast('int'))\
               .sort('Reputation', ascending=False)\
               .withColumn('OwnerUserId', F.col('OwnerUserId').cast('int'))\
               .limit(100).toPandas()

In [40]:
ans2[['FirstPostDate','UserCreationDate','DiffinDay']]

Unnamed: 0,FirstPostDate,UserCreationDate,DiffinDay
0,2010-08-17 13:10:29.173,2010-08-13 15:29:47.140,3
1,2012-06-07 00:14:12.397,2010-08-07 08:40:07.287,669
2,2011-02-10 15:35:57.853,2010-08-03 19:42:40.907,190
3,2011-11-09 05:51:55.263,2011-11-09 04:43:15.613,0
4,2010-08-18 20:36:59.617,2010-08-13 20:50:47.397,4
...,...,...,...
95,2011-02-18 02:40:12.390,2010-07-19 19:09:39.723,213
96,2013-10-15 08:21:54.933,2011-08-14 21:52:19.277,792
97,2010-09-10 12:51:35.787,2010-08-31 10:07:27.890,10
98,2010-07-31 09:39:47.323,2010-07-26 20:58:01.777,4


In [41]:
first_question = list(zip(list(ans2['OwnerUserId'])[:100],list(ans2['DiffinDay'])[:100]))

In [42]:
first_question

[(919, 3),
 (805, 669),
 (686, 190),
 (7290, 0),
 (930, 4),
 (4253, 351),
 (183, 0),
 (11032, 1),
 (28746, 19),
 (887, 22),
 (159, 0),
 (2116, 187),
 (4856, 42),
 (5739, 147),
 (3277, 0),
 (88, 0),
 (601, 19),
 (17230, 173),
 (2392, 42),
 (1390, 117),
 (5836, 0),
 (603, 41),
 (7972, 30),
 (6633, 46),
 (2958, 63),
 (9394, 205),
 (7828, 64),
 (2817, 0),
 (7224, 145),
 (4598, 236),
 (7071, 0),
 (1739, 1517),
 (1036, 5),
 (8013, 6),
 (3019, 0),
 (4376, 58),
 (251, 2),
 (28666, 0),
 (1764, 1),
 (32036, 86),
 (10849, 23),
 (26338, 0),
 (1352, 0),
 (401, 34),
 (5, 0),
 (8, 0),
 (7250, 156),
 (1909, 87),
 (21054, 53),
 (4257, 27),
 (196, 0),
 (442, 28),
 (2669, 20),
 (8402, 0),
 (36041, 121),
 (44269, 0),
 (11981, 0),
 (1934, 0),
 (795, 0),
 (25433, 70),
 (253, 12),
 (364, 7),
 (25, 0),
 (22311, 0),
 (13047, 84),
 (8507, 82),
 (264, 70),
 (14188, 0),
 (8076, 28),
 (8413, 1153),
 (1307, 3),
 (2860, 0),
 (223, 1),
 (11887, 114),
 (52554, 0),
 (35989, 319),
 (1005, 42),
 (22228, 28),
 (4862, 7),


## Identify veterans


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, I'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, I'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.

Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, average the score, views, number of answers, and number of favorites of the users' **first question**.

#### Checkpoints

* Total brief users: 24,864
* Total veteran users: 2,027
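
The veteran rule can be written down for a single user before scaling it up. The sketch below is plain Python with made-up timestamps; `is_veteran` is a hypothetical helper, and the window is taken as 100 days inclusive to 150 days exclusive, matching the condition used in the Spark code.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"

def parse(timestamp):
    return datetime.strptime(timestamp, TIMESTAMP_FORMAT)

def is_veteran(account_created, post_dates, start=100, end=150):
    """True if any post falls start <= whole days since creation < end."""
    created = parse(account_created)
    return any(start <= (parse(p) - created).days < end for p in post_dates)

created = "2012-01-01T00:00:00.000"
# One post four days in, one about 110 days in: inside the window.
print(is_veteran(created, ["2012-01-05T00:00:00.000", "2012-04-20T12:00:00.000"]))
# Posts only before and after the window: a brief user.
print(is_veteran(created, ["2012-01-05T00:00:00.000", "2012-06-15T00:00:00.000"]))
```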

In [42]:
post_df = post_df.withColumn('PostCreationDate', F.to_timestamp('CreationDate'))

In [43]:
joined3 = post_df.join(user_df,
                    user_df.Id == post_df.OwnerUserId)\
                .withColumn('DiffinSec', F.col('PostCreationDate').cast('long') - 
                                        F.col('UserCreationDate').cast('long'))\
                .withColumn('DiffinDay', F.floor(F.col('DiffinSec')/3600/24))

In [44]:
ans3 = joined3.withColumn('Veteran',
            F.when((joined3.DiffinDay < 150) & (joined3.DiffinDay >=100), 1).otherwise(0))
#                 .select(['PostCreationDate', 'UserCreationDate', 'DiffinDay','Veteran']).show(50)

In [45]:
ans4 = ans3.groupby('OwnerUserId')\
            .agg(F.max('Veteran').alias('Veteran'))

In [46]:
ans5 = post_df_firstq.join(post_df.drop('OwnerUserId'),
                           post_df_firstq.FirstPostDate == post_df.PostCreationDate)

In [47]:
ans5.join(ans4,
         ans5.OwnerUserId == ans4.OwnerUserId)\
    .filter(F.col('Veteran')==1)\
    .withColumn('AnswerCount',F.col('AnswerCount').cast('int'))\
    .withColumn('ViewCount',F.col('ViewCount').cast('int'))\
    .agg(F.avg("Score").alias("avg_score"), \
         F.avg("ViewCount").alias("avg_view"), \
         F.avg("AnswerCount").alias("avg_ans"), \
         F.avg("FavoriteCount").alias("avg_fav")).show()

+----------------+-----------------+------------------+------------------+
|       avg_score|         avg_view|           avg_ans|           avg_fav|
+----------------+-----------------+------------------+------------------+
|3.54370533260033|926.3982398239824|1.2981298129812981|1.3001649257833974|
+----------------+-----------------+------------------+------------------+



In [50]:
ans5.join(ans4,
         ans5.OwnerUserId == ans4.OwnerUserId)\
    .filter(F.col('Veteran')==0)\
    .withColumn('AnswerCount',F.col('AnswerCount').cast('int'))\
    .withColumn('ViewCount',F.col('ViewCount').cast('int'))\
    .agg(F.avg("Score").alias("avg_score"), \
         F.avg("ViewCount").alias("avg_view"), \
         F.avg("AnswerCount").alias("avg_ans"), \
         F.avg("FavoriteCount").alias("avg_fav")).show()

+-----------------+-----------------+------------------+------------------+
|        avg_score|         avg_view|           avg_ans|           avg_fav|
+-----------------+-----------------+------------------+------------------+
|2.100892857142857|553.4952533132813|0.9706739355202557|0.5757988721804511|
+-----------------+-----------------+------------------+------------------+



In [51]:
identify_veterans = {
    "vet_score": 3.54370533260033,
    "vet_views": 926.3982398239824,
    "vet_answers": 1.2981298129812981,
    "vet_favorites": 1.3001649257833974,
    "brief_score": 2.100892857142857,
    "brief_views": 553.4952533132813,
    "brief_answers": 0.9706739355202557,
    "brief_favorites": 0.5757988721804511
}

grader.score('spark__identify_veterans', identify_veterans)

Your score: 1.0000


## Identify veterans: full data set


Same as above, but on the full Stack Exchange data set.

No pre-parsed data is available for this question.


#### Checkpoints

* Total brief users: 1,848,628
* Total veteran users: 288,285

In [53]:
post_lines_Full = sc.textFile('spark-stack-data/allPosts/')

In [54]:
post_df_Full = post_lines_Full.filter(isRow) \
               .filter(lambda line : not isBadXML(line)) \
               .map(line_parser)\
               .toDF(sampleRatio=0.000001)

In [56]:
post_df_Full = post_df_Full.withColumn('FavoriteCount', 
                                F.when(F.col('FavoriteCount').isNull(), 0)\
                                 .otherwise(F.col('FavoriteCount').cast(IntegerType())))\
                    .withColumn('Score', 
                                F.when(F.col('Score').isNull(), 0)\
                                 .otherwise(F.col('Score').cast(IntegerType()))).cache()

In [57]:
user_lines_Full = sc.textFile('spark-stack-data/allUsers/')

In [58]:
user_df_Full = user_lines_Full.filter(isRow) \
                    .filter(lambda line : not isBadXML(line)) \
                    .map(line_parser)\
                    .toDF(sampleRatio=0.00001)

In [None]:
cnd_count = lambda cond : F.sum(F.when(cond, 1).otherwise(0))

In [None]:
user_df_Full = user_df_Full.withColumn('UserCreationDate', F.to_timestamp('CreationDate'))

In [None]:
post_df_firstq_Full = post_df_Full.withColumn('PostCreationDate', F.to_timestamp('CreationDate'))\
                        .groupBy('OwnerUserId', 'PostTypeId')\
                        .agg((F.min('PostCreationDate')).alias('FirstPostDate')).alias('grouped')\
                        .filter(F.col('grouped.PostTypeId')=='1')

In [75]:
joined2_Full= post_df_firstq_Full.join(user_df_Full,
                             user_df_Full.Id == post_df_firstq_Full.OwnerUserId)

In [77]:
post_df_Full = post_df_Full.withColumn('PostCreationDate', F.to_timestamp('CreationDate'))

In [80]:
joined3_Full = post_df_Full.join(user_df_Full,
                    user_df_Full.Id == post_df_Full.OwnerUserId)\
                .withColumn('DiffinSec', F.col('PostCreationDate').cast('long') - 
                                        F.col('UserCreationDate').cast('long'))\
                .withColumn('DiffinDay', F.floor(F.col('DiffinSec')/3600/24))

In [81]:
ans3_Full = joined3_Full.withColumn('Veteran',
            F.when((joined3_Full.DiffinDay < 150) & (joined3_Full.DiffinDay >=100), 1).otherwise(0))

In [82]:
ans4_Full = ans3_Full.groupby('OwnerUserId')\
            .agg(F.max('Veteran').alias('Veteran'))

In [83]:
ans5_Full = post_df_firstq_Full.join(post_df_Full.drop('OwnerUserId'),
                           post_df_firstq_Full.FirstPostDate == post_df_Full.PostCreationDate)

In [84]:
ans5_Full.join(ans4_Full,
         ans5_Full.OwnerUserId == ans4_Full.OwnerUserId)\
    .filter(F.col('Veteran')==1)\
    .withColumn('AnswerCount',F.col('AnswerCount').cast('int'))\
    .withColumn('ViewCount',F.col('ViewCount').cast('int'))\
    .agg(F.avg("Score").alias("avg_score"), \
         F.avg("ViewCount").alias("avg_view"), \
         F.avg("AnswerCount").alias("avg_ans"), \
         F.avg("FavoriteCount").alias("avg_fav")).show()

+------------------+------------------+------------------+------------------+
|         avg_score|          avg_view|           avg_ans|           avg_fav|
+------------------+------------------+------------------+------------------+
|2.2633363967407396|1843.8346396040747|1.8426235821987713|0.8659919479810351|
+------------------+------------------+------------------+------------------+



In [85]:
ans5_Full.join(ans4_Full,
         ans5_Full.OwnerUserId == ans4_Full.OwnerUserId)\
    .filter(F.col('Veteran')==0)\
    .withColumn('AnswerCount',F.col('AnswerCount').cast('int'))\
    .withColumn('ViewCount',F.col('ViewCount').cast('int'))\
    .agg(F.avg("Score").alias("avg_score"), \
         F.avg("ViewCount").alias("avg_view"), \
         F.avg("AnswerCount").alias("avg_ans"), \
         F.avg("FavoriteCount").alias("avg_fav")).show()

+-----------------+------------------+------------------+------------------+
|        avg_score|          avg_view|           avg_ans|           avg_fav|
+-----------------+------------------+------------------+------------------+
|1.132576795927856|1096.1792785068071|1.5038891466685484|0.3857949160857235|
+-----------------+------------------+------------------+------------------+



## Word2vec


Word2Vec is an alternative approach for vectorizing text data. The vectorized representations of words in the vocabulary tend to be useful for predicting other words in the document, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".
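
To make the analogy concrete, here is a minimal sketch with hand-picked two-dimensional toy vectors. These are an assumption for illustration only, not trained embeddings, and the `cosine` helper and the tiny vocabulary are hypothetical.

```python
import math

# Hand-picked toy vectors (NOT trained embeddings):
# axis 0 loosely encodes "royalty", axis 1 loosely encodes "gender".
toy_vectors = {
    "king":     (1.0,  1.0),
    "queen":    (1.0, -1.0),
    "man":      (0.0,  1.0),
    "woman":    (0.0, -1.0),
    "prince":   (0.8,  0.9),
    "princess": (0.8, -0.9),
}

def cosine(u, v):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# vector('king') - vector('man') + vector('woman')
target = tuple(k - m + w for k, m, w in zip(
    toy_vectors["king"], toy_vectors["man"], toy_vectors["woman"]))

# Rank every word that was not part of the query by similarity to the target.
candidates = [w for w in toy_vectors if w not in ("king", "man", "woman")]
nearest = max(candidates, key=lambda w: cosine(toy_vectors[w], target))
print(nearest)  # queen
```

With real embeddings the match is only approximate, which is why the result is ranked by cosine similarity rather than tested for equality.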

Let's see how good a Word2Vec model we can train using the **tags** of each Stack Exchange post as documents (this uses the full data set). Let's use the implementation of Word2Vec from Spark ML (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).

The tags appear in the data as a single string, so I will need to split them into individual tags. No further parsing is needed beyond that split.

#### Parameters


The dimensionality of the vector space should be 100. The random seed should be 42 in `PySpark`.


#### Checkpoints

* Mean of the top 25 cosine similarities: 0.8012362027168274

In [59]:
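import re
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def findall_tagwords(s):
    # Tags arrive as one string such as '<r><ggplot2>'; posts without tags give None.
    if s is None:
        return []
    # Split on the '><' boundaries, then strip the outer angle brackets.
    wordlist = re.split(r'><', s)
    return [word.replace('<', '').replace('>', '') for word in wordlist]

findall_tagwords_udf = udf(findall_tagwords, ArrayType(StringType()))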

In [60]:
from pyspark.ml.feature import Word2Vec

import re

wordre = re.compile('[a-z]+')

post_df_Full = post_df_Full.withColumn('Tagwords', findall_tagwords_udf(F.col('Tags')))

In [61]:
# vectorSize and seed follow the values given under "Parameters" above.
w2v = Word2Vec(inputCol='Tagwords', outputCol='vectors', vectorSize=100, minCount=5, seed=42)

In [63]:
model = w2v.fit(post_df_Full)

In [65]:
result = model.transform(post_df_Full)

In [66]:
myresult = model.findSynonyms('ggplot2', 25).toPandas()

In [67]:
word2vec = list(zip(myresult.word[:25], myresult.similarity[:25]))
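
The checkpoint value is the mean of the similarity scores in this list of tuples. Here is a minimal sketch of that check, assuming the list has the `("string", number)` shape described above. The helper name and the placeholder tuples are hypothetical and are not model output.

```python
from statistics import mean

def mean_similarity(pairs):
    """Average the similarity component of (word, similarity) tuples."""
    return mean(sim for _, sim in pairs)

# Placeholder tuples for illustration only; the real list comes from findSynonyms.
example_pairs = [("tag-a", 0.9), ("tag-b", 0.8), ("tag-c", 0.7)]
print(mean_similarity(example_pairs))
```

In the notebook this would be called as `mean_similarity(word2vec)` and compared with the checkpoint.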

## Classification


I'd like to see if we can predict the tags of a **question** from its body text. Rather than predicting specific tags, we will try to predict whether a question carries at least one of the ten most common tags.

To this end, I have separated out a training set and a test set from the original data. The training and test sets were downloaded with the stats data at the beginning of the notebook. You can also get them from S3:
  * `s3://dataincubator-course/spark-stats-data/posts_train.zip`
  * `s3://dataincubator-course/spark-stats-data/posts_test.zip`

This involves two steps. First, find the ten most common tags for questions in the training data set (the tags have been removed from the test set). Then train a learner to predict, from the text of the question (the `Body` attribute), whether it should carry one of those ten tags. The question text needs to be processed with NLP techniques such as splitting it into tokens.

Since I can't reliably pickle Spark models, I instead return a list of predictions, sorted by the question's `Id`. This sorting is very important.

As an example, if my top tags include `spark` and `python`, and we had the following questions:

```
<row Body="..." Id="1740" Tags="<machine-learning><spark><regression>" ... />
<row Body="..." Id="723" Tags="<statistics><neurons>" ... />
<row Body="..." Id="2740" Tags="<functional><python><spark><pyspark>" ... />
```

I would expect to return `[0, 1, 1]` (for the order `[723, 1740, 2740]`).  
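
The example above can be reproduced in a few lines of plain Python. This is a minimal sketch of the labeling rule only (the `label` helper is hypothetical); it uses the true tags from the example rather than a trained model.

```python
import re

# Top tags and rows copied from the example above.
top_tags = {"spark", "python"}
rows = [
    {"Id": "1740", "Tags": "<machine-learning><spark><regression>"},
    {"Id": "723", "Tags": "<statistics><neurons>"},
    {"Id": "2740", "Tags": "<functional><python><spark><pyspark>"},
]

def label(tags, top):
    """Return 1 if any tag in the '<a><b>' string is in `top`, else 0."""
    return int(any(tag in top for tag in re.findall(r"<([^<>]+)>", tags)))

# Sort numerically by Id, then emit one label per question.
ordered = sorted(rows, key=lambda row: int(row["Id"]))
predictions = [label(row["Tags"], top_tags) for row in ordered]
print([row["Id"] for row in ordered], predictions)  # ['723', '1740', '2740'] [0, 1, 1]
```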

#### Checkpoints

- Number of training posts with a tag in the top 10: `22525`
- Number without: `19540`

In [69]:
!unzip -o -d spark-stats-data/train spark-stats-data/posts_train.zip
!unzip -o -d spark-stats-data/test spark-stats-data/posts_test.zip


In [70]:
train_lines = sc.textFile('spark-stats-data/train/')
test_lines = sc.textFile('spark-stats-data/test/')

In [71]:
train_lines.take(3)

['<?xml version="1.0" encoding="UTF-8"?>',
 '<parent>',
 '  <row Body="&lt;p&gt;A very well described question.  In mathematical terms we have two binomial populations:&lt;/p&gt;&#10;&#10;&lt;p&gt;$$x|npI\\sim bin(n,p)$$&lt;/p&gt;&#10;&#10;&lt;p&gt;$$y|mqI\\sim bin(m,q)$$&lt;/p&gt;&#10;&#10;&lt;p&gt;Where $x$ is the number of deaths in the control group with $n$ observations, $y$ is the number of deaths in the group of size $m$ which received the enhancer, and $I$ is the prior information - which includes the binomial assumption.  The hypothesis is that $q&amp;gt;p$.  The alternative which is most obvious is $q\\leq p$ (i.e. we are not quesitoning the binomial model).  I presume your &quot;naive&quot; approach is to consider $\\hat{p}=\\frac{x}{n}$ as fixed and then test if $q&amp;gt;\\hat{p}$ using the cummulative binomial distribution.&lt;/p&gt;&#10;&#10;&lt;p&gt;To do the test and account for the uncertainty in $p$, the best approach is to integrate them out using a prior distributi

In [72]:
train_df = train_lines.filter(isRow) \
               .filter(lambda line : not isBadXML(line)) \
               .map(line_parser)\
               .toDF(sampleRatio=0.001)

In [73]:
train_df = train_df.withColumn('Tagwords', findall_tagwords_udf(F.col('Tags')))

In [74]:
train_df.select(['PostTypeId']).show(100)

+----------+
|PostTypeId|
+----------+
|         2|
|         2|
|         2|
|         1|
|         2|
|         1|
|         1|
|         2|
|         2|
|         1|
|         2|
|         1|
|         1|
|         2|
|         1|
|         5|
|         4|
|         2|
|         2|
|         2|
|         2|
|         1|
|         2|
|         2|
|         5|
|         4|
|         2|
|         2|
|         1|
|         2|
|         1|
|         2|
|         1|
|         2|
|         1|
|         2|
|         2|
|         1|
|         2|
|         1|
|         1|
|         1|
|         2|
|         2|
|         2|
|         2|
|         1|
|         1|
|         2|
|         1|
|         1|
|         2|
|         2|
|         2|
|         2|
|         2|
|         2|
|         1|
|         2|
|         1|
|         2|
|         1|
|         1|
|         2|
|         2|
|         2|
|         1|
|         1|
|         5|
|         4|
|         1|
|         2|
|         1|
|         2|

In [75]:
train_df_qs = train_df.filter(F.col('PostTypeId')=='1')

In [76]:
train_df_qs.count()

42065

In [77]:
train_df.count()

89357

In [78]:
counts = train_df_qs.select(F.explode('Tagwords').alias('col')).groupBy('col').count().collect()

In [79]:
counts

[Row(col='interaction', count=512),
 Row(col='online', count=54),
 Row(col='overdispersion', count=44),
 Row(col='likelihood', count=120),
 Row(col='clogit', count=18),
 Row(col='clustered-standard-errors', count=33),
 Row(col='counterbalancing', count=6),
 Row(col='distributions', count=1807),
 Row(col='prediction-limit', count=6),
 Row(col='cohens-d', count=28),
 Row(col='gini', count=17),
 Row(col='nlme', count=43),
 Row(col='uninformative-prior', count=13),
 Row(col='anosim', count=2),
 Row(col='intercept', count=28),
 Row(col='chemistry', count=1),
 Row(col='geometric-mean', count=1),
 Row(col='repeatability', count=10),
 Row(col='contrasts', count=80),
 Row(col='logistic', count=1627),
 Row(col='gaussian-mixture', count=100),
 Row(col='confirmatory-factor', count=67),
 Row(col='residual-analysis', count=28),
 Row(col='dag', count=5),
 Row(col='gamm4', count=7),
 Row(col='statistical-bias', count=6),
 Row(col='stochastic-ordering', count=2),
 Row(col='neuroimaging', count=7),
 Row

In [80]:
top10 = {item.col for item in sorted(counts, key= lambda cnt: cnt['count'], reverse = True)[:10]}

In [81]:
top10

{'correlation',
 'distributions',
 'hypothesis-testing',
 'logistic',
 'machine-learning',
 'probability',
 'r',
 'regression',
 'self-study',
 'time-series'}
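
As a cross-check on a small sample, the same top-k selection can be sketched in plain Python with `collections.Counter`. The tag lists below are hypothetical stand-ins for the `Tagwords` column, and `top_k_tags` is an illustrative helper, not part of the notebook.

```python
from collections import Counter

# Hypothetical tag lists standing in for the `Tagwords` column of a few questions.
tagwords = [
    ["regression", "r"],
    ["regression", "logistic"],
    ["r", "time-series"],
    ["regression"],
]

def top_k_tags(tag_lists, k):
    """Count every tag across all questions and return the k most frequent as a set."""
    counts = Counter(tag for tags in tag_lists for tag in tags)
    return {tag for tag, _ in counts.most_common(k)}

print(sorted(top_k_tags(tagwords, 2)))  # ['r', 'regression']
```

Note that when several tags tie at the cut-off, which of them makes the top k depends on ordering, both here and in the sorted-list approach used in the notebook.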

In [85]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType, IntegerType, ArrayType, BooleanType
from pyspark.sql import functions as F

In [86]:
def ifintop10(li):
    # True if at least one of the question's tags is among the ten most common.
    return any(item in top10 for item in li)

ifintop10_udf = udf(ifintop10, BooleanType())

In [87]:
train_df_qs = train_df_qs.withColumn('label' , F.when(ifintop10_udf(F.col('Tagwords')), 1).otherwise(0))

In [88]:
train_df_qs.agg(cnd_count(F.col('label')==1)).show()

+--------------------------------------------+
|sum(CASE WHEN (label = 1) THEN 1 ELSE 0 END)|
+--------------------------------------------+
|                                       22525|
+--------------------------------------------+



In [89]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import HashingTF, Tokenizer

In [90]:
tokenizer = Tokenizer(inputCol='Body', outputCol='words')
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='features', numFeatures = 10000)
logreg = LogisticRegression(maxIter=100, regParam=0.1)
ranfor = RandomForestClassifier(maxDepth=5,numTrees=3)
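
The tokenizer and hashing stages turn each question body into a fixed-length vector of term counts: every token is hashed to a bucket index below `numFeatures`, and the vector records how many tokens landed in each bucket. Here is a minimal plain-Python sketch of that idea. The helper names and the example sentence are hypothetical, and `zlib.crc32` is a stand-in hash chosen for this sketch, so the bucket indices will not match the ones Spark computes.

```python
import zlib
from collections import Counter

def tokenize(text):
    """Lowercase and split on whitespace, as a simple tokenizer would."""
    return text.lower().split()

def hashed_term_counts(tokens, num_features):
    """Map each token to a bucket in [0, num_features) and count hits per bucket.

    zlib.crc32 is a stand-in hash for illustration; it does not reproduce
    Spark's bucket indices.
    """
    return Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens)

tokens = tokenize("How do I fit a logistic regression in R and plot the regression line")
features = hashed_term_counts(tokens, num_features=10000)

# Every token lands in exactly one bucket, so the counts sum to the token count.
print(len(tokens), sum(features.values()))
```

Two different tokens can share a bucket. A larger `numFeatures` makes such collisions rarer at the cost of a wider feature vector.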

In [91]:
tokens = tokenizer.transform(train_df_qs)

In [92]:
hashes = hashingTF.transform(tokens)

In [93]:
model = logreg.fit(hashes)

In [94]:
test_df = test_lines.filter(isRow) \
               .filter(lambda line : not isBadXML(line)) \
               .map(line_parser)\
               .toDF(sampleRatio=0.001)

In [96]:
test_df_qs = test_df.filter(F.col('PostTypeId')=='1')

In [97]:
test_df_qs.count()

4649

In [98]:
test_tokens = tokenizer.transform(test_df_qs)
test_hashes = hashingTF.transform(test_tokens)

In [99]:
test_hashes

DataFrame[AnswerCount: string, Body: string, CommentCount: string, CreationDate: string, Id: string, LastActivityDate: string, OwnerUserId: string, PostTypeId: string, Score: string, Tags: string, Title: string, ViewCount: string, LastEditDate: string, LastEditorUserId: string, ParentId: string, AcceptedAnswerId: string, OwnerDisplayName: string, LastEditorDisplayName: string, words: array<string>, features: vector]

In [100]:
test_predictions = model.transform(test_hashes)

In [101]:
mydf = test_predictions.withColumn('Id',F.col('Id').cast('int'))\
                        .sort('Id')\
                        .withColumn('prediction',F.col('prediction').cast('int'))\
                        .select(['prediction']).toPandas()

In [102]:
classification = list(mydf['prediction'])
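
The cast to `int` before sorting matters because `Id` is read in as a string, and strings sort lexicographically. Here is a minimal sketch of the difference, using the three Ids from the example in the task description:

```python
ids = ["1740", "723", "2740"]

# Lexicographic order compares character by character, so "1740" < "2740" < "723".
as_strings = sorted(ids)

# Numeric order is what the expected answer list assumes.
as_numbers = sorted(ids, key=int)

print(as_strings)  # ['1740', '2740', '723']
print(as_numbers)  # ['723', '1740', '2740']
```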