In [19]:
pip install autopep8

Collecting autopep8
  Using cached autopep8-1.7.0-py2.py3-none-any.whl (45 kB)
Collecting pycodestyle>=2.9.1
  Using cached pycodestyle-2.9.1-py2.py3-none-any.whl (41 kB)
Installing collected packages: pycodestyle, autopep8
Successfully installed autopep8-1.7.0 pycodestyle-2.9.1
Note: you may need to restart the kernel to use updated packages.


In [46]:
import seaborn as sns
sns.set()

# Spark Miniproject


Stack Overflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because it tracks many features, including several feedback metrics, it allows for some open-ended analysis of user behavior on the site.

Stack Exchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this data set. As a side note, there's also an online data explorer which allows you to query the data interactively.

*Consider*: Do we need to use Spark to work with this data set? What are our alternatives?

## Workflow


Most questions can be done locally; however, in some cases you may want to use cloud services. See the appropriate lecture notebooks for information on how to use them.

Python example:

1. Edit source code in your `main.py` file and put classes in a separate `classes.py` (class definitions need to be written in a separate file and then included at runtime).
1. Run locally on a chunk of the data using, e.g., `$SPARK_HOME/bin/spark-submit --py-files src/classes.py src/main.py data/stats results/stats/`
1. Run on GCP once your testing and development are done.

General tips:
* Spark writes its output as multiple `part-xxxxx` files. Try `cat output_dir/* | sort -n -t , -k 1.2 -o sorted_output` to concatenate and sort them.
* You can access an interactive PySpark shell with `$SPARK_HOME/bin/pyspark`.

## Accessing the data


The data is available on S3 (`s3://dataincubator-course/spark-stack-data`). There are three sub-folders, `allUsers`, `allPosts`, and `allVotes`, which contain Gzipped XML with the following format:

``` html
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```

Data from the much smaller `stats.stackexchange.com` is available in the same format on S3 (`s3://dataincubator-course/spark-stats-data`). This site, Cross-Validated, will be used below in some instances to avoid working with the full data set for every question.

The full schema is available as a text file:

In [48]:
# !aws s3 cp s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt .

You can either get the data by running the appropriate S3 commands in the terminal, or by running this block for the smaller stats data set:

In [49]:
# !mkdir -p spark-stats-data
# !aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data
# !aws s3 sync --exclude '*' --include 'posts*zip' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data

And to get the much larger full data set (be warned, this can take 20 or more minutes, so you may want to run it in the terminal to avoid locking up the notebook):

In [50]:
# !mkdir -p spark-stack-data
# !aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stack-data/ ./spark-stack-data

## Data input and parsing


Some rows are split across multiple lines; these can be discarded. Incorrectly formatted XML can also be ignored. It is enough to simply skip problematic rows; the loss of data will not significantly impact our results on these large data sets.

You will need to handle XML parsing yourself.  Our solution uses `lxml.etree` in Python.

To make your code more flexible, it's also recommended to incorporate command-line arguments that specify the location of the input data and where output should be written.

The goal should be to have a parsing function that can be applied to the input data to access any XML element desired. It is suggested to use a class structure so that you can create RDDs of Posts, Votes, Users, etc.

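``` python
import argparse  # sys.argv also works for simple cases
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_dir')
    parser.add_argument('output_dir')
    ARGS = parser.parse_args()
    main(ARGS.input_dir, ARGS.output_dir)  # main() is defined earlier in main.py
```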
https://towardsdatascience.com/a-simple-guide-to-command-line-arguments-with-argparse-6824c30ab1c3
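The class structure suggested above could look roughly like the following sketch. The `Post` class and its `from_line` and `get_int` methods are hypothetical names chosen for illustration, not part of any library. On a cluster the class would live in `classes.py` and be shipped with `--py-files`.

```python
import xml.etree.ElementTree as ET


class Post:
    """Hypothetical wrapper around one <row .../> element from allPosts."""

    def __init__(self, attrib):
        # Copy the attributes into a plain dict so the object pickles cleanly
        self.attrib = dict(attrib)

    @classmethod
    def from_line(cls, line):
        """Return a Post for a well-formed line starting with '<row', else None."""
        line = line.strip()
        if not line.startswith('<row'):
            return None
        try:
            return cls(ET.fromstring(line).attrib)
        except ET.ParseError:
            return None

    def get_int(self, name):
        """Integer value of an attribute; missing fields count as 0."""
        return int(self.attrib.get(name, 0))
```

In Spark this would be used as something like `sc.textFile(path).map(Post.from_line).filter(lambda p: p is not None)`, which gives an RDD of `Post` objects with malformed rows already dropped.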

## Questions

## Question 1: Bad XML


A simple question to test your parsing code. Create an RDD of Post objects where each Post is a valid row of XML from the Cross-Validated (stats.stackexchange.com) `allPosts` data set.

We are going to take several shortcuts to speed up and simplify our computations.  First, your parsing function should only attempt to parse rows that start with `<row` as these denote actual data entries. This should be done in Spark as the data is being read in from disk, without any pre-Spark processing. 

Return the total number of XML rows that started with `<row` that were subsequently **rejected** during your processing. Note that the text is Unicode and contains non-ASCII characters. You may need to re-encode to UTF-8, depending on your XML parser.

Note that this cleaned data set will be used for all subsequent questions.

*Question*: Can you figure out what filters you need to put in place to avoid throwing parsing errors entirely?
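To reason about this question without Spark, here is a plain-Python sketch on a few made-up lines. `count_rejected` counts lines that start with `<row` but fail to parse, and `looks_complete` is one candidate pre-filter (a hypothetical helper): a row split across lines leaves a first fragment that does not end in `/>`. Whether this filter removes every parse failure in the real dump is an assumption to verify, not a known result.

```python
import xml.etree.ElementTree as ET


def looks_complete(line):
    """Cheap pre-filter: a complete row starts with '<row' and ends with '/>'."""
    s = line.strip()
    return s.startswith('<row') and s.endswith('/>')


def count_rejected(lines):
    """Count lines starting with '<row' that fail to parse as XML."""
    rejected = 0
    for line in lines:
        s = line.strip()
        if not s.startswith('<row'):
            continue  # headers, footers and continuation fragments are never candidates
        try:
            ET.fromstring(s)
        except ET.ParseError:
            rejected += 1
    return rejected


sample = [
    '<?xml version="1.0" encoding="utf-8"?>',
    '<posts>',
    '  <row Id="1" Score="3" />',
    '  <row Id="2" Body="first half of a',
    'split row" Score="1" />',
    '</posts>',
]
```

The same predicate can be passed straight to an RDD, e.g. `lines.filter(looks_complete)`.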

In [51]:
from pyspark import SparkConf, SparkContext
import xml.etree.ElementTree as ET
import os
# getOrCreate reuses a running context instead of failing when the cell is re-run
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]").setAppName("temp"))

In [52]:
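def parse_line(line):
    """Parse one XML row; return the Element, or None if it is malformed."""
    try:
        return ET.fromstring(line)
    except ET.ParseError:
        return None


def filter_xml_lines(input_location):
    """Read the raw text and keep only lines that start with '<row'."""
    lines = sc.textFile(input_location)
    return lines.filter(lambda x: x.lstrip().startswith('<row'))


def parse_xml(lines):
    """Parse each candidate row and drop the ones that were rejected."""
    return lines.map(parse_line).filter(lambda x: x is not None)


def save_parsed_lines(parsed_lines, output_location):
    # saveAsTextFile raises if output_location already exists; let that error surface
    parsed_lines.saveAsTextFile(output_location)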

In [53]:
before_parsing = filter_xml_lines('./spark-stats-data/allPosts')

In [54]:
after_parsing = parse_xml(before_parsing)

In [55]:
# save_parsed_lines(after_parsing, "Q_1_files")

In [1]:
bad_xml = before_parsing.count() - after_parsing.count()



## Question 2: Favorites and scores

We're interested in looking for useful patterns in the data.  If we look at the Post data again (the smaller set, `stats.stackexchange.com`), we see that many things about each post are recorded.  We're going to start by looking to see if there is a relationship between the number of times a post was favorited (the `FavoriteCount`) and the `Score`.  The score is the number of times the post was upvoted minus the number of times it was downvoted, so it is a measure of how much a post was liked.  We'd expect posts with a higher number of favorites to have better scores, since they're both measurements of how good the post is.

Let's aggregate posts by the number of favorites and find the average score for each number of favorites. Do this for the 50 lowest favorite counts.

If any field in the Posts or Users is missing, such as the `FavoriteCount`, you should assume it is zero. Make this assumption for all questions going forward.
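As a rough plain-Python model of this aggregation, the sketch below keeps a running `[score_sum, post_count]` per favorite count and divides at the end, defaulting missing fields to zero. The function name `average_score_by_favorites` is hypothetical. In Spark the same idea can be written as a single pass, for example `score_fav.mapValues(lambda s: (s, 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))`, instead of computing sums and counts separately and joining them.

```python
from collections import defaultdict


def average_score_by_favorites(rows, limit=50):
    """rows: iterable of attribute dicts (e.g. dict(element.attrib)).

    Returns [(favorite_count, mean_score), ...] for the `limit` smallest
    favorite counts. Missing fields count as 0.
    """
    sums = defaultdict(lambda: [0, 0])  # favorite_count -> [score_sum, n_posts]
    for attrs in rows:
        fav = int(attrs.get('FavoriteCount', 0))
        acc = sums[fav]
        acc[0] += int(attrs.get('Score', 0))
        acc[1] += 1
    return [(fav, s / n) for fav, (s, n) in sorted(sums.items())[:limit]]
```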

_Note:_ Before submitting, take a look at the numbers.  Do they follow the trend you expect?

**Checkpoints**

- Total score across all posts: 299469
- Mean of first 50 favorite counts (averaging the keys themselves): 24.76

In [57]:
def fav_score_filter(parsed_line):
    # Missing FavoriteCount or Score fields are treated as 0
    fav_count = parsed_line.attrib.get('FavoriteCount', 0)
    score = parsed_line.attrib.get('Score', 0)
    return (int(fav_count), int(score))

In [58]:
score_fav = after_parsing.map(fav_score_filter)

In [59]:
score_fav.take(5)

[(0, 1), (0, 5), (0, 0), (0, 7), (0, 1)]

In [60]:
# Sum of scores per favorite count (sorting before reduceByKey has no effect)
total = score_fav.reduceByKey(lambda x, y: x+y)

# Number of posts per favorite count
count = score_fav.mapValues(lambda _: 1)\
    .reduceByKey(lambda x, y: x+y)


In [2]:
# Average score per favorite count, for the 50 lowest favorite counts
favorite_score = total.join(count)\
    .mapValues(lambda v: v[0] / v[1])\
    .takeOrdered(50)



## Question 3: Answer percentage


Investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, single out posts which are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*. 

Return a tuple of their **user ID** and this fraction.

You should also return (-1, fraction) to represent the case where you average over all users (so you will return 100 entries total).
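A minimal plain-Python sketch of the per-user fraction, assuming reputations in a dict and posts as `(user_id, PostTypeId)` pairs; both input shapes and the helper names are hypothetical. It counts only questions (type 1) and answers (type 2), ranks users by reputation, and returns 0.0 for a user with neither, which avoids a 0/0 division. The `(-1, fraction)` entry is left out because how to average over all users is a choice this sketch does not make.

```python
def answer_fraction(n_answers, n_questions):
    """answers / (answers + questions), or 0.0 if the user has neither."""
    total = n_answers + n_questions
    return n_answers / total if total else 0.0


def top_user_fractions(reputation, post_types, k=99):
    """reputation: {user_id: rep}; post_types: iterable of (user_id, PostTypeId).

    Returns [(user_id, answer_fraction), ...] for the k highest-reputation users.
    """
    counts = {}  # user_id -> (questions, answers)
    for uid, ptype in post_types:
        if ptype in (1, 2):  # ignore all other post types
            q, a = counts.get(uid, (0, 0))
            counts[uid] = (q + (ptype == 1), a + (ptype == 2))
    top = sorted(reputation, key=reputation.get, reverse=True)[:k]
    result = []
    for uid in top:
        q, a = counts.get(uid, (0, 0))
        result.append((uid, answer_fraction(a, q)))
    return result
```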

Again, you only need to run this on the Cross-Validated (`stats.stackexchange.com`) data set.


#### Checkpoints

* Total questions: 52,060
* Total answers: 55,304
* Top 99 users' average reputation: 11893.464646464647

In [63]:
users_data = sc.textFile('./spark-stats-data/allUsers')

In [64]:
users_data.count()

100425

In [65]:
def filter_user_data(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "Id" in root.attrib:
                user_id = root.attrib['Id']
                if 'Reputation' in root.attrib:
                    reputation = root.attrib['Reputation']
                else:
                    reputation = 0
                return (int(user_id), int(reputation))
            else:
                return ('empty')

        except:
            return ('empty')
    else:
        return ('empty')

In [66]:
valid_users_data = users_data.map(filter_user_data).filter(
    lambda x: x != 'empty').sortBy(lambda x: -x[1])

In [67]:
valid_users_data.count()

50320

In [68]:
def filter_posts_with_id_and_type(parsed_line):
    # Keep (user_id, post_type) for questions (1) and answers (2) that have an owner
    attrib = parsed_line.attrib
    if "PostTypeId" not in attrib or "OwnerUserId" not in attrib:
        return 'empty'
    try:
        post_type = int(attrib['PostTypeId'])
        if post_type in (1, 2):
            return (int(attrib['OwnerUserId']), post_type)
    except ValueError:
        pass
    return 'empty'

In [69]:
user_id_and_post_type = after_parsing.map(
    filter_posts_with_id_and_type).filter(lambda x: x != 'empty')
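A condition written as `"PostTypeId" and "OwnerUserId" in attrib` only checks the last key: `in` binds more tightly than `and`, and a non-empty string literal is always truthy. This pitfall is easy to hit in parsing helpers that test several attributes at once. A minimal demonstration on a made-up attribute dict:

```python
attrib = {"OwnerUserId": "5"}  # PostTypeId is missing

# Parsed as: "PostTypeId" and ("OwnerUserId" in attrib)
buggy = "PostTypeId" and "OwnerUserId" in attrib

# Check each key explicitly
correct = all(k in attrib for k in ("PostTypeId", "OwnerUserId"))

print(buggy, correct)  # True False
```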

In [70]:
# Count questions (type 1) and answers (type 2) per user
questions = user_id_and_post_type.filter(lambda x: x[1] == 1).map(
    lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y)

answers = user_id_and_post_type.filter(lambda x: x[1] == 2).map(
    lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y)

In [71]:
q3_processed_rdd = valid_users_data.leftOuterJoin(answers).leftOuterJoin(questions).map(lambda x: (
    x[0], x[1][0][0], x[1][0][1] if x[1][0][1] else 0, x[1][1] if x[1][1] else 0)).sortBy(lambda x: -x[1])

In [72]:
# (user_id, reputation, answers, questions, answer fraction); fraction is 0 with no posts
q3_final = q3_processed_rdd.map(lambda x: (
    x[0], x[1], x[2], x[3], x[2]/(x[2]+x[3]) if x[2]+x[3] else 0))

In [73]:
total, n_count = valid_users_data.leftOuterJoin(answers).leftOuterJoin(questions)\
    .map(lambda x: (x[0], x[1][0][0], x[1][0][1] if x[1][0][1] else 0, x[1][1] if x[1][1] else 0))\
    .map(lambda x: (x[0], x[2]/(x[2]+x[3]) if x[2] or x[3] else 0, 1))\
    .map(lambda x: (x[1], x[2]))\
    .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1]))

In [74]:
answer_percentage = q3_final.map(lambda x: (x[0], x[4])).take(99)
answer_percentage.append((-1, total/n_count))

In [3]:
answer_percentage = answer_percentage



## Question 4: First question

We'd expect the first **question** a user asks to be indicative of their future behavior.  We'll dig more into that in the next problem, but for now let's see the relationship between reputation and how long it took each person to ask their first question.

For each user that asked a question, find the difference between when their account was created (`CreationDate` for the User) and when they asked their first question (`CreationDate` for their first question).  Return this time difference in days (round down, so 2.7 days counts as 2 days) for the 100 users with the highest reputation, in the form

`(UserId, Days)`
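The rounding rule maps directly onto Python's `timedelta.days`, which floors the difference. The sketch below uses a hypothetical helper name and timestamp strings in the format seen in the dump (e.g. `2008-10-12T20:26:29.397`); it takes the earliest question and floors the gap. A question timestamped before the account creation gives a negative value rather than zero, so clamping is a separate decision.

```python
from datetime import datetime

FMT = '%Y-%m-%dT%H:%M:%S.%f'  # timestamp format used in the dump


def days_to_first_question(account_created, question_times):
    """Whole days from account creation to the earliest question (floored)."""
    created = datetime.strptime(account_created, FMT)
    first = min(datetime.strptime(t, FMT) for t in question_times)
    # timedelta.days floors: 2 days 15 hours -> 2, and -1 hour -> -1
    return (first - created).days
```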

**Checkpoints**
- Users that asked a question: 23134
- Average number of days (round each user's days, then average): 30.1074258

In [77]:
# sc.stop()

In [78]:
# from datetime import timedelta, datetime

# from pyspark import SparkContext
# # import xml.etree.ElementTree as ET   
# import os
# from lxml import etree as Et

# sc = SparkContext("local[*]", "temp")

# from pyspark.sql import SQLContext

# sqlContext = SQLContext(sc)

In [79]:
from datetime import datetime


def filter_user_creation_date_data(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "Id" in root.attrib:
                user_id = root.attrib['Id']
                if 'Reputation' in root.attrib:
                    reputation = root.attrib['Reputation']
                else:
                    reputation = 0
                if 'CreationDate' in root.attrib:
                    creation_date = datetime.strptime(
                        root.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f')
                else:
                    creation_date = None
                return (int(user_id), (int(reputation), creation_date))
            else:
                return ('empty')

        except:
            return ('empty')
    else:
        return ('empty')

In [80]:
users_data = sc.textFile('./spark-stats-data/allUsers')

In [81]:
used_data_with_creation_date = users_data.map(
    filter_user_creation_date_data).filter(lambda x: x != 'empty')

In [82]:
used_data_with_creation_date.take(5)

[(70185, (1, datetime.datetime(2015, 3, 2, 18, 42, 20, 510000))),
 (70186, (6, datetime.datetime(2015, 3, 2, 19, 4, 13, 380000))),
 (70187, (1, datetime.datetime(2015, 3, 2, 19, 40, 16, 420000))),
 (70188, (1, datetime.datetime(2015, 3, 2, 19, 46, 45, 400000))),
 (70189, (101, datetime.datetime(2015, 3, 2, 19, 56, 37, 233000)))]

In [83]:
def post_first_question(line):
    if '<row' in line:
        try:
            parsed_line = ET.fromstring(line)
            if all(k in parsed_line.attrib for k in ("PostTypeId", "OwnerUserId", "CreationDate")):
                user_id = parsed_line.attrib['OwnerUserId']
                post_type = int(parsed_line.attrib['PostTypeId'])
                creation_date = datetime.strptime(
                    parsed_line.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f')
                if post_type == 1:
                    return (int(user_id), creation_date)
                else:
                    return ('empty')
            else:
                return ('empty')
        except:
            return ('empty')
    else:
        return ('empty')

In [84]:
first_post_info = before_parsing.map(post_first_question).filter(
    lambda x: x != 'empty').reduceByKey(lambda x, y: min(x, y))

In [85]:
def time_delta(after, before):
    if before is not None and after is not None:
        diff = (after - before).days
        return diff if diff > 0 else 0
    else:
        return None

In [86]:
first_question = used_data_with_creation_date.join(first_post_info)\
    .map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][1]))\
    .map(lambda x: (x[0], x[1], time_delta(x[3], x[2])))\
    .sortBy(lambda x: -x[1])\
    .map(lambda x: (x[0], x[2]))\
    .take(100)

In [88]:
def filter_rows(line):
    if '<row' not in line:
        return None
    
    try:
        root = ET.fromstring(line)
        if root.tag == 'row':
            return dict(root.attrib)
    except ET.ParseError:
        pass
    
    return None       

In [89]:
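from pyspark.sql import SparkSession

# RDD.toDF() requires an active SparkSession; getOrCreate reuses the existing SparkContext
spark = SparkSession.builder.getOrCreate()
users_data = sc.textFile('./spark-stats-data/allUsers')
df = users_data.map(filter_rows).filter(lambda x: x is not None).toDF()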

In [90]:
df.show()

+---------+--------------------+-----------------+---------+-----+--------------------+--------------------+----------+-------+-----+
|AccountId|        CreationDate|      DisplayName|DownVotes|   Id|      LastAccessDate|     ProfileImageUrl|Reputation|UpVotes|Views|
+---------+--------------------+-----------------+---------+-----+--------------------+--------------------+----------+-------+-----+
|  5872878|2015-03-02T18:42:...|      Lars Reeker|        0|70185|2015-03-02T18:42:...|https://lh3.googl...|         1|      0|    0|
|  5872995|2015-03-02T19:04:...|              Vra|        0|70186|2015-03-06T15:45:...|                null|         6|      0|    1|
|  5873177|2015-03-02T19:40:...|           Aroona|        0|70187|2015-03-02T19:40:...|https://www.grava...|         1|      0|    0|
|  5873184|2015-03-02T19:46:...|           Yazeed|        0|70188|2015-03-02T19:46:...|https://www.grava...|         1|      0|    0|
|   228681|2015-03-02T19:56:...|           Taimur|        0|70

In [91]:
df.printSchema()

root
 |-- AccountId: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- DisplayName: string (nullable = true)
 |-- DownVotes: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- LastAccessDate: string (nullable = true)
 |-- ProfileImageUrl: string (nullable = true)
 |-- Reputation: string (nullable = true)
 |-- UpVotes: string (nullable = true)
 |-- Views: string (nullable = true)



In [92]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col

In [93]:
df.withColumn("CreationDate_ts",
        to_timestamp("CreationDate")).printSchema()

root
 |-- AccountId: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- DisplayName: string (nullable = true)
 |-- DownVotes: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- LastAccessDate: string (nullable = true)
 |-- ProfileImageUrl: string (nullable = true)
 |-- Reputation: string (nullable = true)
 |-- UpVotes: string (nullable = true)
 |-- Views: string (nullable = true)
 |-- CreationDate_ts: timestamp (nullable = true)



## Question 5: Identify veterans


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.
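The activity window can be expressed as a small predicate. This is a plain-Python sketch with a hypothetical helper name, assuming account creation and post times are already parsed into `datetime` objects and that "between 100 and 150 days" is inclusive on both ends in whole (floored) days. Whether users with no posts at all should be counted is a separate decision not made here.

```python
from datetime import datetime


def is_veteran(account_created, post_times, lo=100, hi=150):
    """True if any post falls lo..hi whole days (inclusive) after account creation."""
    return any(lo <= (t - account_created).days <= hi for t in post_times)
```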

*Consider*: What other parameterizations of "activity" could we use, and how would they differ in terms of splitting our user base?

*Consider*: What other biases are still not dealt with, after using the above approach?

Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, average the score, views, number of answers, and number of favorites of the users' **first question**. Remember, if the score, views, answers, or favorites is missing, you should assume it is zero.
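A plain-Python sketch of the per-group averaging, assuming each user's questions are available as tuples whose first element is a comparable creation time (a `datetime` in practice; small integers in the test). The tuple layout and function name are hypothetical, and missing fields are assumed to have been replaced by zero already.

```python
def first_question_averages(questions_by_user, group):
    """Average (score, views, answers, favorites) of each group member's first question.

    questions_by_user: {user_id: [(created, score, views, answers, favorites), ...]}
    group: iterable of user ids. Users with no questions are skipped.
    """
    totals = [0, 0, 0, 0]
    n = 0
    for uid in group:
        questions = questions_by_user.get(uid)
        if not questions:
            continue
        first = min(questions, key=lambda q: q[0])  # earliest creation time
        for i in range(4):
            totals[i] += first[i + 1]
        n += 1
    return [t / n for t in totals] if n else [0.0] * 4
```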

*Consider*: What story could you tell from these numbers? How do the numbers support it?


#### Checkpoints

* Total brief users: 24,864
* Total veteran users: 2,027

In [3]:
# sc.stop()


In [4]:
# from pyspark import SparkContext
# import xml.etree.ElementTree as ET   
# import os
# sc = SparkContext("local[*]", "temp")

In [94]:
posts_stats_data = sc.textFile('./spark-stats-data/allPosts')
user_stats_data = sc.textFile('./spark-stats-data/allUsers')

In [95]:
def filter_user_cr_date_no_rep_data(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "Id" in root.attrib:
                user_id = root.attrib['Id']
                if 'CreationDate' in root.attrib:
                    creation_date = datetime.strptime(
                        root.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f')
                else:
                    creation_date = None
                return (int(user_id), creation_date)
            else:
                return ('empty')

        except:
            return ('empty')
    else:
        return ('empty')

In [96]:
valid_users = user_stats_data.map(
    filter_user_cr_date_no_rep_data).filter(lambda x: x != 'empty')

In [97]:
def post_user_cre_date(line):
    if '<row' in line:
        try:
            parsed_line = ET.fromstring(line)
            if "OwnerUserId" in parsed_line.attrib and "CreationDate" in parsed_line.attrib:
                user_id = parsed_line.attrib['OwnerUserId']
                creation_date = datetime.strptime(
                    parsed_line.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f')
                return (int(user_id), creation_date)
            else:
                return ('empty')
        except:
            return ('empty')
    else:
        return ('empty')

In [98]:
valid_posts = posts_stats_data.map(
    post_user_cre_date).filter(lambda x: x != 'empty')

In [99]:
def time_delta(after, before):
    if before is not None and after is not None:
        diff = (after - before).days
        return diff if diff > 0 else 0
    else:
        return None


def validity_check(days):
    if days is None:
        return 0
    if int(days) >= 100 and int(days) <= 150:
        return 1
    else:
        return 0

In [100]:
all_user_cat = valid_users.join(valid_posts)\
    .map(lambda x: (x[0], time_delta(x[1][1], x[1][0])))\
    .map(lambda x: (x[0], validity_check(x[1])))\
    .reduceByKey(lambda x, y: max(x, y))

In [101]:
veterans = all_user_cat.filter(lambda x: x[1] == 1)
brief = all_user_cat.filter(lambda x: x[1] == 0)

In [102]:
def post_info_data(line):
    if '<row' in line:
        try:
            parsed_line = ET.fromstring(line)
            if all(k in parsed_line.attrib for k in ("PostTypeId", "OwnerUserId", "CreationDate")):
                if int(parsed_line.attrib['PostTypeId']) == 1:
                    user_id = parsed_line.attrib['OwnerUserId']
                    creation_date = datetime.strptime(
                        parsed_line.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f')
                    # Missing counts are treated as 0
                    answer_count = parsed_line.attrib.get("AnswerCount", 0)
                    view_count = parsed_line.attrib.get("ViewCount", 0)
                    fav_count = parsed_line.attrib.get("FavoriteCount", 0)
                    score = parsed_line.attrib.get("Score", 0)
                    return (int(user_id), creation_date, int(score), int(view_count), int(answer_count), int(fav_count))
                else:
                    return ('empty')

            else:
                return ('empty')
        except:
            return ('empty')
    else:
        return ('empty')

In [103]:
all_question = posts_stats_data.map(post_info_data)\
    .filter(lambda x: x != 'empty')\
    .map(lambda x: ((x[0], x[1]), (x[2], x[3], x[4], x[5])))

In [104]:
first_ques = all_question.map(lambda x: (x[0][0], x[0][1]))\
    .reduceByKey(lambda x, y: min(x, y))\
    .map(lambda x: ((x[0], x[1]), 1))

In [105]:
all_user_first_q_stats = first_ques.join(all_question)\
    .map(lambda x: (x[0][0], (x[1][1][0], x[1][1][1], x[1][1][2], x[1][1][3])))

In [106]:
vet_score, vet_views, vet_answers, vet_favorites, vet_count = veterans.join(all_user_first_q_stats)\
    .map(lambda x: (x[0], (x[1][1][0], x[1][1][1], x[1][1][2], x[1][1][3])))\
    .map(lambda x: (x[1][0], x[1][1], x[1][2], x[1][3], 1))\
    .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]))

In [107]:
brief_score, brief_views, brief_answers, brief_favorites, brief_count = brief.join(all_user_first_q_stats)\
    .map(lambda x: (x[0], (x[1][1][0], x[1][1][1], x[1][1][2], x[1][1][3])))\
    .map(lambda x: (x[1][0], x[1][1], x[1][2], x[1][3], 1))\
    .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]))

In [108]:
vet_avg = [x/vet_count for x in [vet_score, vet_views, vet_answers, vet_favorites]]

In [109]:
brief_avg = [x/brief_count for x in [brief_score, brief_views, brief_answers, brief_favorites]]

In [110]:
all_avg = vet_avg + brief_avg

In [111]:
names=["vet_score","vet_views","vet_answers","vet_favorites","brief_score","brief_views","brief_answers","brief_favorites"]

In [112]:
identify_veterans = {name:avg for name,avg in zip(names,all_avg)}

In [113]:
identify_veterans

{'vet_score': 3.5322843190450355,
 'vet_views': 927.7042864894195,
 'vet_answers': 1.2962561041779708,
 'vet_favorites': 1.2930005425935973,
 'brief_score': 2.1001740930692137,
 'brief_views': 552.9433491742342,
 'brief_answers': 0.9704512304145297,
 'brief_favorites': 0.5756834329271162}

## Question 6: Identify veterans&mdash;full


Same as above, but on the full Stack Exchange data set.

No pre-parsed data is available for this question.


#### Checkpoints

* Total brief users: 1,848,628
* Total veteran users: 288,285

In [115]:
posts_stats_data_long = sc.textFile('./spark-stack-data/allPosts')
user_stats_data_long = sc.textFile('./spark-stack-data/allUsers')

In [None]:
valid_users = user_stats_data_long.map(
    filter_user_cr_date_no_rep_data).filter(lambda x: x != 'empty')

valid_posts = posts_stats_data_long.map(
    post_user_cre_date).filter(lambda x: x != 'empty')

all_user_cat = valid_users.join(valid_posts)\
    .map(lambda x: (x[0], time_delta(x[1][1], x[1][0])))\
    .map(lambda x: (x[0], validity_check(x[1])))\
    .reduceByKey(lambda x, y: max(x, y))

veterans = all_user_cat.filter(lambda x: x[1] == 1)
brief = all_user_cat.filter(lambda x: x[1] == 0)

all_question = posts_stats_data_long.map(lambda x: post_info_data(x))\
    .filter(lambda x: x != 'empty')\
    .map(lambda x: ((x[0], x[1]), (x[2], x[3], x[4], x[5])))

first_ques = all_question.map(lambda x: (x[0][0], x[0][1]))\
    .reduceByKey(lambda x, y: min(x, y))\
    .map(lambda x: ((x[0], x[1]), 1))

all_user_first_q_stats = first_ques.join(all_question)\
    .map(lambda x: (x[0][0], (x[1][1][0], x[1][1][1], x[1][1][2], x[1][1][3])))

vet_score, vet_views, vet_answers, vet_favorites, vet_count = veterans.join(all_user_first_q_stats)\
    .map(lambda x: (x[0], (x[1][1][0], x[1][1][1], x[1][1][2], x[1][1][3])))\
    .map(lambda x: (x[1][0], x[1][1], x[1][2], x[1][3], 1))\
    .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]))

brief_score, brief_views, brief_answers, brief_favorites, brief_count = brief.join(all_user_first_q_stats)\
    .map(lambda x: (x[0], (x[1][1][0], x[1][1][1], x[1][1][2], x[1][1][3])))\
    .map(lambda x: (x[1][0], x[1][1], x[1][2], x[1][3], 1))\
    .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]))

vet_avg = [x/vet_count for x in [vet_score, vet_views, vet_answers, vet_favorites]]

brief_avg = [x/brief_count for x in [brief_score, brief_views, brief_answers, brief_favorites]]

all_avg = vet_avg + brief_avg

names=["vet_score","vet_views","vet_answers","vet_favorites","brief_score","brief_views","brief_answers","brief_favorites"]

identify_veterans_full = {name:avg for name,avg in zip(names,all_avg)}

## Question 7: Word2vec


Word2Vec is an alternative approach for vectorizing text data. The vector representations it learns for words in the vocabulary tend to be useful for predicting other words in the same document, and they often capture semantic relationships, as in the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".

Let's see how good a Word2Vec model we can train using the **tags** of each Stack Exchange post as documents (this uses the full data set). Use the implementation of Word2Vec from Spark ML (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).

The tags appear in the data as a single string, so you will need to separate them into individual tags. There is no need to parse them further.
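Once the XML parser has unescaped the attribute, a `Tags` value looks like `<c#><html><razor>`. One way to split it, sketched here with a hypothetical helper, is a regular expression that captures the text between each pair of angle brackets; it gives the same result as splitting on `<` or `>` and dropping empty strings. On the RDD this would be used as something like `tag_posts.map(split_tags)`.

```python
import re


def split_tags(tag_string):
    """Split a Tags value such as '<c#><html><razor>' into ['c#', 'html', 'razor']."""
    return re.findall(r'<([^<>]+)>', tag_string)
```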

#### Parameters


The dimensionality of the vector space should be 100. The random seed should be 42 in `PySpark`.


#### Checkpoints

* Mean of the top 25 cosine similarities: 0.8012362027168274
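Spark ML's `Word2VecModel.findSynonyms` returns a DataFrame with `word` and `similarity` columns, where `similarity` is the cosine similarity between word vectors. For reference, a plain-Python sketch of that measure and of the checkpoint's mean over `(word, similarity)` tuples; both helper names are hypothetical.

```python
import math
from statistics import mean


def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


def mean_similarity(synonyms):
    """Mean similarity over (word, similarity) pairs, as in the checkpoint."""
    return mean(sim for _, sim in synonyms)
```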

In [3]:
# sc.stop()  # only needed when a SparkContext from an earlier cell is still running

In [3]:
from pyspark.ml.feature import Word2Vec
from pyspark.sql import SparkSession
import re
from lxml import etree
from lxml.etree import XMLSyntaxError

# A SparkSession is needed for RDD.toDF(); sc is kept for the RDD code below
spark = SparkSession.builder.master("local[*]").appName("temp").getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:

posts = sc.textFile('./spark-stack-data/allPosts')

In [6]:
def tag_post(line):
    if '<row' in line:
        try:
            root = etree.fromstring(line)
            if "Tags" in root.attrib:
                tag = root.attrib['Tags']
                return tag
            else:
                return ('empty')
        except XMLSyntaxError:
            return ('empty')
    else:
        return ('empty')

In [7]:
tag_posts = posts.map(tag_post).filter(lambda x: x!= 'empty')

In [8]:
tag_posts.take(5)

['<c#><html><asp.net-mvc><validation><razor>',
 '<java><ftp><ftp-client>',
 '<sharepoint><web-parts>',
 '<ios><animation><frameworks><static><crash>',
 '<android><android-activity><organization>']

In [14]:
# Split each tag string such as "<c#><html>" into ["c#", "html"]; score is a placeholder column
df = tag_posts.map(lambda line: ([s for s in re.split("<|>", line) if s != ''], 1))\
    .toDF(['text', 'score'])

w2v = Word2Vec(inputCol="text", outputCol="vectors", vectorSize=100, minCount=10, seed=42)
model = w2v.fit(df)
result = model.transform(df)

# print(model.findSynonyms('ggplot2', 25).rdd.take(25))

In [12]:
df.show()

+--------------------+-----+
|                text|score|
+--------------------+-----+
|[c#, html, asp.ne...|    1|
|[java, ftp, ftp-c...|    1|
|[sharepoint, web-...|    1|
|[ios, animation, ...|    1|
|[android, android...|    1|
|        [freeswitch]|    1|
|[mysql, command-p...|    1|
|   [sql, sql-server]|    1|
|   [android, aptana]|    1|
|        [php, mysql]|    1|
|[iphone, iphone-p...|    1|
|[c#, jquery, json...|    1|
|      [php, mongodb]|    1|
|[excel, vba, init...|    1|
|[cryptography, pa...|    1|
|[url, jboss, loca...|    1|
|[hibernate, jquer...|    1|
|[jquery, jqgrid, ...|    1|
|[assembly, strcat...|    1|
|  [linux, vim, sudo]|    1|
+--------------------+-----+
only showing top 20 rows

In [15]:
word_list = model.findSynonyms('ggplot2', 25).rdd.take(25)

In [16]:
word2vec = [(x['word'],x['similarity']) for x in word_list]
word2vec

[('lattice', 0.8926142454147339),
 ('r-grid', 0.8519014120101929),
 ('plotrix', 0.8382738828659058),
 ('line-plot', 0.8260414004325867),
 ('ecdf', 0.815007209777832),
 ('levelplot', 0.8147303462028503),
 ('ggvis', 0.8136793375015259),
 ('quantile', 0.8123936057090759),
 ('boxplot', 0.808078944683075),
 ('standard-error', 0.8051626086235046),
 ('loess', 0.799207329750061),
 ('rgl', 0.7975688576698303),
 ('plotmath', 0.7954445481300354),
 ('categorical-data', 0.7846398949623108),
 ('density-plot', 0.7797976732254028),
 ('do.call', 0.7794148325920105),
 ('gridextra', 0.7787784934043884),
 ('r-factor', 0.7784388661384583),
 ('r-raster', 0.778320848941803),
 ('weibull', 0.7760669589042664),
 ('tapply', 0.7753050923347473),
 ('lm', 0.7751830816268921),
 ('vegan', 0.7751252055168152),
 ('dplyr', 0.7744709849357605),
 ('mgcv', 0.7738687992095947)]
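`findSynonyms` ranks words by the cosine similarity between their learned vectors, which is what the `similarity` column above reports. A minimal plain-Python sketch of that ranking follows. The toy 3-dimensional vectors are hypothetical and are not taken from the trained model, and `top_synonyms` is an illustrative helper, not part of the Spark API:

```python
import math


def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


def top_synonyms(query, vectors, k):
    """Rank every other word by cosine similarity to `query` (toy analogue of findSynonyms)."""
    q = vectors[query]
    scored = [(w, cosine_similarity(q, v)) for w, v in vectors.items() if w != query]
    return sorted(scored, key=lambda pair: -pair[1])[:k]


# Hypothetical toy vectors, NOT the trained Word2Vec output.
toy_vectors = {
    "ggplot2": [0.9, 0.1, 0.0],
    "lattice": [0.8, 0.2, 0.1],
    "boxplot": [0.6, 0.5, 0.0],
    "sql": [0.0, 0.1, 0.9],
}

print(top_synonyms("ggplot2", toy_vectors, 2))
```

Words whose vectors point in a similar direction score close to 1, which is why plotting-related R tags cluster around `ggplot2` in the real output.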

## Question 8: Classification


We'd like to see if we can predict the tags of a **question** from its body text. Rather than predicting specific tags, we will try to predict whether a question carries one of the ten most common tags.

To this end, we have separated a training set and a test set from the original data. Both sets were downloaded with the stats data at the beginning of the notebook. You can also get them from S3:
  * `s3://dataincubator-course/spark-stats-data/posts_train.zip`
  * `s3://dataincubator-course/spark-stats-data/posts_test.zip`

This involves two steps. First, find the ten most common tags for questions in the training set (the tags have been removed from the test set). Then train a learner to predict from the question text (the `Body` attribute) whether the question should carry one of those ten tags. You will need to process the question text with NLP techniques, such as splitting it into tokens.
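The first step can be sketched in plain Python before scaling it out with Spark. This sketch assumes the `Tags` attribute has the `<machine-learning><spark><regression>` form used in the example rows, and it swaps `re.findall` plus `collections.Counter` in for a Spark `map`/`reduceByKey`/`takeOrdered` chain. The sample tag strings are made up:

```python
import re
from collections import Counter


def parse_tags(tags_string):
    """Turn '<a><b><c>' into ['a', 'b', 'c']."""
    return re.findall(r"<([^<>]+)>", tags_string)


def top_k_tags(tag_strings, k):
    """Count tag occurrences across questions and return the k most common tags."""
    counts = Counter()
    for tags_string in tag_strings:
        counts.update(parse_tags(tags_string))
    # most_common orders by count; equal counts keep first-seen order
    return [tag for tag, _ in counts.most_common(k)]


# Hypothetical Tags attributes, not drawn from the real data set.
sample = [
    "<machine-learning><spark><regression>",
    "<statistics><neurons>",
    "<functional><python><spark><pyspark>",
    "<python><spark>",
]
print(top_k_tags(sample, 2))  # ['spark', 'python']
```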

Since we can't reliably pickle Spark models, return a list of your predictions instead, sorted by the question's numeric `Id`. This sorting is essential, as our grader expects the results in a particular order. Each prediction should be `0` if the question isn't expected to have a tag in the top ten, and `1` if it is.

As an example, if our top tags include `spark` and `python`, and we had the following questions:

```
<row Body="..." Id="1740" Tags="<machine-learning><spark><regression>" ... />
<row Body="..." Id="723" Tags="<statistics><neurons>" ... />
<row Body="..." Id="2740" Tags="<functional><python><spark><pyspark>" ... />
```

We would expect to return `[0, 1, 1]` (for the order `[723, 1740, 2740]`).  You may need to do some format manipulation in your DataFrame to get this.
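The worked example can be reproduced in plain Python to check the expected output. This sketch assumes the top tags are exactly `spark` and `python`, as in the text, and keeps only the `Id` and `Tags` values of the three rows. Note that `Id` must be sorted as an integer: sorting the raw strings would put `723` last.

```python
import re

TOP_TAGS = {"spark", "python"}  # assumed top tags, taken from the example above


def label(tags_string):
    """1 if any of the question's tags is a top tag, else 0."""
    tags = set(re.findall(r"<([^<>]+)>", tags_string))
    return int(bool(tags & TOP_TAGS))


# (Id, Tags) pairs copied from the example rows; Id stays a string, as in the XML.
questions = [
    ("1740", "<machine-learning><spark><regression>"),
    ("723", "<statistics><neurons>"),
    ("2740", "<functional><python><spark><pyspark>"),
]

# Sort numerically; sorted() on the raw strings would give '1740', '2740', '723'.
ordered = sorted(questions, key=lambda q: int(q[0]))
print([int(q[0]) for q in ordered])          # [723, 1740, 2740]
print([label(tags) for _, tags in ordered])  # [0, 1, 1]
```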

#### Checkpoints

- Number of training posts with a tag in the top 10: `22525`
- Number without: `19540`
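The two checkpoint counts also give a quick sanity baseline: the classes are roughly balanced, and a model that always predicts `1` would already be right on about 53.5% of the training posts. A minimal calculation using only the numbers above:

```python
with_top_tag = 22525     # checkpoint: training posts with a top-10 tag
without_top_tag = 19540  # checkpoint: training posts without one

total = with_top_tag + without_top_tag
# Accuracy of always predicting the larger class
majority_baseline = max(with_top_tag, without_top_tag) / total

print(total)                        # 42065
print(round(majority_baseline, 4))  # 0.5355
```

A trained classifier is only useful if it clearly beats this figure.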

In [18]:
# !unzip -d spark-stats-data/train spark-stats-data/posts_train.zip
# !unzip -d spark-stats-data/test spark-stats-data/posts_test.zip

Archive:  spark-stats-data/posts_train.zip
  inflating: spark-stats-data/train/part-00001  
  inflating: spark-stats-data/train/part-00002  
  inflating: spark-stats-data/train/part-00003  
  inflating: spark-stats-data/train/part-00004  
  inflating: spark-stats-data/train/part-00005  
  inflating: spark-stats-data/train/part-00006  
  inflating: spark-stats-data/train/part-00007  
  inflating: spark-stats-data/train/part-00008  
  inflating: spark-stats-data/train/part-00009  
  inflating: spark-stats-data/train/part-00010  
Archive:  spark-stats-data/posts_test.zip
  inflating: spark-stats-data/test/part-00001  
  inflating: spark-stats-data/test/part-00002  
  inflating: spark-stats-data/test/part-00003  
  inflating: spark-stats-data/test/part-00004  
  inflating: spark-stats-data/test/part-00005  
  inflating: spark-stats-data/test/part-00006  
  inflating: spark-stats-data/test/part-00007  
  inflating: spark-stats-data/test/part-00008  
  inflating: spark-stats-data/test/part-0

In [3]:
from bs4 import BeautifulSoup
from pyspark.ml.feature import Word2Vec
from pyspark import SparkContext, SparkConf
import re
from lxml import etree
from lxml.etree import XMLSyntaxError

sc = SparkContext("local[*]", "temp")

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
train = sc.textFile('./spark-stats-data/train')

In [5]:
train.take(2)[1]

'  <row AnswerCount="0" Body="&lt;p&gt;I\'m developing an application in which users can create \'sections\' (à la subreddit in reddit), in which items/posts can be created and voted with a thumbs-up/down system.&lt;/p&gt;&#10;&#10;&lt;p&gt;&lt;a href=&quot;http://www.evanmiller.org/how-not-to-sort-by-average-rating.html&quot; rel=&quot;nofollow&quot;&gt;A great article&lt;/a&gt; guided me on how to sort these votes so that an item with a 100% positive response but with few votes won\'t get ranked over one with hundreds of votes and an acceptance of 80%. The article describes it pretty well.&lt;/p&gt;&#10;&#10;&lt;p&gt;However, I\'d like to discard the lowest-ranked items and this is where it gets tricky:&lt;/p&gt;&#10;&#10;&lt;ul&gt;&#10;&lt;li&gt;How could I know the minimum number of votes in order to discard it?&lt;/li&gt;&#10;&lt;li&gt;What is the score\'s threshold required to discard the item?&lt;/li&gt;&#10;&lt;/ul&gt;&#10;&#10;&lt;p&gt;As I said, there are sections, and each o

In [6]:
def split_tag_string(tags_string):
    tags = [s for s in re.split("<|>", tags_string) if s != '']
    tags_count = [(t, 1) for t in tags]
    return tags_count


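def retrieve_tags(line):
    # Returns a list of (tag, 1) pairs for a <row> carrying Id, Body and Tags,
    # or the sentinel 'empty' for anything else (filtered out downstream).
    if '<row' in line:
        try:
            root = etree.fromstring(line)
            if all(k in root.attrib for k in ('Id', 'Body', 'Tags')):
                tags_string = root.attrib['Tags']
                return (split_tag_string(tags_string))
            else:
                return ('empty')
        except XMLSyntaxError:
            return ('empty')
    else:
        return ('empty')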

In [7]:
common_10_tags_with_count = train.map(retrieve_tags)\
    .filter(lambda x: x != 'empty')\
    .flatMap(lambda x: x)\
    .reduceByKey(lambda x, y: x+y)\
    .takeOrdered(10, key=lambda x: -x[1])

common_10_tags = [x[0] for x in common_10_tags_with_count]
common_10_tags

['r',
 'regression',
 'time-series',
 'machine-learning',
 'probability',
 'hypothesis-testing',
 'distributions',
 'self-study',
 'logistic',
 'correlation']

In [8]:
def tag_check(tags_string):
    # 1 if the question carries at least one of the ten most common tags, else 0
    tags = [s for s in re.split("<|>", tags_string) if s != '']
    return int(any(c_tag in tags for c_tag in common_10_tags))

In [9]:
def retrieve_body_tag_class(line):
    # Returns (plain-text body, label) for a question row, or the sentinel 'empty'.
    if '<row' in line:
        try:
            root = etree.fromstring(line)
            if all(k in root.attrib for k in ('Id', 'Body', 'Tags')):
                # Strip the HTML markup and turn newlines into spaces
                body = BeautifulSoup(root.attrib['Body'], 'lxml').get_text().replace('\n', ' ')
                tags_string = root.attrib['Tags']
                return (body, tag_check(tags_string))
            else:
                return ('empty')
        except XMLSyntaxError:
            return ('empty')
    else:
        return ('empty')
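Each `Body` attribute holds HTML that has itself been escaped for XML (`&lt;p&gt;...`, as in the sample row above), so getting plain text takes two decoding steps: the XML parser unescapes the attribute, then an HTML parser drops the tags. The sketch below shows the same idea with only the standard library, assuming `xml.etree.ElementTree` and `html.parser` as stand-ins for `lxml` and BeautifulSoup; whitespace handling may differ slightly from `get_text()`. `TextExtractor` and `body_text` are illustrative names, and the sample row is made up:

```python
import xml.etree.ElementTree as ET
from html.parser import HTMLParser


class TextExtractor(HTMLParser):
    """Collect only the text nodes of an HTML fragment."""

    def __init__(self):
        super().__init__()
        self.chunks = []

    def handle_data(self, data):
        self.chunks.append(data)


def body_text(line):
    """Return the plain text of a <row> element's Body attribute, or None."""
    try:
        root = ET.fromstring(line)
    except ET.ParseError:
        return None
    if "Body" not in root.attrib:
        return None
    parser = TextExtractor()
    parser.feed(root.attrib["Body"])  # the value is already XML-unescaped HTML
    parser.close()
    # Join the text pieces and replace newlines with spaces, as the notebook does
    return "".join(parser.chunks).replace("\n", " ")


row = '<row Id="1" Body="&lt;p&gt;Hello &lt;b&gt;world&lt;/b&gt;&lt;/p&gt;&#10;&lt;p&gt;Bye&lt;/p&gt;" />'
print(body_text(row))  # Hello world Bye
```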

In [10]:
train_data = train.map(retrieve_body_tag_class).filter(lambda x: x != 'empty').collect()

In [11]:
# Checkpoint: number of training posts without / with a top-10 tag
labelled = train.map(retrieve_body_tag_class).filter(lambda x: x != 'empty')
print(labelled.filter(lambda x: x[1] == 0).count())
print(labelled.filter(lambda x: x[1] == 1).count())

19540
22525


In [12]:
# Cache the training DataFrame so cross-validation does not recompute it for every fold
training_df = sqlContext.createDataFrame(train_data, ["body", "label"]).cache()
training_df.show()

+--------------------+-----+
|                body|label|
+--------------------+-----+
|I'm developing an...|    0|
|I would like to c...|    1|
|I have to generat...|    1|
|I have searched f...|    1|
|I have data with ...|    1|
|I have a 37-quest...|    0|
|I'm trying to do ...|    1|
|I am looking for ...|    0|
|As question, I ha...|    1|
|If i test two hyp...|    1|
|I have a mixed ef...|    0|
|If an exploratory...|    1|
|I am trying to fi...|    1|
|I want to estimat...|    1|
|I have a question...|    0|
|I have a data set...|    0|
|I am trying to in...|    0|
|I am trying to mo...|    1|
|I have data like ...|    0|
|I'm using the ets...|    1|
+--------------------+-----+
only showing top 20 rows



In [13]:
import numpy as np

In [14]:
# range(50, 2000, 100)

In [15]:
# np.logspace(0,10,100)

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, RegexTokenizer, StopWordsRemover, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Alternative tokenizer (not used in the pipeline below): split on runs of non-word characters
regex_tokenizer = RegexTokenizer(inputCol="body", outputCol="words", pattern="\\W+")
# Lowercase the body and split it on whitespace
tokenizer = Tokenizer(inputCol="body", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")

# Hash the remaining tokens into a fixed-size term-frequency vector
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")
# regParam here is only a default; the grid below overrides it
logreg = LogisticRegression(maxIter=1000000, regParam=0.8)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, logreg])

paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [2000])
             .addGrid(logreg.regParam, [10, 1, 0.1])
             .build())

# 5-fold cross-validation over 3 parameter combinations, scored by area under the ROC curve
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)
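As a rough plain-Python picture of what the feature stages above (`Tokenizer`, `StopWordsRemover`, `HashingTF`) compute: each body is lowercased and split into tokens, stop words are dropped, and every remaining token is hashed into one of `numFeatures` buckets, so the vector length is fixed (2000 in the grid) without building a vocabulary. This sketch assumes a tiny, hypothetical stop-word list and uses MD5 from `hashlib` as a stable hash; Spark's `HashingTF` uses MurmurHash3, so its bucket indices will differ:

```python
import hashlib
from collections import Counter

# Hypothetical, tiny stop-word list; StopWordsRemover ships a much larger English list.
STOP_WORDS = {"a", "an", "the", "i", "to", "of", "is"}


def tokenize(text):
    """Lowercase and split on whitespace, roughly like pyspark.ml.feature.Tokenizer."""
    return text.lower().split()


def bucket(token, num_features):
    """Map a token to a bucket index with a stable hash (MD5 here, not Spark's MurmurHash3)."""
    digest = hashlib.md5(token.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_features


def hashed_tf(text, num_features=2000):
    """Return a sparse term-frequency vector as {bucket_index: count}."""
    tokens = [t for t in tokenize(text) if t not in STOP_WORDS]
    return dict(Counter(bucket(t, num_features) for t in tokens))


vec = hashed_tf("I have a regression model and the regression is slow")
print(sum(vec.values()))  # 6 non-stop-word tokens
```

Distinct tokens can collide in the same bucket; a larger `numFeatures` makes that rarer at the cost of wider vectors.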

In [None]:
cvModel = crossval.fit(training_df)
best_model = cvModel.bestModel
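`CrossValidator` picks the best `regParam` by the evaluator's metric, which for `BinaryClassificationEvaluator` defaults to area under the ROC curve. AUC equals the probability that a randomly chosen positive example is scored above a randomly chosen negative one, with ties counting one half. The brute-force sketch below computes it that way on made-up labels and scores; it is quadratic in the number of examples and only illustrates the metric, not Spark's implementation:

```python
def roc_auc(labels, scores):
    """AUC as P(score of a random positive > score of a random negative), ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical labels and model scores, not output from the notebook.
labels = [1, 0, 1, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.3, 0.2]
print(roc_auc(labels, scores))  # 7 of 9 positive/negative pairs ranked correctly
```

An AUC of 0.5 means the scores rank no better than chance, while 1.0 means every positive outranks every negative.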

In [None]:
test = sc.textFile('./spark-stats-data/test')

In [None]:
def retrieve_body_id(line):
    # Returns (plain-text body, integer Id) for a question row (PostTypeId 1), or 'empty'.
    # Test rows have no Tags attribute, so it must not be read here.
    if '<row' in line:
        try:
            root = etree.fromstring(line)
            if all(k in root.attrib for k in ('Id', 'Body', 'PostTypeId')):
                if int(root.attrib['PostTypeId']) == 1:
                    body = BeautifulSoup(root.attrib['Body'], 'lxml').get_text().replace('\n', ' ')
                    return (body, int(root.attrib['Id']))
                else:
                    return ('empty')
            else:
                return ('empty')
        except XMLSyntaxError:
            return ('empty')
    else:
        return ('empty')

In [None]:
test_data = test.map(retrieve_body_id).filter(lambda x: x != 'empty').collect()

In [None]:
test_df = sqlContext.createDataFrame(test_data, ["body", "id"])

In [None]:
test_df.show()

In [None]:
better_prediction = cvModel.transform(test_df)

In [None]:

better_prediction.show()

In [None]:

# Sort by the integer Id (the order the grader expects) and turn the 0.0/1.0 predictions into ints
selected = better_prediction.select("id", "prediction").sort('id').collect()
classification = [int(x['prediction']) for x in selected]
len(classification)