In [1]:
import seaborn as sns
sns.set()

In [1]:
from static_grader import grader

# Spark Miniproject


Stack Overflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because it tracks a wide range of features, including several feedback metrics, it allows for some open-ended analysis of user behavior on the site.

Stack Exchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this data set. As a side note, there's also an online data explorer which allows you to query the data interactively.

*Consider*: Do we need to use Spark to work with this data set? What are our alternatives?

## Workflow


**All questions in this miniproject can be done locally in this notebook (i.e. on your Jupyter pod).**  

You are free to try running on a cloud service, but note that we have no resources to pay for you to try out these services.  (New users often get a limited amount of free credit to try a service.)  Also, the grader library will not be available, so you would have to get your answers into this notebook to submit to the grader.   See the appropriate lecture notebooks for information on how to use cloud services if you want to try them out.

Python example workflow when **not** running in a Jupyter notebook:

1. Edit source code in your `main.py` file, classes in a separate `classes.py` (class definitions need to be written in a separate file and then included at runtime).
1. If you are using a cloud service, in order to make your code more flexible, it's recommended to incorporate command-line arguments that specify the location of the input data and where output should be written.
``` python
# Command-line arguments using sys.argv or argparse in Python
if __name__ == '__main__':
    main(ARGS.input_dir, ARGS.output_dir)
```
1. Run locally on a chunk of the data using the `spark-submit` program, e.g., `$SPARK_HOME/bin/spark-submit --py-files src/classes.py src/main.py data/stats results/stats/`.  Note that long jobs using `spark-submit` may not finish before your server is automatically shut down (our server only checks for running Jupyter notebooks when deciding whether to stay up).  
1. Run on Amazon Web Services (AWS) once your testing and development are done.  Note that you will also have to load all of the input data on an AWS bucket.  (Similar statements apply if you were to use Google Cloud Platform (GCP) or other services.)  
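
The command-line arguments mentioned in the workflow above could be handled roughly as follows. This is a minimal sketch using `argparse`; the argument names `input_dir` and `output_dir` follow the snippet above, while `parse_args` and the body of `main` are hypothetical placeholders rather than part of the course code.

```python
import argparse


def parse_args(argv=None):
    """Parse the input and output locations from the command line."""
    parser = argparse.ArgumentParser(description="Run a Spark job on Stack Exchange data.")
    parser.add_argument("input_dir", help="directory (or bucket path) holding the input data")
    parser.add_argument("output_dir", help="directory (or bucket path) for the results")
    return parser.parse_args(argv)


def main(input_dir, output_dir):
    # Placeholder: a real job would build an RDD from input_dir
    # and save its results under output_dir.
    return f"Reading from {input_dir}, writing to {output_dir}"


# Passing an explicit list mimics the command line shown above.
# In main.py, calling parse_args() with no argument reads sys.argv instead.
ARGS = parse_args(["data/stats", "results/stats/"])
print(main(ARGS.input_dir, ARGS.output_dir))
```

Keeping the paths out of the source makes it possible to point the same script at a local folder during testing and at a bucket later.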

General tips when using `spark-submit` or working on a cloud service:
* Try `cat output_dir/* | sort -n -t , -k 1.2 -o sorted_output` to concatenate your output files, which will be in `part-xxxxx` format.
* You can alternatively access an interactive PySpark shell on your Jupyter pod with this command: `$SPARK_HOME/bin/pyspark`
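
If you prefer to stay in Python, the concatenate-and-sort tip above can be approximated as shown here. This is only a sketch: it assumes each output line is a tuple written as text, such as `(12, 0.5)`, and the helper names are hypothetical.

```python
import glob
import os


def leading_number(line):
    """Numeric value between the opening "(" and the first comma."""
    return float(line.lstrip("(").split(",", 1)[0])


def sort_output_lines(lines):
    """Drop blank lines and sort the rest by their leading number."""
    cleaned = (line.strip() for line in lines if line.strip())
    return sorted(cleaned, key=leading_number)


def merge_part_files(output_dir):
    """Read every part-* file in output_dir and return the sorted lines."""
    lines = []
    for path in glob.glob(os.path.join(output_dir, "part-*")):
        with open(path, encoding="utf-8") as f:
            lines.extend(f)
    return sort_output_lines(lines)


print(sort_output_lines(["(10, 0.5)", "(2, 0.25)", "", "(-1, 0.75)"]))
# ['(-1, 0.75)', '(2, 0.25)', '(10, 0.5)']
```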

## Accessing the data


The data is available on S3 (`s3://dataincubator-course/spark-stack-data`). There are three sub-folders, `allUsers`, `allPosts`, and `allVotes` which contain Gzipped XML.  The `allPosts` sub-folder will contain data with the following format:

``` xml
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```

Data from the much smaller `stats.stackexchange.com` website (called "Cross Validated") is available in the same format on S3 (`s3://dataincubator-course/spark-stats-data`). This smaller data set will be used below in most questions to avoid working with the full data set for every question.

The full schema is available as a text file, which can be downloaded with the following command.

In [3]:
!aws s3 cp s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt .

download: s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt to ./stack_exchange_schema.txt


You can either get the data by running the appropriate S3 commands in the terminal, or by running this block for the smaller stats data set:

In [4]:
!mkdir -p spark-stats-data
!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data
!aws s3 sync --exclude '*' --include 'posts*zip' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data

In [5]:
import os

# List all files and directories in the "allPosts" folder
all_posts_contents = os.listdir("./spark-stats-data/allPosts")

# Print the contents
for item in all_posts_contents:
    print(item)

part-00009.xml.gz
part-00007.xml.gz
part-00008.xml.gz
part-00006.xml.gz
part-00003.xml.gz
part-00004.xml.gz
part-00010.xml.gz
part-00005.xml.gz
part-00002.xml.gz
part-00000.xml.gz
part-00001.xml.gz


And to get the much larger full data set (be warned, this can take 20 or more minutes, so you may want to run it in the terminal to avoid locking up the notebook):

In [6]:
# Office hours notes:
# - There are empty lines that need to be filtered out; all of the values are strings (including the numbers).
# - Process these things using RDDs; do not read the lines in as DataFrames.
# - Some rows are broken (not valid XML); throw those lines out.
# - Use a library that parses XML (lxml.etree.fromstring).
#this is spark-stack-data but we want spark-stats-data for the questions
! zcat spark-stats-data/allPosts/part-00000.xml.gz | head

<?xml version="1.0" encoding="UTF-8"?>
<parent>
  <row AcceptedAnswerId="15" AnswerCount="5" Body="&lt;p&gt;How should I elicit prior distributions from experts when fitting a Bayesian model?&lt;/p&gt;&#10;" CommentCount="1" CreationDate="2010-07-19T19:12:12.510" FavoriteCount="17" Id="1" LastActivityDate="2010-09-15T21:08:26.077" OwnerUserId="8" PostTypeId="1" Score="26" Tags="&lt;bayesian&gt;&lt;prior&gt;&lt;elicitation&gt;" Title="Eliciting priors from experts" ViewCount="1457" />
  
  <row AcceptedAnswerId="59" AnswerCount="7" Body="&lt;p&gt;In many different statistical methods there is an &quot;assumption of normality&quot;.  What is &quot;normality&quot; and how do I know if there is normality?&lt;/p&gt;&#10;" CommentCount="1" CreationDate="2010-07-19T19:12:57.157" FavoriteCount="9" Id="2" LastActivityDate="2012-11-12T09:21:54.993" LastEditDate="2010-08-07T17:56:44.800" LastEditorUserId="88" OwnerUserId="24" PostTypeId="1" Score="25" Tags="&lt;distributions&gt;&lt;normality&

In [7]:
#office hours using gzip to open gzip files
import gzip
with gzip.open('spark-stack-data/allPosts/part-00000.xml.gz','rt') as f:
    lines = f.readlines(1000)
lines

['<?xml version="1.0" encoding="UTF-8"?>\n',
 '<parent>\n',
 '  <row AcceptedAnswerId="7" AnswerCount="13" Body="&lt;p&gt;I want to use a track-bar to change a form\'s opacity.&lt;/p&gt;&#10;&#10;&lt;p&gt;This is my code:&lt;/p&gt;&#10;&#10;&lt;pre&gt;&lt;code&gt;decimal trans = trackBar1.Value / 5000;&#10;this.Opacity = trans;&#10;&lt;/code&gt;&lt;/pre&gt;&#10;&#10;&lt;p&gt;When I try to build it, I get this error:&lt;/p&gt;&#10;&#10;&lt;blockquote&gt;&#10;  &lt;p&gt;Cannot implicitly convert type \'decimal\' to \'double\'.&lt;/p&gt;&#10;&lt;/blockquote&gt;&#10;&#10;&lt;p&gt;I tried making &lt;code&gt;trans&lt;/code&gt; a &lt;code&gt;double&lt;/code&gt;, but then the control doesn\'t work. This code has worked fine for me in VB.NET in the past. &lt;/p&gt;&#10;" CommentCount="1" CommunityOwnedDate="2012-10-31T16:42:47.213" CreationDate="2008-07-31T21:42:52.667" FavoriteCount="27" Id="4" LastActivityDate="2014-12-20T17:18:47.807" LastEditDate="2014-07-28T10:02:50.557" LastEditorDisplayN

In [8]:
#Large Dataset!!
!mkdir -p spark-stack-data
!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stack-data/ ./spark-stack-data

## Data input and parsing


Some rows are split across multiple lines; these can be discarded. Incorrectly formatted XML can also be ignored. It is enough to simply skip problematic rows, since the loss of data will not significantly impact our results on these large data sets.

**You will need to handle XML parsing yourself.  Our solution uses `lxml.etree` in Python, and we would recommend using this tool yourself to handle the XML parsing.**

The goal should be to have a parsing function that can be applied to the input data to access any desired XML elements. You might find it convenient to represent the post, votes, users, etc. data using [`namedtuples`](https://docs.python.org/3/library/collections.html?highlight=namedtuple#collections.namedtuple).
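
As an illustration of that idea, here is a minimal sketch of a parsing function that returns a `namedtuple`. It uses the standard library's `xml.etree.ElementTree` so that it runs anywhere; `lxml.etree` offers a similar `fromstring` call. The `Post` fields, the helper names, and the choice to default missing integer attributes to zero are assumptions made for this example.

```python
from collections import namedtuple
import xml.etree.ElementTree as ET

# Hypothetical record type holding only a few of the available attributes.
Post = namedtuple("Post", ["id", "post_type_id", "owner_user_id", "score", "favorite_count"])


def int_attr(element, name):
    """Read an integer attribute, treating a missing one as zero."""
    return int(element.get(name, 0))


def parse_post(line):
    """Return a Post for a well-formed <row ... /> line, or None otherwise."""
    line = line.strip()
    if not line.startswith("<row"):
        return None
    try:
        row = ET.fromstring(line)
    except ET.ParseError:
        # Rows split across lines or otherwise malformed are skipped.
        return None
    return Post(
        id=int_attr(row, "Id"),
        post_type_id=int_attr(row, "PostTypeId"),
        owner_user_id=int_attr(row, "OwnerUserId"),
        score=int_attr(row, "Score"),
        favorite_count=int_attr(row, "FavoriteCount"),
    )


print(parse_post('  <row Id="1" PostTypeId="1" OwnerUserId="8" Score="26" FavoriteCount="17" />'))
```

A function of this shape can be mapped over the input lines, with the `None` results filtered out afterwards.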

## Structure

This miniproject is divided into two parts, called `spark_data` and `spark_ml`. The first part performs data analysis in Spark, on both a small data set and a large one, and consists of the first six questions in the notebook. The second part uses Spark ML for machine learning and consists of the last two questions. The parts are distinguished both by sections in the notebook and by the question names.

## Spark data section

## Question 1: Bad XML


This first question is a simple question to test your parsing code. Create an RDD of Post objects where each Post is a valid row of XML from the small "Cross Validated" (stats.stackexchange.com) `allPosts` data set.

We are going to take several shortcuts to speed up and simplify our computations.  First, your parsing function should only attempt to parse rows that start with `<row` as these denote actual data entries. This should be done in Spark as the data is being read in from disk, without any pre-Spark processing. 

Return the total number of XML rows that started with `<row` that were subsequently **rejected** during your XML processing.  Note that the text is Unicode, and contains non-ASCII characters.  You may need to re-encode to UTF-8 (depending on your XML parser).

**Note that this cleaned data set will be used for questions 1-5.**  (For questions 6-8, you want to similarly remove improperly formatted XML from that data before proceeding further.)  

*Question*: Can you figure out what filters you need to put in place to avoid throwing parsing errors entirely?
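
The counting logic described in this question can be tried out on a handful of lines before involving Spark. The sketch below is plain Python with the standard library parser; the function names and the sample lines are made up for illustration. In Spark, predicates like these could be passed to an RDD's `filter`.

```python
import xml.etree.ElementTree as ET


def is_candidate(line):
    """Only lines that start with <row are treated as data entries."""
    return line.strip().startswith("<row")


def parses(line):
    """True if the line is well-formed XML on its own."""
    try:
        ET.fromstring(line.strip())
        return True
    except ET.ParseError:
        return False


def count_rejected(lines):
    """Count candidate rows that fail to parse."""
    candidates = [line for line in lines if is_candidate(line)]
    return sum(1 for line in candidates if not parses(line))


sample = [
    '<?xml version="1.0" encoding="UTF-8"?>',
    "<parent>",
    '  <row Id="1" Score="26" />',
    "",
    '  <row Id="2" Body="cut off in the',  # row split across two lines
    'middle" Score="3" />',                # continuation, not a candidate
    "</parent>",
]
print(count_rejected(sample))
# 1
```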

In [9]:
# Parse all XML lines and return the count of lines that don't parse

In [10]:
from pyspark import SparkContext
import os
import lxml.etree as et
import gzip
import lxml

In [11]:
# Spark context initialization
sc = SparkContext.getOrCreate()

def process_xml_file(file_path):
    # Read the gzipped file on the driver, then distribute its lines as an RDD
    with gzip.open(file_path, 'rt') as f:
        xml_rdd = sc.parallelize(f.readlines())

    # Count the total number of rows in the XML file
    total_rows = xml_rdd.count()

    # Filter rows that start with <row
    filtered_xml_rdd = xml_rdd.filter(lambda row: row.strip().startswith("<row"))

    # Parse XML rows and extract required attributes
    parsed_rdd = filtered_xml_rdd.map(parse_xml_row).filter(lambda post: post is not None)

    # Count the number of rows that were rejected
    rejected_rows = filtered_xml_rdd.count() - parsed_rdd.count()

    return parsed_rdd, rejected_rows

def parse_xml_row(xml_row):
    try:
        root = et.fromstring(xml_row.strip())
    except et.XMLSyntaxError:
        # If the XML cannot be parsed, return None to indicate a rejected row
        return None

    # Re-encode the XML row text to UTF-8
    xml_row_utf8 = xml_row.encode('utf-8')

    # Return the re-encoded XML row as UTF-8 bytes
    return xml_row_utf8

# Directory path
all_posts_folder_path = "./spark-stats-data/allPosts"

# Get a list of all files in the folder
all_files = os.listdir(all_posts_folder_path)

# Get a list of all XML files in the folder with full paths
xml_files = [os.path.join(all_posts_folder_path, file) for file in all_files if file.endswith(".xml.gz")]

# Create an RDD of valid XML rows from all the files
parsed_rdds_and_rejected_counts = [process_xml_file(file) for file in xml_files]

# Collect the count of rejected rows for each file
rejected_rows_counts = [rejected_rows for _, rejected_rows in parsed_rdds_and_rejected_counts]

# Output the count of rejected rows for each file
for idx, rejected_rows in enumerate(rejected_rows_counts):
    print(f"File {idx}: Rejected Rows Count: {rejected_rows}")

# Combine all parsed RDDs into a single RDD
parsed_rdd = sc.union([parsed_rdd for parsed_rdd, _ in parsed_rdds_and_rejected_counts])

# Check the count of rows in the combined parsed RDD
total_rows_count = parsed_rdd.count()
print(f"Total Rows Count: {total_rows_count}")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


File 0: Rejected Rows Count: 1
File 1: Rejected Rows Count: 0
File 2: Rejected Rows Count: 0
File 3: Rejected Rows Count: 0
File 4: Rejected Rows Count: 1
File 5: Rejected Rows Count: 6
File 6: Rejected Rows Count: 7
File 7: Rejected Rows Count: 9
File 8: Rejected Rows Count: 220
File 9: Rejected Rows Count: 237
File 10: Rejected Rows Count: 300
Total Rows Count: 108741


In [12]:
sum(rejected_rows_counts)

781

In [13]:
grader.score('spark_data__bad_xml', sum(rejected_rows_counts))

Your score: 1.0000


## Question 2: Favorites and scores

We're interested in looking for useful patterns in the data.  If we look at the Post data again (the smaller set, `stats.stackexchange.com`), we see that many things about each post are recorded.  We're going to start by looking to see if there is a relationship between the number of times a post was favorited (the `FavoriteCount`) and the `Score`.  The score is the number of times the post was upvoted minus the number of times it was downvoted, so it is a measure of how much a post was liked.  We'd expect posts with a higher number of favorites to have better scores, since they're both measurements of how good the post is.

Let's aggregate posts by the number of favorites, and find the average score for each number of favorites.  Do this for the lowest 50 numbers of favorites.

**If any field in the Posts or Users is missing, such as the `FavoriteCount`, you should assume it is zero. _Make this assumption for all questions going forward._**

_Note:_ Before submitting, take a look at the numbers.  Do they follow the trend you expect?

**Checkpoints**

- Total score across all posts: 299469
- Mean of first 50 favorite counts (averaging the keys themselves): 24.76
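
The aggregation asked for here boils down to keeping a running (sum, count) pair per favorite count and dividing at the end. The following plain-Python sketch shows that pattern on toy data; the function name and the sample pairs are hypothetical.

```python
from collections import defaultdict


def average_score_by_favorites(pairs, n=50):
    """pairs: iterable of (favorite_count, score).

    Returns the n lowest favorite counts, each with its mean score.
    """
    totals = defaultdict(lambda: [0, 0])  # favorite_count -> [score sum, post count]
    for favorites, score in pairs:
        totals[favorites][0] += score
        totals[favorites][1] += 1
    averages = [(key, total / count) for key, (total, count) in totals.items()]
    return sorted(averages)[:n]


pairs = [(0, 2), (0, 4), (1, 10), (3, 7), (1, 20)]
print(average_score_by_favorites(pairs))
# [(0, 3.0), (1, 15.0), (3, 7.0)]
```

Carrying the count alongside the sum is what lets partial results be merged safely, which a plain running average would not allow.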

In [14]:
#access RDD to show only rows for favorite count and score

In [15]:
import xml.etree.ElementTree as ET
from pyspark import SparkContext, SparkConf

#sc = SparkContext("local", "XML Parsing App")

# Parse the XML data and extract FavoriteCount and Score
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    fav_count = root.get('FavoriteCount')
    if fav_count is None:
        fav_count = 0
    else:
        fav_count = int(fav_count)
    score = root.get('Score')
    if score is None:
        score = 0
    else:
        score = int(score)
    return fav_count, score

# Convert RDD elements into tuples of (FavoriteCount, Score)
parsed_rdd_tuples = parsed_rdd.map(parse_xml_row)

# Use combineByKey to aggregate and calculate sum and count of scores by FavoriteCount
favorite_count_score_data = parsed_rdd_tuples.combineByKey(
    lambda score: (score, 1),                  # Create a tuple with (score, count) for the first occurrence
    lambda x, score: (x[0] + score, x[1] + 1),  # Add the score and increment the count for subsequent occurrences
    lambda x, y: (x[0] + y[0], x[1] + y[1])    # Merge the sums and counts from different partitions
)

# Calculate the average score for each FavoriteCount
favorite_count_avg_score_data = favorite_count_score_data.mapValues(lambda v: v[0] / v[1])

# Sort the RDD by FavoriteCount and take the 50 lowest ones
sorted_favorite_counts = favorite_count_avg_score_data.sortBy(lambda x: x[0])
lowest_50_favorite_counts = sorted_favorite_counts.take(50)

In [16]:
favorite_score = [(0, 2.3398827696988396)]*50

grader.score('spark_data__favorite_score', lowest_50_favorite_counts)

Your score: 1.0000


## Question 3: Answer percentage


Investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, single out posts which are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*. 

Return a tuple of their **user ID** and this fraction.

You should also return (-1, fraction) to represent the case where you average over all users (so you will return 100 entries total).

Again, you only need to run this on the Cross Validated (`stats.stackexchange.com`) data set.


#### Checkpoints

* Total questions: 52,060
* Total answers: 55,304
* Top 99 users' average reputation: 11893.464646464647
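
To make the fraction concrete, here is a small plain-Python sketch of *(answers / (answers + questions))* per user, plus the `-1` entry computed over everyone. The function name and sample data are hypothetical, and skipping top users who have no questions or answers is an assumption of this sketch.

```python
from collections import Counter

QUESTION, ANSWER = 1, 2  # PostTypeId values, as in the sample rows


def answer_fractions(posts, top_user_ids):
    """posts: iterable of (owner_user_id, post_type_id)."""
    answers, questions = Counter(), Counter()
    for user_id, post_type in posts:
        if post_type == ANSWER:
            answers[user_id] += 1
        elif post_type == QUESTION:
            questions[user_id] += 1

    result = []
    for user_id in top_user_ids:
        total = answers[user_id] + questions[user_id]
        if total:  # assumption: users with no questions or answers are skipped
            result.append((user_id, answers[user_id] / total))

    # The overall entry uses every user's posts, not only the top users'.
    all_answers = sum(answers.values())
    all_questions = sum(questions.values())
    result.append((-1, all_answers / (all_answers + all_questions)))
    return result


posts = [(7, 2), (7, 2), (7, 1), (8, 1), (9, 2)]
print(answer_fractions(posts, top_user_ids=[7]))
```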

In [17]:
!aws s3 sync --exclude '*' --include 'users*zip' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data

In [18]:
import os

# List all files and directories in the "allUsers" folder
all_users_contents = os.listdir("./spark-stats-data/allUsers")

# Print the contents
for item in all_users_contents:
    print(item)

part-00000.xml.gz
part-00001.xml.gz


In [19]:
#check out one of the files
! zcat spark-stats-data/allUsers/part-00000.xml.gz | head

<?xml version="1.0" encoding="UTF-8"?>
<parent>
  <row AboutMe="&lt;p&gt;Hi, I'm not really a person.&lt;/p&gt;&#10;&#10;&lt;p&gt;I'm a background process that helps keep this site clean!&lt;/p&gt;&#10;&#10;&lt;p&gt;I do things like&lt;/p&gt;&#10;&#10;&lt;ul&gt;&#10;&lt;li&gt;Randomly poke old unanswered questions every hour so they get some attention&lt;/li&gt;&#10;&lt;li&gt;Own community questions and answers so nobody gets unnecessary reputation from them&lt;/li&gt;&#10;&lt;li&gt;Own downvotes on spam/evil posts that get permanently deleted&lt;/li&gt;&#10;&lt;li&gt;Own suggested edits from anonymous users&lt;/li&gt;&#10;&lt;li&gt;&lt;a href=&quot;http://meta.stackexchange.com/a/92006&quot;&gt;Remove abandoned questions&lt;/a&gt;&lt;/li&gt;&#10;&lt;/ul&gt;&#10;" AccountId="-1" CreationDate="2010-07-19T06:55:26.860" DisplayName="Community" DownVotes="2330" Id="-1" LastAccessDate="2010-07-19T06:55:26.860" Location="on the server farm" Reputation="1" UpVotes="5831" Views="0" WebsiteUr

In [20]:
from pyspark import SparkContext
import os
import lxml.etree as et
import gzip
import lxml
import xml.etree.ElementTree as ET

In [21]:
# Spark context initialization
sc = SparkContext.getOrCreate()

def process_xml_file(file_path):
    # Read the gzipped file on the driver, then distribute its lines as an RDD
    with gzip.open(file_path, 'rt') as f:
        xml_rdd = sc.parallelize(f.readlines())

    # Count the total number of rows in the XML file
    total_rows = xml_rdd.count()

    # Filter rows that start with <row
    filtered_xml_rdd = xml_rdd.filter(lambda row: row.strip().startswith("<row"))

    # Parse XML rows and extract required attributes
    parsed_rdd = filtered_xml_rdd.map(parse_xml_row).filter(lambda post: post is not None)

    # Count the number of rows that were rejected
    rejected_rows = filtered_xml_rdd.count() - parsed_rdd.count()

    return parsed_rdd, rejected_rows

def parse_xml_row(xml_row):
    try:
        root = et.fromstring(xml_row.strip())
    except et.XMLSyntaxError:
        # If the XML cannot be parsed, return None to indicate a rejected row
        return None

    # Re-encode the XML row text to UTF-8
    xml_row_utf8 = xml_row.encode('utf-8')

    # Return the re-encoded XML row as UTF-8 bytes
    return xml_row_utf8

# Directory path
all_posts_folder_path = "./spark-stats-data/allUsers"

# Get a list of all files in the folder
all_files = os.listdir(all_posts_folder_path)

# Get a list of all XML files in the folder with full paths
xml_files = [os.path.join(all_posts_folder_path, file) for file in all_files if file.endswith(".xml.gz")]

# Create an RDD of valid XML rows from all the files
parsed_rdds_and_rejected_counts = [process_xml_file(file) for file in xml_files]

# Combine all parsed RDDs into a single RDD
parsed_rdd_user = sc.union([parsed_rdd for parsed_rdd, _ in parsed_rdds_and_rejected_counts])

In [22]:
# Parse the XML data and extract ID and Reputation
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    ID = root.get('Id')
    if ID is None:
        ID = 0
    else:
        ID = int(ID)
    Reputation = root.get('Reputation')
    if Reputation is None:
        Reputation = 0
    else:
        Reputation = int(Reputation)
    return ID, Reputation

# Convert RDD elements into tuples of (ID, Reputation)..take the first entry from here for final output
user_rdd_tuples = parsed_rdd_user.map(parse_xml_row)

# Sort the RDD in descending order of Reputation and take the top 99 entries
top_99_rdd = user_rdd_tuples.sortBy(lambda x: x[1], ascending=False).take(99)

In [23]:
# Now build another RDD of (OwnerUserId, PostTypeId) from the parsed posts RDD created in Question 1

# Parse the XML data and extract OwnerUserID and PostTypeId
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    OwnerUserID = root.get('OwnerUserId')
    if OwnerUserID is None:
        OwnerUserID = 0
    else:
        OwnerUserID = int(OwnerUserID)
    PostTypeId = root.get('PostTypeId')
    if PostTypeId is None:
        PostTypeId = 0
    else:
        PostTypeId = int(PostTypeId)
    return OwnerUserID, PostTypeId

# Convert RDD elements into tuples of (OwnerUserId, PostTypeId)
post_rdd_tuples = parsed_rdd.map(parse_xml_row)

# Extract IDs from top_99_rdd
top_99_ids = [id for id, _ in top_99_rdd]

# Filter post_rdd_tuples based on the IDs in top_99_ids
filtered_post_rdd = post_rdd_tuples.filter(lambda x: x[0] in top_99_ids)

In [None]:
# Calculate the number of answers and questions for each OwnerUserId
count_by_owner = filtered_post_rdd.map(lambda x: (x[0], (1 if x[1] == 2 else 0, 1 if x[1] == 1 else 0))) \
                                .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Calculate the ratio of answers to total (answers + questions) for each OwnerUserId
ratio_by_owner = count_by_owner.mapValues(lambda x: x[0] / (x[0] + x[1]))

# Calculate the total number of answers and questions across all users,
# not only the top 99, so that the -1 entry averages over everyone
total_answers = post_rdd_tuples.filter(lambda x: x[1] == 2).count()
total_questions = post_rdd_tuples.filter(lambda x: x[1] == 1).count()

# Calculate the ratio of answers to total (answers + questions) across all users
overall_ratio = total_answers / (total_answers + total_questions)

# Add the additional entry with OwnerUserId = -1 and ratio = 'fraction'
additional_entry = sc.parallelize([(-1, overall_ratio)])

# Combine the results with the additional entry using union
results_with_additional = ratio_by_owner.map(lambda x: (x[0], x[1])).union(additional_entry)

# Collect the results as a list of tuples (OwnerUserId, ratio)
results = results_with_additional.collect()

In [None]:
answer_percentage = [(7071, 0.9107142857142857)] * 100

grader.score('spark_data__answer_percentage', results)

## Question 4: First question

We'd expect the first **question** a user asks to be indicative of their future behavior.  We'll dig more into that in the next problem, but for now let's see the relationship between reputation and how long it took each person to ask their first question.

For each user that asked a question, find the difference between when their account was created (`CreationDate` for the User) and when they asked their first question (`CreationDate` for their first question).  Return this time difference in days (round down, so 2.7 days counts as 2 days) for the 100 users with the highest reputation, in the form

`(UserId, Days)`

**Checkpoints**
- Users that asked a question: 23134
- Average number of days (round each user's days, then average): 30.1074258
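
The rounding rule is easy to get wrong, so here is a minimal sketch of the day calculation for a single user. It assumes timestamps in the format shown in the sample rows; the function name and the example dates are hypothetical.

```python
from datetime import datetime
import math

DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"  # format of CreationDate in the sample rows


def days_until_first_question(user_created, question_dates):
    """Whole days (rounded down) from account creation to the earliest question."""
    created = datetime.strptime(user_created, DATE_FORMAT)
    first = min(datetime.strptime(d, DATE_FORMAT) for d in question_dates)
    return math.floor((first - created).total_seconds() / 86400)


print(days_until_first_question(
    "2010-07-19T06:55:26.860",
    ["2010-08-07T17:56:44.800", "2010-07-22T01:00:00.000"],
))
# 2
```

Taking the minimum before subtracting ensures that later questions by the same user do not affect the result.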

In [None]:
#Load previously parsed RDDs for this question
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
# Parse the XML data and extract ID Reputation and CreationDate for USERS
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    ID = root.get('Id')
    if ID is None:
        ID = 0
    else:
        ID = int(ID)
    
    Reputation = root.get('Reputation')
    if Reputation is None:
        Reputation = 0
    else:
        Reputation = int(Reputation)
        
    CreationDate = root.get('CreationDate')
    if CreationDate is None:
        CreationDate = datetime.datetime.min
    else:
        try:
            CreationDate = datetime.datetime.strptime(CreationDate, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            # If the format doesn't match, handle the error or set an appropriate default value
            CreationDate = datetime.datetime.min
    
    return ID, CreationDate,Reputation

# Convert RDD elements into tuples of (ID, CreationDate, Reputation); the ID goes into the final output
Q4user_rdd_tuples = parsed_rdd_user.map(parse_xml_row)

# Convert RDD elements into DataFrames
Q4user_df = Q4user_rdd_tuples.toDF(["ID", "CreationDate","Reputation"])

In [None]:
#Parse for Posts
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    OwnerUserId = root.get('OwnerUserId')
    if OwnerUserId is None:
        OwnerUserId = 0
    else:
        OwnerUserId = int(OwnerUserId)
    
    PostCreationDate = root.get('CreationDate')
    if PostCreationDate is None:
        PostCreationDate = datetime.datetime.min
    else:
        try:
            PostCreationDate = datetime.datetime.strptime(PostCreationDate, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            PostCreationDate = datetime.datetime.min
    
    PostTypeId = root.get('PostTypeId')
    if PostTypeId is None:
        PostTypeId = 0
    else:
        PostTypeId = int(PostTypeId)
        
    return OwnerUserId, PostCreationDate, PostTypeId

# Convert RDD elements into tuples of (OwnerUserId, CreationDate, PostTypeId)
Q4post_rdd_tuples = parsed_rdd.map(parse_xml_row)

# Keep only entries with PostTypeId 1 (questions)
Q4post_rdd_tuples_filtered = Q4post_rdd_tuples.filter(lambda x: x[2] == 1)

# Convert RDD elements into DataFrame
Q4post_df = Q4post_rdd_tuples_filtered.toDF(["OwnerUserId", "CreationDate", "PostTypeId"])

# Group by OwnerUserId and return the earliest CreationDate for each user
earliest_dates_df = Q4post_df.groupBy("OwnerUserId").agg(F.min("CreationDate").alias("EarliestPostDate"))

In [None]:
# Join the DataFrames on ID and OwnerUserId
joined_df = Q4user_df.join(earliest_dates_df, Q4user_df["ID"] == earliest_dates_df["OwnerUserId"])

# Calculate the difference in days as a float
joined_df = joined_df.withColumn("TimeDifference", (F.col("EarliestPostDate").cast("long") - F.col("CreationDate").cast("long")) / 86400.0)

# Round down the TimeDifference to the nearest integer
joined_df = joined_df.withColumn("TimeDifference", F.floor(F.col("TimeDifference")))

In [None]:
# Select the top 100 rows from the ordered DataFrame
ordered_df = joined_df.orderBy(F.desc("Reputation"))
top_100_df = ordered_df.limit(100)

# Extract the 'ID' and 'TimeDifference' columns from the selected rows
result = top_100_df.select("ID", "TimeDifference").collect()

# Convert the result to a list of tuples
list_of_tuples = [(row['ID'], row['TimeDifference']) for row in result]

In [None]:
first_question = [(805, 669)] * 100

grader.score('spark_data__first_question', list_of_tuples)

## Question 5: Identify veterans


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.
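
The activity window can be expressed as a small predicate. The sketch below treats the window as half-open, from 100 days (inclusive) to 150 days (exclusive) after account creation; that boundary handling is an assumption, and the function name is hypothetical.

```python
from datetime import datetime, timedelta


def is_veteran(user_created, post_dates, start_days=100, end_days=150):
    """True if any post falls in [start_days, end_days) after account creation."""
    window_start = user_created + timedelta(days=start_days)
    window_end = user_created + timedelta(days=end_days)
    return any(window_start <= posted < window_end for posted in post_dates)


created = datetime(2010, 1, 1)
posts = [created + timedelta(days=4), created + timedelta(days=120)]
print(is_veteran(created, posts))      # True: one post lands on day 120
print(is_veteran(created, posts[:1]))  # False: the only post is on day 4
```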

*Consider*: What other parameterizations of "activity" could we use, and how would they differ in terms of splitting our user base?

*Consider*: What other biases are still not dealt with, after using the above approach?

Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, average the score, views, number of answers, and number of favorites of the users' **first question**. Remember, if the score, views, answers, or favorites is missing, you should assume it is zero.

*Consider*: What story could you tell from these numbers? How do the numbers support it?


In [None]:
#identify users first question then use that to say whos vet

In [None]:
#Load previously parsed RDDs for this question
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
# Parse the XML data and extract ID and CreationDate for USERS
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    ID = root.get('Id')
    if ID is None:
        ID = 0
    else:
        ID = int(ID)
    
    UserCreationDate = root.get('CreationDate')
    if UserCreationDate is None:
        UserCreationDate = datetime.datetime.min
    else:
        try:
            UserCreationDate = datetime.datetime.strptime(UserCreationDate, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            UserCreationDate = datetime.datetime.min
    
    return ID, UserCreationDate

# Convert RDD elements into tuples of (ID, CreationDate)..take the first entry from here for final output
Q5user_rdd_tuples = parsed_rdd_user.map(parse_xml_row)

# Convert RDD elements into DataFrames
Q5user_df = Q5user_rdd_tuples.toDF(["ID", "UserCreationDate"])

In [None]:
#Parse for Posts
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    OwnerUserId = root.get('OwnerUserId')
    if OwnerUserId is None:
        OwnerUserId = 0
    else:
        OwnerUserId = int(OwnerUserId)
    
    PostCreationDate = root.get('CreationDate')
    if PostCreationDate is None:
        PostCreationDate = datetime.datetime.min
    else:
        try:
            PostCreationDate = datetime.datetime.strptime(PostCreationDate, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            PostCreationDate = datetime.datetime.min
    
    PostTypeId = root.get('PostTypeId')
    if PostTypeId is None:
        PostTypeId = 0
    else:
        PostTypeId = int(PostTypeId)
        
    Score = root.get('Score')
    if Score is None:
        Score = 0
    else:
        Score = int(Score)
        
    FavoriteCount = root.get('FavoriteCount')
    if FavoriteCount is None:
        FavoriteCount = 0 
    else:
        FavoriteCount = int(FavoriteCount)

    AnswerCount = root.get('AnswerCount')
    if AnswerCount is None:
        AnswerCount = 0
    else:
        AnswerCount = int(AnswerCount)    
    
    Views = root.get('ViewCount')
    if Views is None:
        Views = 0
    else:
        Views = int(Views)
    
    return OwnerUserId, PostCreationDate, PostTypeId, Score, FavoriteCount,AnswerCount,Views

# Convert RDD elements into tuples of (OwnerUserId, CreationDate, PostTypeId, Score, FavoriteCount, AnswerCount, Views)
Q5post_rdd_tuples = parsed_rdd.map(parse_xml_row)

# Convert RDD elements into DataFrame
Q5post_df = Q5post_rdd_tuples.toDF(["OwnerUserId", "CreationDate", "PostTypeId","Score","FavoriteCount","AnswerCount","Views"])

In [None]:
# Join the DataFrames on ID and OwnerUserId
Q5joined_df = Q5post_df.join(Q5user_df, Q5post_df["OwnerUserId"] == Q5user_df["ID"])

Q5joined_df = Q5joined_df.withColumn("TimeDifference", (F.col("CreationDate").cast("long") - F.col("UserCreationDate").cast("long")) / 86400.0)

In [None]:
# Define the conditions to categorize users as "Veteran" or "Brief"
veteran_condition = (Q5joined_df["TimeDifference"] >= 100) & (Q5joined_df["TimeDifference"] < 150)

# Use the 'max' function over a window partitioned by 'OwnerUserId' to create a flag
window = Window.partitionBy("OwnerUserId")
Q5joined_df = Q5joined_df.withColumn("IsVeteran", F.max(F.when(veteran_condition, 1).otherwise(0)).over(window))

# Use the 'when' function to apply the 'IsVeteran' flag and create the new column
Q5joined_df = Q5joined_df.withColumn("UserType", F.when(Q5joined_df["IsVeteran"] == 1, "Veteran").otherwise("Brief"))

# Drop the temporary 'IsVeteran' column
Q5joined_df = Q5joined_df.drop("IsVeteran")

In [None]:
#Filter for questions only
Q5joined_df = Q5joined_df.filter(Q5joined_df["PostTypeId"] == 1)

# Define a window specification to partition by OwnerUserId and order by CreationDate
window_spec = Window.partitionBy("OwnerUserId").orderBy("CreationDate")

# Add a new column "EarliestCreationDate" to store the earliest CreationDate for each OwnerUserId
Q5joined_df = Q5joined_df.withColumn("EarliestCreationDate", F.min("CreationDate").over(window_spec))

# Filter the DataFrame to keep only the rows with the earliest CreationDate
active_users_df = Q5joined_df.filter(Q5joined_df["CreationDate"] == Q5joined_df["EarliestCreationDate"])

In [None]:
vet_df = active_users_df.filter(active_users_df["UserType"] == "Veteran")
brief_df = active_users_df.filter(active_users_df["UserType"] == "Brief")

# Calculate averages for veterans
vet_averages = vet_df.agg(
    F.avg("Score").alias("vet_score"),
    F.avg("Views").alias("vet_views"),
    F.avg("AnswerCount").alias("vet_answers"),
    F.avg("FavoriteCount").alias("vet_favorites")
)

# Calculate averages for brief users
brief_averages = brief_df.agg(
    F.avg("Score").alias("brief_score"),
    F.avg("Views").alias("brief_views"),
    F.avg("AnswerCount").alias("brief_answers"),
    F.avg("FavoriteCount").alias("brief_favorites")
)

# Collect the averages as a dictionary
identify_veterans = vet_averages.crossJoin(brief_averages).collect()[0].asDict()

# Show the resulting dictionary
print(identify_veterans)

#### Checkpoints

* Total brief users: 24,864
* Total veteran users: 2,027
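
The veteran/brief split behind these counts can be sketched in plain Python, without Spark. This is a minimal illustration, assuming the same rule as the notebook code above: a user is a veteran if any of their posts falls at least 100 and fewer than 150 days after account creation. The function name `classify_user` and the sample dates are hypothetical.

```python
import datetime

def classify_user(account_created, post_dates):
    """Return "Veteran" if any post falls in [100, 150) days after signup, else "Brief"."""
    for post_date in post_dates:
        # Same arithmetic as the TimeDifference column: seconds converted to days
        days = (post_date - account_created).total_seconds() / 86400.0
        if 100 <= days < 150:
            return "Veteran"
    return "Brief"

# Hypothetical sample data
created = datetime.datetime(2012, 1, 1)
posts_a = [created + datetime.timedelta(days=3), created + datetime.timedelta(days=120)]
posts_b = [created + datetime.timedelta(days=3), created + datetime.timedelta(days=200)]

print(classify_user(created, posts_a))
print(classify_user(created, posts_b))
```

Counting the distinct users in each group should then reproduce the checkpoint totals, provided the parsing and join match the grader's expectations.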

In [None]:


grader.score('spark_data__identify_veterans', identify_veterans)

## Question 6: Identify veterans&mdash;full


Same as above, but on the full Stack Exchange data set.

In [None]:
# This takes about an hour to run, but should run cleanly once the code has been tested on the smaller data set in the question above

In [24]:
from pyspark import SparkContext
import pyspark
import os
import lxml.etree as et
import gzip
import lxml
import xml.etree.ElementTree as ET
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [25]:
#conf = pyspark.SparkConf()
#conf.set("spark.executor.heartbeatInterval","119s")
# make sure the executor doesn't die/timeout
#sc = pyspark.SparkContext('local[*]', '', conf=conf)


# Spark context initialization
sc = SparkContext.getOrCreate()

# Don't use parallelize; use textFile to read into Spark since the data is massive
def process_xml_file(file_path):
    xml_rdd = sc.textFile(file_path)

    # Filter rows that start with <row
    filtered_xml_rdd = xml_rdd.filter(lambda row: row.strip().startswith("<row"))

    # Parse XML rows and extract required attributes
    parsed_rdd = filtered_xml_rdd.map(parse_xml_row).filter(lambda post: post is not None)

    # Count the number of rows that were rejected
    rejected_rows = filtered_xml_rdd.count() - parsed_rdd.count()

    return parsed_rdd, rejected_rows

def parse_xml_row(xml_row):
    try:
        root = et.fromstring(xml_row.strip())
    except et.XMLSyntaxError:
        # If the XML cannot be parsed, return None to indicate a rejected row
        return None

    # Encode the XML row text as UTF-8 bytes
    xml_row_utf8 = xml_row.encode('utf-8')

    # Return the encoded row as a bytes object
    return xml_row_utf8

# Directory path
all_posts_folder_path = "./spark-stack-data/allPosts"

# Get a list of all files in the folder
all_files = os.listdir(all_posts_folder_path)

# Get a list of all XML files in the folder with full paths
xml_files = [os.path.join(all_posts_folder_path, file) for file in all_files if file.endswith(".xml.gz")]

# Create an RDD of valid XML rows from all the files
parsed_rdds_and_rejected_counts = [process_xml_file(file) for file in xml_files]

# Collect and output the count of rejected rows for each file
rejected_rows_counts = [rejected_rows for _, rejected_rows in parsed_rdds_and_rejected_counts]

# Combine all parsed RDDs into a single RDD
parsed_rdd = sc.union([parsed_rdd for parsed_rdd, _ in parsed_rdds_and_rejected_counts])


# Repeat the same steps for the Users files, reusing process_xml_file and
# parse_xml_row defined earlier in this cell.

# Directory path
all_users_folder_path = "./spark-stack-data/allUsers"

# Get a list of all files in the folder
all_files = os.listdir(all_users_folder_path)

# Get a list of all XML files in the folder with full paths
xml_files = [os.path.join(all_users_folder_path, file) for file in all_files if file.endswith(".xml.gz")]

# Create an RDD of valid XML rows from all the files
parsed_rdds_and_rejected_counts = [process_xml_file(file) for file in xml_files]

# Combine all parsed RDDs into a single RDD
parsed_rdd_user = sc.union([parsed_rdd for parsed_rdd, _ in parsed_rdds_and_rejected_counts])

In [26]:
######

# Create a SparkSession
spark = SparkSession.builder \
    .config("spark.sql.broadcastTimeout", "7200s") \
    .getOrCreate()

# Parse the XML data and extract Id and CreationDate for USERS
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    ID = root.get('Id')
    if ID is None:
        ID = 0
    else:
        ID = int(ID)

    # Read CreationDate whether or not Id was present
    UserCreationDate = root.get('CreationDate')
    if UserCreationDate is None:
        UserCreationDate = datetime.datetime.min
    else:
        try:
            UserCreationDate = datetime.datetime.strptime(UserCreationDate, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            UserCreationDate = datetime.datetime.min

    return ID, UserCreationDate

# Convert RDD elements into tuples of (ID, CreationDate)..take the first entry from here for final output
Q5user_rdd_tuples = parsed_rdd_user.map(parse_xml_row)

# Convert RDD elements into DataFrames
Q5user_df = Q5user_rdd_tuples.toDF(["ID", "UserCreationDate"])

#Parse for Posts
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    OwnerUserId = root.get('OwnerUserId')
    if OwnerUserId is None:
        OwnerUserId = 0
    else:
        OwnerUserId = int(OwnerUserId)
    
    PostCreationDate = root.get('CreationDate')
    if PostCreationDate is None:
        PostCreationDate = datetime.datetime.min
    else:
        try:
            PostCreationDate = datetime.datetime.strptime(PostCreationDate, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            PostCreationDate = datetime.datetime.min
    
    PostTypeId = root.get('PostTypeId')
    if PostTypeId is None:
        PostTypeId = 0
    else:
        PostTypeId = int(PostTypeId)
        
    Score = root.get('Score')
    if Score is None:
        Score = 0
    else:
        Score = int(Score)
        
    FavoriteCount = root.get('FavoriteCount')
    if FavoriteCount is None:
        FavoriteCount = 0 
    else:
        FavoriteCount = int(FavoriteCount)

    AnswerCount = root.get('AnswerCount')
    if AnswerCount is None:
        AnswerCount = 0
    else:
        AnswerCount = int(AnswerCount)    
    
    Views = root.get('ViewCount')
    if Views is None:
        Views = 0
    else:
        Views = int(Views)
    
    return OwnerUserId, PostCreationDate, PostTypeId, Score, FavoriteCount,AnswerCount,Views

# Convert RDD elements into tuples of (OwnerUserId, CreationDate, PostTypeId, Score, FavoriteCount, AnswerCount, Views)
Q5post_rdd_tuples = parsed_rdd.map(parse_xml_row)

# Convert RDD elements into DataFrame
Q5post_df = Q5post_rdd_tuples.toDF(["OwnerUserId", "CreationDate", "PostTypeId","Score","FavoriteCount","AnswerCount","Views"])

# Join the DataFrames on ID and OwnerUserId
Q5joined_df = Q5post_df.join(Q5user_df, Q5post_df["OwnerUserId"] == Q5user_df["ID"])

Q5joined_df = Q5joined_df.withColumn("TimeDifference", (F.col("CreationDate").cast("long") - F.col("UserCreationDate").cast("long")) / 86400.0)

# Define the conditions to categorize users as "Veteran" or "Brief"
veteran_condition = (Q5joined_df["TimeDifference"] >= 100) & (Q5joined_df["TimeDifference"] < 150)

# Use the 'max' function over a window partitioned by 'OwnerUserId' to create a flag
window = Window.partitionBy("OwnerUserId")
Q5joined_df = Q5joined_df.withColumn("IsVeteran", F.max(F.when(veteran_condition, 1).otherwise(0)).over(window))

# Use the 'when' function to apply the 'IsVeteran' flag and create the new column
Q5joined_df = Q5joined_df.withColumn("UserType", F.when(Q5joined_df["IsVeteran"] == 1, "Veteran").otherwise("Brief"))

# Drop the temporary 'IsVeteran' column
Q5joined_df = Q5joined_df.drop("IsVeteran")

#Filter for questions only
Q5joined_df = Q5joined_df.filter(Q5joined_df["PostTypeId"] == 1)

# Define a window specification to partition by OwnerUserId and order by CreationDate
window_spec = Window.partitionBy("OwnerUserId").orderBy("CreationDate")

# Add a new column "EarliestCreationDate" to store the earliest CreationDate for each OwnerUserId
Q5joined_df = Q5joined_df.withColumn("EarliestCreationDate", F.min("CreationDate").over(window_spec))

# Filter the DataFrame to keep only the rows with the earliest CreationDate
active_users_df = Q5joined_df.filter(Q5joined_df["CreationDate"] == Q5joined_df["EarliestCreationDate"])

vet_df = active_users_df.filter(active_users_df["UserType"] == "Veteran")
brief_df = active_users_df.filter(active_users_df["UserType"] == "Brief")

In [29]:
#sc = SparkContext.getOrCreate()
# Create a SparkSession
#spark = SparkSession.builder \
#    .config("spark.sql.broadcastTimeout", "7200s") \
#    .getOrCreate()

# Calculate averages for veterans
vet_averages = vet_df.agg(
    F.avg("Score").alias("vet_score"),
    F.avg("Views").alias("vet_views"),
    F.avg("AnswerCount").alias("vet_answers"),
    F.avg("FavoriteCount").alias("vet_favorites")
)

# Calculate averages for brief users
brief_averages = brief_df.agg(
    F.avg("Score").alias("brief_score"),
    F.avg("Views").alias("brief_views"),
    F.avg("AnswerCount").alias("brief_answers"),
    F.avg("FavoriteCount").alias("brief_favorites")
)

# Collect the averages as a dictionary
identify_veterans_full = vet_averages.crossJoin(brief_averages).collect()[0].asDict()

# Show the resulting dictionary
print(identify_veterans_full)

{'vet_score': 2.2598437331442924, 'vet_views': 1844.0344896669696, 'vet_answers': 1.8426197044183144, 'vet_favorites': 0.8673157237744455, 'brief_score': 1.1307456144103445, 'brief_views': 1096.1519220732553, 'brief_answers': 1.5038565525030159, 'brief_favorites': 0.3861764445851408}


In [31]:
spark.stop()

#### Checkpoints

* Total brief users: 1,848,628
* Total veteran users: 288,285

In [30]:
grader.score('spark_data__identify_veterans_full', identify_veterans_full)

Your score: 1.0000


This ends the `spark_data` section.

## Spark ML questions

The questions from here forward are associated with the `spark_ml` prefix. They use Spark's ML library to do some NLP-based analysis on the data.

## Question 7: Word2vec


Word2Vec is one approach for vectorizing text data. The vectorized representations of words in the vocabulary tend to be useful for predicting other words in the document, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".
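
To make the analogy concrete, here is a toy sketch in plain Python. The three-dimensional vectors are hand-picked for illustration and are not taken from any trained model; real Word2Vec vectors are learned and only approximately satisfy the relation. The helper `cosine` is a hypothetical name.

```python
import math

def cosine(u, v):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# Hand-made toy vectors with dimensions (royalty, maleness, femaleness)
toy_vectors = {
    "king": (1.0, 1.0, 0.0),
    "man": (0.0, 1.0, 0.0),
    "woman": (0.0, 0.0, 1.0),
    "queen": (1.0, 0.0, 1.0),
}

# vector('king') - vector('man') + vector('woman')
target = tuple(
    k - m + w
    for k, m, w in zip(toy_vectors["king"], toy_vectors["man"], toy_vectors["woman"])
)

# Rank every word by cosine similarity to the target vector
ranked = sorted(toy_vectors, key=lambda word: cosine(target, toy_vectors[word]), reverse=True)
print(ranked[0])
```

Ranking by cosine similarity is the same idea behind the synonym lookup requested in this question, applied there to tags rather than words.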

Let's see how good a Word2Vec model we can train using the **tags** of each Stack Exchange post as documents (this uses the full data set). Use the implementation of Word2Vec from Spark ML (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).

The tags appear in the data as one string; you will need to separate them into individual tags. There is no need to parse them further once they are separated.
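
As a minimal sketch of that separation step, assuming the parsed `Tags` attribute looks like `<tag1><tag2>` (the format shown in the sample rows in this notebook), a plain-Python helper could look like this. The name `split_tags` is hypothetical.

```python
def split_tags(tags):
    """Split a string like "<a><b><c>" into ["a", "b", "c"]."""
    if not tags:
        # Missing or empty Tags attribute: no tags to return
        return []
    # Drop the outer angle brackets, then split on the "><" between tags
    return tags.strip("<>").split("><")

print(split_tags("<machine-learning><spark><regression>"))
```

The same function can be applied inside a Spark `map` or wrapped in a UDF so that each post becomes one document (a list of tags) for Word2Vec.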

#### Parameters


The dimensionality of the vector space should be 100. The random seed should be 42 in `PySpark`.


#### Checkpoints

* Mean of the top 25 cosine similarities: 0.8012362027168274

In [3]:
from pyspark import SparkContext
import pyspark
import os
import lxml.etree as et
import gzip
import lxml
import xml.etree.ElementTree as ET
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F

In [31]:
# Initialize Spark session
spark = SparkSession.builder.appName("Word2VecExample").getOrCreate()

# Initialize Spark context
sc = spark.sparkContext

# Don't use parallelize; use textFile to read into Spark since the data is massive
def process_xml_file(file_path):
    xml_rdd = sc.textFile(file_path)

    # Filter rows that start with <row
    filtered_xml_rdd = xml_rdd.filter(lambda row: row.strip().startswith("<row"))

    # Parse XML rows and extract required attributes
    parsed_rdd = filtered_xml_rdd.map(parse_xml_row).filter(lambda post: post is not None)

    # Count the number of rows that were rejected
    rejected_rows = filtered_xml_rdd.count() - parsed_rdd.count()

    return parsed_rdd, rejected_rows

def parse_xml_row(xml_row):
    try:
        root = et.fromstring(xml_row.strip())
    except et.XMLSyntaxError:
        # If the XML cannot be parsed, return None to indicate a rejected row
        return None

    # Encode the XML row text as UTF-8 bytes
    xml_row_utf8 = xml_row.encode('utf-8')

    # Return the encoded row as a bytes object
    return xml_row_utf8

# Directory path
all_posts_folder_path = "./spark-stack-data/allPosts"

# Get a list of all files in the folder
all_files = os.listdir(all_posts_folder_path)

# Get a list of all XML files in the folder with full paths
xml_files = [os.path.join(all_posts_folder_path, file) for file in all_files if file.endswith(".xml.gz")]

# Create an RDD of valid XML rows from all the files
parsed_rdds_and_rejected_counts = [process_xml_file(file) for file in xml_files]

# Collect and output the count of rejected rows for each file
rejected_rows_counts = [rejected_rows for _, rejected_rows in parsed_rdds_and_rejected_counts]

# Combine all parsed RDDs into a single RDD
parsed_rdd = sc.union([parsed_rdd for parsed_rdd, _ in parsed_rdds_and_rejected_counts])

In [32]:
import xml.etree.ElementTree as ET
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import collect_list

# Initialize Spark session
#spark = SparkSession.builder.appName("XMLParsing").getOrCreate()

# Parse for Tags and post Id
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    post_id = root.get('Id')
    tags = root.get('Tags')
    
    if tags is None:
        return []  # Return an empty list if Tags attribute is missing
    else:
        # Tags arrive as one string such as "<a><b>"; split into individual tags
        tag_list = [tag.strip('<>') for tag in tags.split('><')]
        return [(post_id, tag) for tag in tag_list]

# Apply the parsing function to the RDD
tags_owner_rdd = parsed_rdd.flatMap(parse_xml_row)

# Define schema for DataFrame
schema = StructType([
    StructField("Id", StringType(), True),
    StructField("Tag", StringType(), True)
])

# Create DataFrame from RDD with defined schema
tags_owner_df = spark.createDataFrame(tags_owner_rdd, schema)

# Group tags by post Id and collect them into an array (one document per post)
grouped_tags_df = tags_owner_df.groupBy("Id").agg(collect_list("Tag").alias("Tags"))

In [33]:
# Create Word2Vec model
word2Vec = Word2Vec(vectorSize=100, minCount=10, inputCol="Tags", outputCol="result",seed=42)
model = word2Vec.fit(grouped_tags_df)

# Find synonyms for "ggplot2"
synonyms = model.findSynonyms("ggplot2", 25)

# Extract synonyms and their similarity scores
result = [(row.word, row.similarity) for row in synonyms.collect()]

# Show the result
#for word, similarity in result:
#    print(f"('{word}', {similarity})")

In [34]:
word2vec = [("data.frame", 0.772650957107544)] * 25

grader.score('spark_ml__word2vec', result)

Your score: 1.0000


## Question 8: Classification


We'd like to see if we can predict the tags of a **question** from its body text. Instead of predicting specific tags, we will instead try to predict if a question contains one of the top ten most common tags.  

To this end, we have separated out a train and a test set from the original data.  The training and test sets were downloaded with the stats data at the beginning of the notebook.  You can also get them from S3:
  * `s3://dataincubator-course/spark-stats-data/posts_train.zip`
  * `s3://dataincubator-course/spark-stats-data/posts_test.zip`

This will involve two steps. First, find the ten most common tags for questions in the training data set (the tags have been removed from the test set). Then train a learner to predict from the text of the question (the `Body` attribute) whether it should have one of those ten tags. You will need to process the question text with NLP techniques such as splitting the text into tokens.
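
The first step (finding the most common tags and turning them into a binary label) can be sketched in plain Python. This is an illustration on made-up data with `n=2` instead of ten; the names `top_tags` and `label` are hypothetical, and a Spark version would use a `groupBy` and count instead of `Counter`.

```python
from collections import Counter

def top_tags(tag_lists, n=10):
    """Return the set of the n most frequent tags across all questions."""
    counts = Counter(tag for tags in tag_lists for tag in tags)
    return {tag for tag, _ in counts.most_common(n)}

def label(tags, top):
    """1 if the question has at least one tag in the top set, else 0."""
    return int(any(tag in top for tag in tags))

# Made-up training questions, each represented by its list of tags
train = [
    ["python", "spark"],
    ["spark", "regression"],
    ["statistics"],
    ["python"],
]

top = top_tags(train, n=2)
labels = [label(tags, top) for tags in train]
print(sorted(top), labels)
```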

Since we can't reliably pickle Spark models, instead return a list of your predictions, sorted by the question's `Id`.  This sorting is very important, as our grader expects the results to be submitted in a particular order. These predictions should be `0` if the question isn't expected to have a tag in the top ten, and `1` if it is.

As an example, if our top tags include `spark` and `python`, and we had the following questions:

```
<row Body="..." Id="1740" Tags="<machine-learning><spark><regression>" ... />
<row Body="..." Id="723" Tags="<statistics><neurons>" ... />
<row Body="..." Id="2740" Tags="<functional><python><spark><pyspark>" ... />
```

We would expect to return `[0, 1, 1]` (for the order `[723, 1740, 2740]`).  You may need to do some format manipulation in your DataFrame to get this.
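
The ordering requirement can be checked on this example with a few lines of plain Python. This sketch uses the true tags in place of model predictions, purely to show the sorting. Note that `Id` must be compared as an integer: sorted as strings, `"1740"` would come before `"723"`.

```python
# The three example rows above, with Id kept as a string as it appears in the XML
questions = [
    {"Id": "1740", "Tags": ["machine-learning", "spark", "regression"]},
    {"Id": "723", "Tags": ["statistics", "neurons"]},
    {"Id": "2740", "Tags": ["functional", "python", "spark", "pyspark"]},
]
top = {"spark", "python"}

# Sort numerically by Id, then emit one 0/1 value per question
ordered = sorted(questions, key=lambda q: int(q["Id"]))
predictions = [int(any(tag in top for tag in q["Tags"])) for q in ordered]

print([q["Id"] for q in ordered], predictions)
```

In a DataFrame, the equivalent is to cast the `Id` column to an integer type before ordering by it.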

#### Checkpoints

- Number of training posts with a tag in the top 10: `22525`
- Number without: `19540`

In [35]:
!unzip -u -d spark-stats-data/train spark-stats-data/posts_train.zip
!unzip -u -d spark-stats-data/test spark-stats-data/posts_test.zip

Archive:  spark-stats-data/posts_train.zip
  inflating: spark-stats-data/train/part-00001  
  inflating: spark-stats-data/train/part-00002  
  inflating: spark-stats-data/train/part-00003  
  inflating: spark-stats-data/train/part-00004  
  inflating: spark-stats-data/train/part-00005  
  inflating: spark-stats-data/train/part-00006  
  inflating: spark-stats-data/train/part-00007  
  inflating: spark-stats-data/train/part-00008  
  inflating: spark-stats-data/train/part-00009  
  inflating: spark-stats-data/train/part-00010  
Archive:  spark-stats-data/posts_test.zip
  inflating: spark-stats-data/test/part-00001  
  inflating: spark-stats-data/test/part-00002  
  inflating: spark-stats-data/test/part-00003  
  inflating: spark-stats-data/test/part-00004  
  inflating: spark-stats-data/test/part-00005  
  inflating: spark-stats-data/test/part-00006  
  inflating: spark-stats-data/test/part-00007  
  inflating: spark-stats-data/test/part-00008  
  inflating: spark-stats-data/test/part-0

In [21]:
import lxml.etree as et
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF,Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Initialize Spark session
spark = SparkSession.builder.appName("TagPrediction").config("spark.seed", 42).getOrCreate()

In [22]:
# Initialize Spark context
sc = spark.sparkContext

# Don't use parallelize; use textFile to read into Spark since the data is massive
def process_xml_file(file_path):
    xml_rdd = sc.textFile(file_path)

    # Filter rows that start with <row
    filtered_xml_rdd = xml_rdd.filter(lambda row: row.strip().startswith("<row"))

    # Parse XML rows and extract required attributes
    parsed_rdd = filtered_xml_rdd.map(parse_xml_row).filter(lambda post: post is not None)

    # Count the number of rows that were rejected
    rejected_rows = filtered_xml_rdd.count() - parsed_rdd.count()

    return parsed_rdd, rejected_rows

def parse_xml_row(xml_row):
    try:
        root = et.fromstring(xml_row.strip())
    except et.XMLSyntaxError:
        # If the XML cannot be parsed, return None to indicate a rejected row
        return None

    # Encode the XML row text as UTF-8 bytes
    xml_row_utf8 = xml_row.encode('utf-8')

    # Return the encoded row as a bytes object
    return xml_row_utf8

# Directory path (test data)
all_posts_folder_path = "spark-stats-data/test"

# Get a list of all files in the folder
all_files = os.listdir(all_posts_folder_path)

# Get a list of all XML files in the folder with full paths
xml_files = [os.path.join(all_posts_folder_path, file) for file in all_files]

# Create an RDD of valid XML rows from all the files
parsed_rdds_and_rejected_counts = [process_xml_file(file) for file in xml_files]

# Combine all parsed RDDs into a single RDD
parsed_rdd_test = sc.union([parsed_rdd for parsed_rdd, _ in parsed_rdds_and_rejected_counts])

# Convert byte-string to string and filter rows with PostTypeId="1"
parsed_rdd_test = parsed_rdd_test.filter(lambda xml_str: b'PostTypeId="1"' in xml_str).map(lambda xml_bytes: xml_bytes.decode('utf-8'))

# Directory path (Training data)
all_posts_folder_path_train = "spark-stats-data/train"

# Get a list of all files in the folder
all_files = os.listdir(all_posts_folder_path_train)

# Get a list of all XML files in the folder with full paths
xml_files = [os.path.join(all_posts_folder_path_train, file) for file in all_files]

# Create an RDD of valid XML rows from all the files
parsed_rdds_and_rejected_counts = [process_xml_file(file) for file in xml_files]

# Combine all parsed RDDs into a single RDD
parsed_rdd_train = sc.union([parsed_rdd_train for parsed_rdd_train, _ in parsed_rdds_and_rejected_counts])

# Convert byte-string to string and filter rows with PostTypeId="1"
parsed_rdd_train = parsed_rdd_train.filter(lambda xml_str: b'PostTypeId="1"' in xml_str).map(lambda xml_bytes: xml_bytes.decode('utf-8'))

In [23]:
import xml.etree.ElementTree as ET
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import lit
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [24]:

# Initialize Spark session
#spark = SparkSession.builder.appName("XMLParsing").getOrCreate()

# Parse for Body, Id, PostTypeId, and Tags
def parse_xml_row(xml_str):
    root = ET.fromstring(xml_str)
    body = root.get('Body')
    post_id = root.get('Id')
    post_type_id = root.get('PostTypeId')
    tags = root.get('Tags')
    
    if tags is None:
        return []  # Return an empty list if Tags attribute is missing
    else:
        # Emit one (body, post_id, post_type_id, tag) tuple per tag
        tag_list = [tag.strip('<>') for tag in tags.split('><')]
        return [(body, post_id, post_type_id, tag) for tag in tag_list]

# Apply the parsing function to the RDD
tags_rdd_train = parsed_rdd_train.flatMap(parse_xml_row)

# Define schema for DataFrame
schema = StructType([
    StructField("Body", StringType(), True),
    StructField("Id", StringType(), True),
    StructField("PostTypeId", StringType(), True),
    StructField("Tags", StringType(), True)
])

# Create DataFrame from RDD with defined schema
train_tags_df = spark.createDataFrame(tags_rdd_train, schema)

# Apply the parsing function to the RDD
tags_test_rdd = parsed_rdd_test.flatMap(parse_xml_row)

# Create DataFrame from RDD with defined schema
tags_test_df = spark.createDataFrame(tags_test_rdd, schema)

In [25]:
# Group tags by question Id and Body and collect them into an array
train_tags_df_10 = train_tags_df.groupBy("Id","Body").agg(collect_list("Tags").alias("Tags"))

# Flatten the DataFrame based on the 'Tags' column
flattened_df = train_tags_df_10.select("Id", "Body", explode(col("Tags")).alias("Tag"))

# Group by 'Tags' and count the occurrences
tag_counts = flattened_df.groupBy("Tag").count()

# Sort the results in descending order based on tag counts
sorted_tag_counts = tag_counts.orderBy(col("count").desc())

# Select the top 10 occurrences
top_10_tag_counts = sorted_tag_counts.limit(10)

# Show the top 10 tag occurrences
#top_10_tag_counts.show()

In [26]:
from functools import reduce
from pyspark.sql.functions import when, array_contains

# Extract the top 10 tags from the DataFrame
top_10_tags = top_10_tag_counts.select("Tag").rdd.flatMap(lambda x: x).collect()

# Build one boolean condition that is true when the 'Tags' array contains any top 10 tag
has_top_tag = reduce(lambda a, b: a | b,
                     [array_contains(col("Tags"), tag) for tag in top_10_tags])

# Create a new column 'label' indicating whether any tag matches a top 10 tag
train_tags_df_with_label = train_tags_df_10.withColumn("label", when(has_top_tag, 1).otherwise(0))


In [27]:
# Cast 'label' column to integer
train_tags_df_with_label = train_tags_df_with_label.withColumn("label", col("label").cast("int"))

# Cast 'Id' column to integer
train_tags_df_with_label = train_tags_df_with_label.withColumn("Id", col("Id").cast("int"))

In [28]:
# Cast 'Id' column to integer
tags_test_df = tags_test_df.withColumn("Id", col("Id").cast("int"))

In [29]:
# Seed is set at the top; use a lower vocab size (runs faster) and slightly increase regParam from 1.0

vocab_size = 100000

tokenizer = Tokenizer(inputCol='Body', outputCol='words')
#hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='features')
count_vec = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol='features', vocabSize=vocab_size)
logreg = LogisticRegression(maxIter=22, regParam=1.05).setLabelCol('label')

tokens = tokenizer.transform(train_tags_df_with_label)
#hashes = hashingTF.transform(tokens)
countmodel = count_vec.fit(tokens)
counts = countmodel.transform(tokens)
model = logreg.fit(counts)

In [30]:
#summary = model.summary

#print(f'Accuracy on training set: {summary.accuracy}')
#print(f'Precision on training set (by label): {summary.precisionByLabel}')
#print(f'Recall on training set (by label): {summary.recallByLabel}')
#print(f'Areas under the ROC: {summary.areaUnderROC}')

#Are you over- or underfitting?

#This can be determined by comparing training and test performance: if the model does
#noticeably better on the training data than on the test data, you may be overfitting;
#this is where hyperparameter tuning comes in

In [31]:
# Make predictions on test documents
test_tokens = tokenizer.transform(tags_test_df)
test_counts = countmodel.transform(test_tokens)

predictions = model.transform(test_counts)
selected = predictions.select('Id','Body', 'prediction')

In [32]:
# Order the DataFrame by 'Id' column
ordered_df = selected.orderBy("Id")

# Select the 'prediction' column and collect the values as a list
prediction_list = ordered_df.select("prediction").rdd.flatMap(lambda x: x).collect()

# Convert the collected values to integers
prediction_int_list = [int(prediction) for prediction in prediction_list]


In [33]:
#Strip p tags, i tags, and newline characters

#body = re.sub(r'<p>|</p>|<i>|</i>|\n', '', body)

In [34]:
#what are the top 10 tags, then build prediction
#some tags have dashes
#some have numbers

In [35]:
classification = [0] * 4649

grader.score('spark_ml__classification', prediction_int_list)

Your score: 0.9321


## K-means (ungraded)


From your trained Word2Vec model, pass the vectors into a K-means clustering algorithm. Create a plot of the sum of squared error by calculating the square root of the sum of the squared distances between each point and its assigned cluster center. As the independent variable, use either the number of clusters k or the dimension of the Word2Vec vectorization.
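
The error metric described here can be sketched in plain Python on toy data. This is an illustration only: the points and centers are made up, the name `clustering_error` is hypothetical, and in practice the centers would come from a fitted K-means model rather than being written by hand.

```python
import math

def clustering_error(points, centers):
    """Square root of the summed squared distance from each point to its nearest center."""
    total = 0.0
    for point in points:
        # Squared Euclidean distance to the closest center (the assigned cluster)
        total += min(
            sum((a - b) ** 2 for a, b in zip(point, center))
            for center in centers
        )
    return math.sqrt(total)

# Made-up 2-D points forming two obvious groups
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]

error_k1 = clustering_error(points, [(5.0, 1.0)])
error_k2 = clustering_error(points, [(0.0, 1.0), (10.0, 1.0)])
print(error_k1, error_k2)
```

Plotting this value against k (or against the Word2Vec vector size) gives the curve the exercise asks for; the error is expected to drop as k grows.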

*Copyright &copy; 2023 Pragmatic Institute. This content is licensed solely for personal use. Redistribution or publication of this material is strictly prohibited.*