In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = (SparkSession.builder.appName("Converting articles into BoW Vectors").getOrCreate())

In [3]:
spark

In [4]:
data=spark.read.csv("all_news.csv",header=True,inferSchema=True)

In [5]:
print(data)

DataFrame[_c0: string, Unnamed: 0: string, date: string, year: string, month: string, day: string, author: string, title: string, article: string, url: string, section: string, publication: string]


In [6]:
from pyspark.sql.functions import udf, col, lower, regexp_replace,concat,lit,split,explode,regexp_extract
from pyspark.ml.feature import Tokenizer, StopWordsRemover
import nltk
import string
import re
from nltk.stem.snowball import SnowballStemmer 
from pyspark.sql.functions import udf
import pyspark.sql.types as T
import pyspark.sql.functions as F
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from spacy.lang.en.stop_words import STOP_WORDS
from itertools import filterfalse
from nltk.corpus import wordnet as wn

In [7]:
data=data.select('title','article')


In [8]:
data.show()

+--------------------+--------------------+
|               title|             article|
+--------------------+--------------------+
|We should take co...|"This post is par...|
|Colts GM Ryan Gri...| The Indianapolis...|
|                null|                null|
|Trump denies repo...|DAVOS, Switzerlan...|
|France's Sarkozy ...|PARIS (Reuters) -...|
|Paris Hilton: Wom...|"Paris Hilton arr...|
|ECB's Coeure: If ...|BERLIN, June 17 (...|
|                null|                null|
|Venezuela detains...|CARACAS (Reuters)...|
|You Can Trick You...|"If only every da...|
|How to watch the ...|Google I/O, the c...|
|China is dismissi...|China is dismissi...|
|“Elizabeth Warren...|Elizabeth Warren ...|
|Hudson's Bay's ch...|(Reuters) - The s...|
|Joakim Noah's Vic...|Joakim Noah's ﻿mo...|
|Jermaine Jackson ...|"Jermaine Jackson...|
|UK PM May presses...|LONDON (Reuters) ...|
|Nancy Pelosi says...|"Nancy Pelosi is ...|
|The government of...|The nonpartisan d...|
|Mark Zuckerberg’s...|The threat

In [9]:
new_data = data.withColumn('text', F.concat(F.col('title'), F.col('article'))).drop(*data.columns[:2])

In [10]:
new_data.show()

+--------------------+
|                text|
+--------------------+
|We should take co...|
|Colts GM Ryan Gri...|
|                null|
|Trump denies repo...|
|France's Sarkozy ...|
|Paris Hilton: Wom...|
|ECB's Coeure: If ...|
|                null|
|Venezuela detains...|
|You Can Trick You...|
|How to watch the ...|
|China is dismissi...|
|“Elizabeth Warren...|
|Hudson's Bay's ch...|
|Joakim Noah's Vic...|
|Jermaine Jackson ...|
|UK PM May presses...|
|Nancy Pelosi says...|
|The government of...|
|Mark Zuckerberg’s...|
+--------------------+
only showing top 20 rows



In [11]:
non_null_data = new_data.dropna()

In [12]:
non_null_data.count()

2676886

In [13]:
non_null_data.show()

+--------------------+
|                text|
+--------------------+
|We should take co...|
|Colts GM Ryan Gri...|
|Trump denies repo...|
|France's Sarkozy ...|
|Paris Hilton: Wom...|
|ECB's Coeure: If ...|
|Venezuela detains...|
|You Can Trick You...|
|How to watch the ...|
|China is dismissi...|
|“Elizabeth Warren...|
|Hudson's Bay's ch...|
|Joakim Noah's Vic...|
|Jermaine Jackson ...|
|UK PM May presses...|
|Nancy Pelosi says...|
|The government of...|
|Mark Zuckerberg’s...|
|Girl Scouts Are T...|
|An Animated Maste...|
+--------------------+
only showing top 20 rows



In [14]:
normalized_data = non_null_data.select(lower(col("text")).alias("normalized tokens"))

In [15]:
normalized_data.show(10)

