In [1]:
import pyspark as ps
import warnings
from pyspark.sql import *
from pyspark.sql import functions as F
from src.preprocessing import cleanhtml_test
from pyspark.sql.types import StringType
import pandas as pd


In [2]:
# Start (or reuse) the local Spark runtime that every later cell depends on.
try:
    # Try to create a SparkContext that runs locally with 4 worker threads
    # ('local[4]'; 'local[*]' would be needed to use every available core).
    sc = ps.SparkContext('local[4]')
    print("Just created a SparkContext")
except ValueError:
    # Give a warning if a SparkContext already exists (for use inside pyspark).
    warnings.warn("SparkContext already exists in this scope")

# Later cells call `spark.read` / `spark.sql`, but `spark` is only predefined
# inside the pyspark shell. getOrCreate() reuses the active context/session
# when one exists, so this is safe in both environments. SparkSession is in
# scope through `from pyspark.sql import *`.
spark = SparkSession.builder.getOrCreate()
# Rebind `sc` to the session's context so it is defined even when the
# `except` branch above was taken in a scope that had no `sc` name.
sc = spark.sparkContext

import sys


In [5]:
# Echo the SparkContext so the notebook records which context is active.
sc

### Load the data into two tables:

In [6]:
# Disabled: presumably the one-off step that generated csvs/dev_set.csv from
# csvs/posts.csv — TODO confirm against `models.create_dev_set`.
# infile = 'csvs/posts.csv'
# outfile = 'csvs/dev_set.csv'
# models.create_dev_set(infile, outfile)
# Load the dev-set CSV into a pandas DataFrame.
# NOTE(review): `df` is rebound to the Spark read of 'csvs/posts.csv' in a
# later cell, and this pandas frame is not referenced in between.
df = pd.read_csv('csvs/dev_set.csv')


In [7]:
# Read the dev-set CSV with Spark. The first line is used as the header,
# fields are quoted with '"', and multiLine=True allows a quoted field to
# span several lines. No inferSchema is requested for this read, and
# escape=None leaves the escape option at its default.
test_set = spark.read.csv('csvs/dev_set.csv', 
                          header=True, 
                          encoding = 'utf8',
                          quote ='"',
                          escape=None,
                          multiLine=True)

In [8]:
# Preview the first 5 rows of the Spark dev-set frame.
test_set.show(5)

+---+-------+--------------------+--------------------+-----+--------+--------+
|_c0|post_Id|               title|                body|score|   views|comments|
+---+-------+--------------------+--------------------+-----+--------+--------+
|  0|      4|While applying op...|<p>I want to use ...|  543| 34799.0|       1|
|  1|      6|Percentage width ...|<p>I have an abso...|  241| 15696.0|       0|
|  2|      7|                null|<p>An explicit ca...|  391|    null|       0|
|  3|      9|How do I calculat...|<p>Given a <code>...| 1716|439828.0|      16|
|  4|     11|Calculate relativ...|<p>Given a specif...| 1286|130055.0|       3|
+---+-------+--------------------+--------------------+-----+--------+--------+
only showing top 5 rows



In [9]:
# Load the full posts table from CSV into a Spark DataFrame.
# NOTE(review): unlike the dev-set read, no quote/multiLine options are set
# here; if post bodies contain embedded newlines, rows could be split —
# verify against the source file.
df = spark.read.csv('csvs/posts.csv',
                         header=True,       # first line holds the column names
                         inferSchema=True)  # let Spark infer column types from the data


In [10]:
# Load the post-to-tag relation from CSV into a Spark DataFrame.
tags = spark.read.csv('csvs/tags_posts_rel.csv',
                         header=True,       # first line holds the column names
                         quote='"',         # character used to quote fields
                         sep=",",           # field separator
                         inferSchema=True)  # let Spark infer column types from the data

In [11]:
# Preview the first 10 rows of the posts frame.
df.show(10)

+-------+--------------------+--------------------+-----+------+--------+
|post_Id|               title|                body|score| views|comments|
+-------+--------------------+--------------------+-----+------+--------+
|      4|While applying op...|<p>I want to use ...|  543| 34799|       1|
|      6|Percentage width ...|<p>I have an abso...|  241| 15696|       0|
|      7|                null|<p>An explicit ca...|  391|  null|       0|
|      9|How do I calculat...|<p>Given a <code>...| 1716|439828|      16|
|     11|Calculate relativ...|<p>Given a specif...| 1286|130055|       3|
|     12|                null|<p>Here's how I d...|  313|  null|      10|
|     13|Determine a User'...|<p>Is there any s...|  519|149911|       6|
|     14|Difference betwee...|<p>What is the di...|  352|102761|       3|
|     16|Filling a DataSet...|<p>How do you exp...|  106| 77797|       0|
|     17|Binary Data in MySQL|<p>How do I store...|  159| 61581|       3|
+-------+--------------------+--------

