In [1]:
import findspark
# Raw string: the Windows path contains backslashes, and "\s" is not a valid
# escape sequence in a normal string literal (it triggers a warning on
# current Python versions). The resulting path value is unchanged.
findspark.init(r'C:\spark\spark-2.2.1-bin-hadoop2.7')

import numpy as np
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql import functions

from bs4 import BeautifulSoup

from nltk.tokenize import word_tokenize
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

import matplotlib.pyplot as plt
%matplotlib inline

import datetime

In [2]:
# getOrCreate is called on the class, not on a freshly constructed instance:
# SparkContext() builds a new context first and fails if one is already
# running (for example when this cell is executed a second time).
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('comment clean').getOrCreate()
# To release the cluster resources when finished, run:
# sc.stop()
# spark.stop()

### read file to dataframe

In [3]:
inputloc = "C:/Users/LunaS/Desktop/2018-01/CMPT 733/project/SOCC/raw/gnm_comments.csv"

# (column name, Spark type) pairs, listed in the order the columns are
# expected to appear in the CSV file.
# NOTE(review): several id-like columns (comment_id, parentID, threadID,
# streamId) and timestamp are read as 32-bit floats; large values may lose
# precision -- confirm against the raw data.
column_types = [
    ('article_id', IntegerType),
    ('comment_counter', StringType),
    ('comment_author', StringType),
    ('timestamp', FloatType),
    ('post_time', StringType),
    ('comment_text', StringType),
    ('TotalVotes', FloatType),
    ('posVotes', FloatType),
    ('negVotes', FloatType),
    ('vote', StringType),
    ('reactions', StringType),
    ('replies', StringType),
    ('comment_id', FloatType),
    ('parentID', FloatType),
    ('threadID', FloatType),
    ('streamId', FloatType),
    ('edited', BooleanType),
    ('isModerator', BooleanType),
    ('highlightGroups', StringType),
    ('moderatorEdit', StringType),
    ('descendantsCount', StringType),
    ('threadTimestamp', StringType),
    ('flagCount', StringType),
    ('sender_isSelf', StringType),
    ('sender_loginProvider', StringType),
    ('data_type', StringType),
    ('is_empty', StringType),
    ('status', StringType),
]

# Every field is declared with nullable=False.
schema = StructType([
    StructField(column_name, spark_type(), False)
    for column_name, spark_type in column_types
])

# The first line of the file is treated as a header row.
df = spark.read.csv(inputloc, header="true", schema=schema)

In [4]:
# Keep only the columns used by the text-cleaning steps and preview them.
kept_columns = ['article_id', 'comment_author', 'comment_text']
data = df.select(*kept_columns)
data.show()

+----------+--------------+--------------------+
|article_id|comment_author|        comment_text|
+----------+--------------+--------------------+
|  10012655|     modoglobe|I think the progr...|
|  10012655|      stueegee|This is just 'off...|
|  10012655|       EJS1018|How do you spell ...|
|  10012655|     Seedy How|TFWs have a place...|
|  10012655|       Roger_M|Why should tempor...|
|  10012655|   Rick Taves1|No sense playing ...|
|  10012655|        OldCdn|There will never ...|
|  10012655|     Seedy How|TFWs have a place...|
|  10012655|      john2012|TFWs work against...|
|  10012655|         stono|like to give you ...|
|  10012655|     Seedy How|TFWs have a prope...|
|  10012655|          SFD1|I think we have H...|
|  10012655|          SFD1|it's not the same...|
|  10012655|       Roger_M|Yep. And along th...|
|  10012655|         stono|  wow you are a fool|
|  10012655|The Work Farce|By law, employers...|
|  10012655|           M_G|Hmmm. I don't rec...|
|  10012655|     See

### clean comment text

In [5]:
def clean_comment(comment):
    """Turn a raw comment into a list of lower-cased, stemmed word tokens.

    Processing steps, in order: strip HTML markup, tokenize with NLTK, keep
    only purely alphabetic tokens (this drops punctuation and numbers),
    remove English stop words, and apply Porter stemming.

    Args:
        comment: Raw comment text, possibly containing HTML tags. ``None``
            and the empty string are accepted.

    Returns:
        list of str: The stemmed tokens in their original order. An empty
        list is returned when ``comment`` is ``None`` or empty.
    """
    # Null cells reach the function as None; return early instead of letting
    # the HTML parser fail on a non-string input.
    if not comment:
        return []

    # Name the parser explicitly: without it BeautifulSoup picks whichever
    # parser happens to be installed and emits a warning, so results could
    # differ between machines.
    text = BeautifulSoup(comment, 'html.parser').get_text()

    # Tokenize, lower-case, and drop anything that is not purely alphabetic.
    tokens = [word.lower() for word in word_tokenize(text) if word.isalpha()]

    # A set gives constant-time membership checks for the stop-word filter.
    stop_words = set(stopwords.words('english'))

    # Filter stop words and stem in a single pass.
    stemmer = PorterStemmer()
    return [stemmer.stem(w) for w in tokens if w not in stop_words]