+--------------------+
|   normalized tokens|
+--------------------+
|we should take co...|
|colts gm ryan gri...|
|trump denies repo...|
|france's sarkozy ...|
|paris hilton: wom...|
|ecb's coeure: if ...|
|venezuela detains...|
|you can trick you...|
|how to watch the ...|
|china is dismissi...|
+--------------------+
only showing top 10 rows



In [16]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer



In [17]:
tokenizer=Tokenizer(inputCol='normalized tokens',outputCol='tokens')

In [18]:
data_words_token = tokenizer.transform(normalized_data).select('words_token')

In [19]:
tokenizer = Tokenizer(inputCol='normalized tokens', outputCol='words_token')
data_words_token = tokenizer.transform(normalized_data).select('words_token')

In [None]:
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
text = remover.transform(data_words_token).select('words_clean')

In [20]:
data_words_token.show()

+--------------------+
|         words_token|
+--------------------+
|[we, should, take...|
|[colts, gm, ryan,...|
|[trump, denies, r...|
|[france's, sarkoz...|
|[paris, hilton:, ...|
|[ecb's, coeure:, ...|
|[venezuela, detai...|
|[you, can, trick,...|
|[how, to, watch, ...|
|[china, is, dismi...|
|[“elizabeth, warr...|
|[hudson's, bay's,...|
|[joakim, noah's, ...|
|[jermaine, jackso...|
|[uk, pm, may, pre...|
|[nancy, pelosi, s...|
|[the, government,...|
|[mark, zuckerberg...|
|[girl, scouts, ar...|
|[an, animated, ma...|
+--------------------+
only showing top 20 rows



In [21]:
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
text = remover.transform(data_words_token).select('words_clean')

In [22]:
text.show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                         words_clean|
+----------------------------------------------------------------------------------------------------+
|[take, concerns, health, liberal, democracy, seriously"this, post, part, polyarchy,, independent,...|
|[colts, gm, ryan, grigson, says, andrew, luck's, contract, makes, difficult, build, team, indiana...|
|[trump, denies, report, ordered, mueller, fireddavos,, switzerland, (reuters), -, u.s., president...|
|[france's, sarkozy, reveals, 'passions', insists, come-back, cardsparis, (reuters), -, former, fr...|
|[paris, hilton:, woman, black, uncle, monty's, funeral"paris, hilton, arrived, lax, wednesday, dr...|
|[ecb's, coeure:, decide, cut, rates,, consider, tieringberlin,, june, 17, (reuters), -, ecb, boar...|
|[venezuela, detains, six, military,, police, officials:, family, members

In [23]:
snow_stemmer = SnowballStemmer(language='english')

In [24]:
stemmer = Stemmer().setInputCols([“token”]).setOutputCol(“stem”)
stemmer.transform(df)


SyntaxError: invalid character in identifier (<ipython-input-24-0293f77be1bc>, line 2)

In [35]:
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens],T.ArrayType(T.StringType()))
data_stemmed = text.withColumn("words_stemmed", stemmer_udf("words_clean")).select('words_stemmed')


In [36]:
filter_length_udf = udf(lambda row: [x for x in row if len(x) >= 3], T.ArrayType(T.StringType()))
data_final_words = data_stemmed.withColumn('words', filter_length_udf(col('words_stemmed')))


In [37]:
from pyspark.sql.functions import explode


In [38]:
tokens = data_final_words.select(explode(col("words_stemmed")).alias("tokens"))

In [39]:
token_groups = tokens.groupby(col("tokens"))

In [40]:
token_groups

<pyspark.sql.group.GroupedData at 0x1ea29e02c10>

In [41]:
token_counts = token_groups.count()

In [42]:
token_counts.show()

Py4JJavaError: An error occurred while calling o210.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 15.0 failed 1 times, most recent failure: Lost task 1.0 in stage 15.0 (TID 146) (Kinjal executor driver): java.net.SocketException: Connection reset by peer: socket write error
	at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
	at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:307)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
	at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
	at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:307)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)


In [None]:
token_counts.orderBy("count",ascending=False).show(100)