In [12]:
# Preview 2 rows of the post-to-tag relation.
tags.show(2)

+-------+--------+
|post_Id|  tag_id|
+-------+--------+
|      4|      c#|
|      4|winforms|
+-------+--------+
only showing top 2 rows



In [13]:
# Total number of rows read from csvs/posts.csv.
df.count()

39646923

In [14]:
# Column names and types of the posts frame (types come from inferSchema=True).
df.printSchema()

root
 |-- post_Id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- views: integer (nullable = true)
 |-- comments: integer (nullable = true)



In [15]:
# Column names and types of the tags frame (types come from inferSchema=True).
tags.printSchema()

root
 |-- post_Id: integer (nullable = true)
 |-- tag_id: string (nullable = true)



### First, take a look at the tags and see how many distinct tags we are working with

In [16]:
# Expose the tags frame to Spark SQL under the name `post_tags`.
# createOrReplaceTempView is the non-deprecated equivalent of
# registerTempTable and is safe to re-run.
tags.createOrReplaceTempView('post_tags')

In [17]:
# Count how many distinct tag_id values appear in the post_tags view.
spark.sql("""
            SELECT DISTINCT tag_id
            FROM post_tags
            """).count()

51672

Let's select only the tags that have over 200k posts; that should give us plenty to work with.

In [18]:
# Keep only the tags attached to more than 200000 posts, together with their
# post counts.
# NOTE: the variable is still called `tags_with_100k` because later cells use
# that name, but the HAVING clause below is what sets the threshold (200000).
tags_with_100k = spark.sql("""
                    SELECT tag_id, Count(post_Id)
                    FROM post_tags
                    GROUP BY tag_id
                    HAVING COUNT(post_Id) > 200000
                    """)
# Register the result as `top_tags` for the joins that follow.
# createOrReplaceTempView is the non-deprecated equivalent of
# registerTempTable and is safe to re-run.
tags_with_100k.createOrReplaceTempView('top_tags')
# Number of tags that passed the threshold.
tags_with_100k.count()


25

In [19]:
# Ask for up to 100 rows so every tag that passed the filter is listed
# (show() defaults to 20 rows).
tags_with_100k.show(100)

+-------------+--------------+
|       tag_id|count(post_Id)|
+-------------+--------------+
|       iphone|        218782|
|      android|       1081802|
|      node.js|        216420|
|           c#|       1188414|
|         html|        733142|
|      asp.net|        330632|
|         json|        228383|
|        mysql|        509242|
|       jquery|        900794|
|   javascript|       1576130|
|          css|        524689|
|       arrays|        257543|
|          sql|        430275|
|   sql-server|        226832|
|          c++|        559729|
|            c|        272952|
|  objective-c|        282994|
|         java|       1385397|
|            r|        227621|
|          php|       1176839|
|         .net|        265331|
|          ios|        554171|
|       python|        914075|
|    angularjs|        250039|
|ruby-on-rails|        288879|
+-------------+--------------+



Now that we have those tags, let's fish out the relevant ids and posts we will be working with; this will become our data to analyze.

In [20]:
# Collect the distinct ids of posts that carry at least one tag from
# `top_tags` (the view built with the `COUNT(post_Id) > 200000` filter).
# DISTINCT avoids repeating a post that has several of those tags.
inner_join = spark.sql("""
                    SELECT DISTINCT post_tags.post_Id as post_id
                    FROM post_tags
                    INNER JOIN top_tags
                    ON post_tags.tag_id = top_tags.tag_id
                    """)


In [21]:
# Preview 2 of the selected post ids.
inner_join.show(2)

+-------+
|post_id|
+-------+
| 351954|
| 491584|
+-------+
only showing top 2 rows