In [7]:
# Wrap the cleaner as a Spark UDF and store its result in a new column.
# NOTE(review): clean_comment returns a list of tokens, but the UDF return
# type is declared as StringType, so the column holds the list rendered as
# text. Presumably ArrayType(StringType()) was intended -- confirm, keeping in
# mind that the CSV export later in this notebook writes this column.
udf_clean_comment = udf(clean_comment, StringType())
cleaned_column = udf_clean_comment(data["comment_text"])
data = data.withColumn("clean_comment_text", cleaned_column)
data.show()

+----------+--------------+--------------------+--------------------+
|article_id|comment_author|        comment_text|  clean_comment_text|
+----------+--------------+--------------------+--------------------+
|  10012655|     modoglobe|I think the progr...|[think, program, ...|
|  10012655|      stueegee|This is just 'off...|[revers, well, un...|
|  10012655|       EJS1018|How do you spell ...|[spell, exploit, ...|
|  10012655|     Seedy How|TFWs have a place...|[tfw, place, econ...|
|  10012655|       Roger_M|Why should tempor...|[temporari, worke...|
|  10012655|   Rick Taves1|No sense playing ...|[sens, play, game...|
|  10012655|        OldCdn|There will never ...|[never, reason, e...|
|  10012655|     Seedy How|TFWs have a place...|[tfw, place, econ...|
|  10012655|      john2012|TFWs work against...|[tfw, work, tenet...|
|  10012655|         stono|like to give you ...| [like, give, thumb]|
|  10012655|     Seedy How|TFWs have a prope...|[tfw, proper, pla...|
|  10012655|        

### group comments by article id and save to csv

In [9]:
# Number of comments each article received, most-commented articles first.
article_count = data.groupby('article_id').count().orderBy('count', ascending=False)

# Ids of the articles that received more than 100 comments, as a plain list.
popular_articles = article_count.filter(article_count['count'] > 100).select("article_id")
article_count_list = [row[0] for row in popular_articles.collect()]

In [None]:
# Persist the selected article ids as text, one id per line.
# - plain ASCII quotes are required (typographic quotes are a syntax error);
# - write() only accepts strings, so each id is formatted explicitly;
# - the with-block closes the file even if writing fails.
with open("article_count_list.txt", "w") as id_file:
    id_file.writelines("{}\n".format(article) for article in article_count_list)

In [26]:
import os

# Work out which articles still need to be exported: every id in
# article_count_list that has no 'data_article_<id>' entry in the output
# directory yet.
lst = []
# On a first run the output directory does not exist yet; in that case
# nothing has been exported and every article is still pending.
if os.path.isdir('comments_groupby_article'):
    for x in os.listdir('comments_groupby_article'):
        try:
            lst.append(int(x.split('_')[2]))
        except (IndexError, ValueError):
            # Entry does not follow the 'data_article_<id>' naming scheme
            # (e.g. a stray hidden file); ignore it.
            continue
rests = set(article_count_list) - set(lst)

In [27]:
def save_by_article(article_num):
    """Write the raw and cleaned comments of one article to a CSV directory.

    Reads the notebook-level DataFrame ``data``, keeps the rows whose
    ``article_id`` equals ``article_num`` and writes the ``comment_text`` and
    ``clean_comment_text`` columns, with a header row, to
    ``comments_groupby_article/data_article_<article_num>``.

    Args:
        article_num: Id of the article whose comments are exported.
    """
    target_path = 'comments_groupby_article/data_article_' + str(article_num)
    matching_rows = data.filter(data['article_id'] == article_num)
    data_article = matching_rows.select('comment_text', 'clean_comment_text')
    data_article.write.csv(target_path, header=True)

In [28]:
# Export every article that has not been written yet.
# The id is printed before each export so the long-running loop shows
# progress and, if a write fails, the last printed id identifies the article
# that caused the failure.
for article_id in rests:
    print(article_id)
    save_by_article(article_id)

12394508
22423565
11395088
15058963
9623584
17414176
29343789
26288173
12445744
23760947
29452357
18001997
17371213
22837331
14184533
17598552
18735197
15032413
32993373
22360163
13815907
17834090
17586295
15577208
12445818
27215996
16701565
30929037
7028879
8276117
33450158
23787704
32149689
19271871
28223696
31289555
21387475
19325141
32577750
15968475
24498401
16330980
26441959
18620648
31848710
19450121
27281681
23894289
24903968
17561891
11436337
14588210
19161397
15126840
30882108
17779009
31537479
27359561
30253397
22329693
30384483
24729955
32450922
18047340
19155314
21481849
29333884
26638718
24004992
20812161
19358090
10762639
33096081
8952216
16277915
25911708
29694364
27898273
32907689
13269419
30253490
27484601
22487481
20056515
18373067
9707992
17555933
23069152
32932323
23523813
20566515
32653825
11153921
26184196
12235270
26415624
22819338
30112266
14057998
25725454
14707217
27965975
23732763
6873631
22532644
22794797
11811373
23812654
31220275
18459188
6935101
28781118