In [2]:

from pyspark.mllib.feature import Word2Vec
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [4]:
from pyspark import SQLContext

In [7]:
sqlContext = SQLContext(sc)

In [33]:
df = sqlContext.read.csv("news.csv", header=True,inferSchema=True)
df.show(5)

+---+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|_c0|             authors|               title|       publish_date|         description|                text|                 url|
+---+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|  0|        ['Cbc News']|Coronavirus a 'wa...|2020-03-27 08:00:00|Canadian pharmaci...|"Canadian pharmac...|"" he said.  Tadr...|
|  1|        ['Cbc News']|Yukon gov't names...|2020-03-27 01:45:00|The Yukon governm...|"The Yukon govern...|  from March 9 to 13|
|  2|['The Associated ...|U.S. Senate passe...|2020-03-26 05:13:00|The Senate has pa...|"The Senate late ...|"" said Democrati...|
|  3|        ['Cbc News']|Coronavirus: The ...|2020-03-27 00:36:00|Scientists around...|"Scientists aroun...| "" said Zarychanski|
|  4|        ['Cbc News']|The latest on the...|2020-03-26 20:57:00|The latest on th

In [34]:
df.createOrReplaceTempView("table")

In [35]:
practice = spark.sql("SELECT title FROM table" )
practice.show()

+--------------------+
|               title|
+--------------------+
|Coronavirus a 'wa...|
|Yukon gov't names...|
|U.S. Senate passe...|
|Coronavirus: The ...|
|The latest on the...|
|'Worse' pandemic ...|
|What you need to ...|
|Michigan hospital...|
|U.S. coronavirus ...|
|'Avoid the emerge...|
|COVID-19 in Sask:...|
| 'Jorgebarrera Or...|
|How invoking the ...|
| 'As The Late Lin...|
|Alberta's film in...|
| 'The World Cup I...|
|N.L. fisheries re...|
|1st death, 3 new ...|
| 'Caroline.Bargho...|
|It's 'too late' f...|
+--------------------+
only showing top 20 rows



In [36]:
from pyspark.ml.feature import Tokenizer

In [37]:
tokenization = Tokenizer(inputCol='title', outputCol='tokenized')
tokenized_df = tokenization.transform(practice)
tokenized_df.show(5, False)

+------------------------------------------------------------------+----------------------------------------------------------------------------+
|title                                                             |tokenized                                                                   |
+------------------------------------------------------------------+----------------------------------------------------------------------------+
|Coronavirus a 'wake-up call' for Canada's prescription drug supply|[coronavirus, a, 'wake-up, call', for, canada's, prescription, drug, supply]|
|Yukon gov't names 2 possible sources of coronavirus exposure      |[yukon, gov't, names, 2, possible, sources, of, coronavirus, exposure]      |
|U.S. Senate passes $2T coronavirus relief package                 |[u.s., senate, passes, $2t, coronavirus, relief, package]                   |
|Coronavirus: The latest in drug treatment and vaccine development |[coronavirus:, the, latest, in, drug, treatment, and, va

In [38]:
from pyspark.ml.feature import StopWordsRemover

sw_removal = StopWordsRemover(inputCol='tokenized', outputCol='new_tokenized')

new_df  = sw_removal.transform(tokenized_df)

new_df.select([ 'tokenized', 'new_tokenized']).show(4, False)


+----------------------------------------------------------------------------+--------------------------------------------------------------------+
|tokenized                                                                   |new_tokenized                                                       |
+----------------------------------------------------------------------------+--------------------------------------------------------------------+
|[coronavirus, a, 'wake-up, call', for, canada's, prescription, drug, supply]|[coronavirus, 'wake-up, call', canada's, prescription, drug, supply]|
|[yukon, gov't, names, 2, possible, sources, of, coronavirus, exposure]      |[yukon, gov't, names, 2, possible, sources, coronavirus, exposure]  |
|[u.s., senate, passes, $2t, coronavirus, relief, package]                   |[u.s., senate, passes, $2t, coronavirus, relief, package]           |
|[coronavirus:, the, latest, in, drug, treatment, and, vaccine, development] |[coronavirus:, latest, drug, treat

In [39]:
from pyspark.ml.feature import CountVectorizer

count_vectorizer = CountVectorizer(inputCol='new_tokenized', outputCol='count_vector')

cv_df = count_vectorizer.fit(new_df).transform(new_df)

cv_df.select(['count_vector', 'new_tokenized']).show(4, False)


+--------------------------------------------------------------------------+--------------------------------------------------------------------+
|count_vector                                                              |new_tokenized                                                       |
+--------------------------------------------------------------------------+--------------------------------------------------------------------+
|(5935,[1,156,231,440,2799,3654,4323],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |[coronavirus, 'wake-up, call', canada's, prescription, drug, supply]|
|(5935,[1,52,236,264,687,1091,3466,4927],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|[yukon, gov't, names, 2, possible, sources, coronavirus, exposure]  |
|(5935,[1,48,143,431,870,2408,2466],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])         |[u.s., senate, passes, $2t, coronavirus, relief, package]           |
|(5935,[168,230,440,456,1021,3179],[1.0,1.0,1.0,1.0,1.0,1.0])              |[coronavirus:, latest, drug, treatment, vaccine,

In [40]:
from pyspark.ml.feature import HashingTF, IDF

hashing_vector = HashingTF(inputCol='new_tokenized', outputCol='tf_vector')

hashing_df = hashing_vector.transform(new_df)

hashing_df.select(['new_tokenized', 'tf_vector']).show(4, False)


+--------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+
|new_tokenized                                                       |tf_vector                                                                                         |
+--------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+
|[coronavirus, 'wake-up, call', canada's, prescription, drug, supply]|(262144,[19409,40531,69392,119910,135051,205460,206738],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])            |
|[yukon, gov't, names, 2, possible, sources, coronavirus, exposure]  |(262144,[19409,67931,104901,149175,212053,214333,227860,236204],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[u.s., senate, passes, $2t, coronavirus, relief, package]           |(262144,[5518,19409,46332,70747,89896,90843,182338],[1.0,1.0,1.0,1.0,1.0,1.0,1.0

In [80]:
tf_idf_vector = IDF(inputCol='tf_vector', outputCol='tf_idf_vector')

tf_idf_df = tf_idf_vector.fit(hashing_df).transform(hashing_df)

tf_idf_df.select(['new_tokenized','tf_idf_vector']).show(4, False)


+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|new_tokenized                                                       |tf_idf_vector                                                                                                                                                                                                    |
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[coronavirus, 'wake-up, call', canada's, prescription, drug, supply]|(262144,[19409,40531,69392,119910,135051,205460,206738],[1.6662500744465845,7.486333004

In [77]:
text_df = spark.read.csv('news.csv', inferSchema=True, header=True, sep=',')
text_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- authors: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publish_date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)



In [82]:
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('news.csv')

In [83]:
data.show(5)

+---+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|_c0|             authors|               title|       publish_date|         description|                text|                 url|
+---+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|  0|        ['Cbc News']|Coronavirus a 'wa...|2020-03-27 08:00:00|Canadian pharmaci...|"Canadian pharmac...|"" he said.  Tadr...|
|  1|        ['Cbc News']|Yukon gov't names...|2020-03-27 01:45:00|The Yukon governm...|"The Yukon govern...|  from March 9 to 13|
|  2|['The Associated ...|U.S. Senate passe...|2020-03-26 05:13:00|The Senate has pa...|"The Senate late ...|"" said Democrati...|
|  3|        ['Cbc News']|Coronavirus: The ...|2020-03-27 00:36:00|Scientists around...|"Scientists aroun...| "" said Zarychanski|
|  4|        ['Cbc News']|The latest on the...|2020-03-26 20:57:00|The latest on th

In [84]:
drop_list = ['authors', 'publish_date', 'description', 'text', 'Unnamed: 0', 'url']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+---+--------------------+
|_c0|               title|
+---+--------------------+
|  0|Coronavirus a 'wa...|
|  1|Yukon gov't names...|
|  2|U.S. Senate passe...|
|  3|Coronavirus: The ...|
|  4|The latest on the...|
+---+--------------------+
only showing top 5 rows



In [85]:
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- title: string (nullable = true)