In [22]:
# Expose the posts frame and the selected ids to Spark SQL.
# createOrReplaceTempView is the non-deprecated equivalent of
# registerTempTable and is safe to re-run.
df.createOrReplaceTempView('all_posts')
inner_join.createOrReplaceTempView('relevant_ids')
# Keep only the posts whose id was selected, and merge title and body into a
# single `text` column.
# CONCAT_WS is used instead of CONCAT because CONCAT returns NULL as soon as
# one argument is NULL, and `title` is NULL for some posts; that would throw
# the body away. CONCAT_WS skips NULL arguments, so such rows keep their body
# (and a row with both columns NULL yields an empty string). Rows with both
# values present are unchanged: "<title> <body>".
relevant_posts = spark.sql("""
                            SELECT all_posts.post_Id, CONCAT_WS(' ', all_posts.title, all_posts.body) as text
                            FROM all_posts
                            INNER JOIN relevant_ids
                            ON all_posts.post_Id = relevant_ids.post_id
                            """)
# Preview the first 5 combined rows.
relevant_posts.show(5)

+-------+--------------------+
|post_Id|                text|
+-------+--------------------+
|    833|Editing database ...|
|   1829|How do I make a m...|
|   6658|JUnit vs TestNG <...|
|   7880|How do you open a...|
|   9376|ILMerge Best Prac...|
+-------+--------------------+
only showing top 5 rows



In [21]:
# Number of posts that survived the join against the selected ids.
relevant_posts.count()

10773100

### Now let's clean the body of the text (code vs. text)

In [22]:
# Display the dev-set frame again (show() with no argument prints up to 20
# rows) to look at the raw HTML in `body` before cleaning.
test_set.show()

+---+-------+--------------------+--------------------+-----+--------+--------+
|_c0|post_Id|               title|                body|score|   views|comments|
+---+-------+--------------------+--------------------+-----+--------+--------+
|  0|      4|While applying op...|<p>I want to use ...|  543| 34799.0|       1|
|  1|      6|Percentage width ...|<p>I have an abso...|  241| 15696.0|       0|
|  2|      7|                null|<p>An explicit ca...|  391|    null|       0|
|  3|      9|How do I calculat...|<p>Given a <code>...| 1716|439828.0|      16|
|  4|     11|Calculate relativ...|<p>Given a specif...| 1286|130055.0|       3|
|  5|     12|                null|<p>Here's how I d...|  313|    null|      10|
|  6|     13|Determine a User'...|<p>Is there any s...|  519|149911.0|       6|
|  7|     14|Difference betwee...|<p>What is the di...|  352|102761.0|       3|
|  8|     16|Filling a DataSet...|<p>How do you exp...|  106| 77797.0|       0|
|  9|     17|Binary Data in MySQL|<p>How

In [23]:
# Wrap the project's HTML cleaner as a Spark UDF that returns strings.
cleaner_udf = F.udf(cleanhtml_test, returnType=StringType())
# Add the cleaner's output as a new "cleaned" column next to the raw "text".
cleaned_data = relevant_posts.withColumn(
    "cleaned",
    cleaner_udf(relevant_posts["text"]),
)

In [24]:
#dev_set, hold_set = relevant_posts.randomSplit([0.7,0.3])


In [25]:
#hold_set.write.csv('holdset.csv')

In [26]:
# Expose the frame (raw "text" plus the "cleaned" column) to Spark SQL.
# createOrReplaceTempView is the non-deprecated equivalent of
# registerTempTable and is safe to re-run.
cleaned_data.createOrReplaceTempView("clean_data")
# NOTE(review): this projection keeps the raw `text` column and drops
# `cleaned`, so the split below contains uncleaned text — confirm whether
# `cleaned` was meant to be selected here.
clean_stuff = spark.sql("""
                            SELECT post_Id, text
                            FROM clean_data
                            """)
clean_stuff.show(2)
# Seed the split so the 70/30 dev/hold-out partition is repeatable between
# runs instead of changing on every execution.
SPLIT_SEED = 42
dev_set, hold_set = clean_stuff.randomSplit([0.7,0.3], seed=SPLIT_SEED)

# cleaned_data.write.csv('csvs/cleaned_data.csv')


+-------+--------------------+
|post_Id|                text|
+-------+--------------------+
|    833|Editing database ...|
|   1829|How do I make a m...|
+-------+--------------------+
only showing top 2 rows



In [27]:
# dev_set.write.csv('clean_dev_set')
# hold_set.write.csv('clean_holdout_set')


In [28]:
# dev_set.registerTempTable("dev_set")
# dev_set = spark.sql("""
#                             SELECT post_Id, title, cleaned
#                             FROM dev_set
#                             """)
# # dev_set.write.csv('dev_set.csv')
# dev_set.show(2)
# # cleaned_body = cleaned_data.select("cleaned")
# # cleaned_body.write.csv('body_text_only')