In [1]:
import findspark
findspark.init()
import pyspark
from pyspark import SQLContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
spark = SparkSession.builder.getOrCreate()
sqlContext = SQLContext.getOrCreate(spark.sparkContext)

In [3]:
news_df = sqlContext.read.csv("Desktop/news.csv",header=True)

In [4]:
news_df.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|_c0|             authors|               title|        publish_date|         description|                text|                 url|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|        ['Cbc News']|Coronavirus a 'wa...| 2020-03-27 08:00:00|Canadian pharmaci...|"Canadian pharmac...|"" he said.  Tadr...|
|  1|        ['Cbc News']|Yukon gov't names...| 2020-03-27 01:45:00|The Yukon governm...|"The Yukon govern...|  from March 9 to 13|
|  2|['The Associated ...|U.S. Senate passe...| 2020-03-26 05:13:00|The Senate has pa...|"The Senate late ...|"" said Democrati...|
|  3|        ['Cbc News']|Coronavirus: The ...| 2020-03-27 00:36:00|Scientists around...|"Scientists aroun...| "" said Zarychanski|
|  4|        ['Cbc News']|The latest on the...| 2020-03-26 20:57:00|The late

In [5]:
#clean columns, remove smybols
from pyspark.sql.functions import *
def cleanColumn(tmpdf,colName,findChar,replaceChar):
    tmpdf = tmpdf.withColumn(colName, regexp_replace(colName, findChar, replaceChar))
    return tmpdf

allColNames = news_df.schema.names
charToRemove= "[\"!@#$%^&*\(\)\{\}\[\]\']"
replaceWith =""
for colName in allColNames:
    news_df=cleanColumn(news_df,colName,charToRemove,replaceWith)

In [6]:
news_df.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|_c0|             authors|               title|        publish_date|         description|                text|                 url|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|            Cbc News|Coronavirus a wak...| 2020-03-27 08:00:00|Canadian pharmaci...|Canadian pharmaci...| he said.  Tadrou...|
|  1|            Cbc News|Yukon govt names ...| 2020-03-27 01:45:00|The Yukon governm...|The Yukon governm...|  from March 9 to 13|
|  2|The Associated Press|U.S. Senate passe...| 2020-03-26 05:13:00|The Senate has pa...|The Senate late W...| said Democratic ...|
|  3|            Cbc News|Coronavirus: The ...| 2020-03-27 00:36:00|Scientists around...|Scientists around...|    said Zarychanski|
|  4|            Cbc News|The latest on the...| 2020-03-26 20:57:00|The late

In [7]:
#drop column _c0
news_df = news_df.drop('_c0')
news_df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             authors|               title|        publish_date|         description|                text|                 url|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            Cbc News|Coronavirus a wak...| 2020-03-27 08:00:00|Canadian pharmaci...|Canadian pharmaci...| he said.  Tadrou...|
|            Cbc News|Yukon govt names ...| 2020-03-27 01:45:00|The Yukon governm...|The Yukon governm...|  from March 9 to 13|
|The Associated Press|U.S. Senate passe...| 2020-03-26 05:13:00|The Senate has pa...|The Senate late W...| said Democratic ...|
|            Cbc News|Coronavirus: The ...| 2020-03-27 00:36:00|Scientists around...|Scientists around...|    said Zarychanski|
|            Cbc News|The latest on the...| 2020-03-26 20:57:00|The latest on the...|    Trudeau says ..

In [13]:
#drop rows with NAs/ Null values
def dropNA (tmpdf2,columnName):
    tmpdf2 = tmpdf2.where(col(columnName).isNotNull())
    return tmpdf2

columnName = ["authors","title","publish_date","description","text","url"]
allColNames2 = news_df.schema.names

for columnName in allColNames2:
    news_df = dropNA(news_df,columnName)

In [14]:
news_df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             authors|               title|        publish_date|         description|                text|                 url|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            Cbc News|Coronavirus a wak...| 2020-03-27 08:00:00|Canadian pharmaci...|Canadian pharmaci...| he said.  Tadrou...|
|            Cbc News|Yukon govt names ...| 2020-03-27 01:45:00|The Yukon governm...|The Yukon governm...|  from March 9 to 13|
|The Associated Press|U.S. Senate passe...| 2020-03-26 05:13:00|The Senate has pa...|The Senate late W...| said Democratic ...|
|            Cbc News|Coronavirus: The ...| 2020-03-27 00:36:00|Scientists around...|Scientists around...|    said Zarychanski|
|            Cbc News|The latest on the...| 2020-03-26 20:57:00|The latest on the...|    Trudeau says ..

In [17]:
# print schema to know what to query against
news_df.printSchema()

root
 |-- authors: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publish_date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)



In [29]:
#create tmp table view
news_df.createOrReplaceTempView("count_table")

In [38]:
#count # of authors
filtered_count_df = spark.sql("SELECT COUNT (DISTINCT authors) AS Author_Count FROM count_table")
filtered_count_df.show()

+------------+
|Author_Count|
+------------+
|         261|
+------------+



In [40]:
#groupby publish_date to see how many articles were published daily and by time stamp
news_df.groupBy("publish_date").count().show()

+-------------------+-----+
|       publish_date|count|
+-------------------+-----+
|2020-03-20 21:29:00|    3|
|2020-03-11 18:42:00|    1|
|2020-03-05 10:00:00|    1|
|2020-01-28 21:30:00|    1|
|2020-02-22 09:00:00|    4|
|2020-02-04 21:38:00|    2|
|2020-03-25 11:31:00|    1|
|2020-03-25 20:00:00|    2|
|2020-03-12 03:50:00|    1|
|2020-03-18 22:28:00|    1|
|2020-03-14 00:47:00|    2|
|2020-02-06 20:25:00|    1|
|2020-02-26 10:41:00|    1|
|2020-02-10 02:50:00|    1|
|2020-03-24 18:40:00|    1|
|2020-03-25 17:55:00|    1|
|2020-03-25 14:16:00|    1|
|2020-03-24 19:42:00|    3|
|2020-03-17 09:00:00|    1|
|2020-03-17 10:27:00|    1|
+-------------------+-----+
only showing top 20 rows



In [41]:
# groupby and count # of authors
news_df.groupBy("authors").count().show()

+--------------------+-----+
|             authors|count|
+--------------------+-----+
|Joel Ballard Is A...|    1|
|Brian Higgins Sho...|    1|
|Bryan Eneas Is A ...|    3|
|Covering Everythi...|    1|
|Defence, Murray B...|   13|
|Adam Hunter Is Th...|   12|
|Reporter Editor, ...|   14|
|      Elham Shabahat|    1|
|Co-Host, Cbc Vanc...|    1|
|Nick Murray Is A ...|    1|
|Kevin Yarr Is The...|    1|
|Medical Science, ...|   12|
|Brian Rodgers Is ...|    2|
|Senior Writer, Pe...|   10|
|Cbc Investigative...|    2|
|Reporter, Cbc Tor...|    4|
|Data Journalist, ...|    1|
|Andie Bulman Is A...|    3|
|Sarah Rieger Join...|    2|
|Reporter, Robson ...|    6|
+--------------------+-----+
only showing top 20 rows



In [42]:
# groupby and count # of titles, shows duplicate titles
news_df.groupBy("title").count().show()

+--------------------+-----+
|               title|count|
+--------------------+-----+
|Coronavirus: Here...|    1|
|Michigan governor...|    1|
|Unacceptable: Rea...|    1|
|Chinas coronaviru...|    1|
|1 in 3 Canadians ...|    1|
|    MERS coronavirus|    1|
|Seafood plants sc...|    1|
|Mayor assures Win...|    1|
|Saskatoon cannabi...|    1|
|Charlottetown clo...|    2|
|Public health eme...|    2|
|Montreal-area hig...|    1|
|Its scary: Canadi...|    1|
|Plea from an ER d...|    1|
|London to open CO...|    1|
|City of Thunder B...|    1|
|U.S. death toll f...|    1|
|Working from home...|    1|
|To say we werent ...|    1|
|Patients can now ...|    1|
+--------------------+-----+
only showing top 20 rows



In [43]:
# groupby and count # of description shows duplicate descriptions
news_df.groupBy("description").count().show()

+--------------------+-----+
|         description|count|
+--------------------+-----+
|A federal economi...|    1|
|Groups that work ...|    1|
|A ban on visitors...|    1|
|Coronavirus may b...|    2|
|More Indigenous c...|    1|
|In a letter sent ...|    4|
|While other presi...|    1|
|As public health ...|    1|
|Several employees...|    1|
|Those concerned w...|    2|
|As of Thursday, 1...|    1|
|A chartered aircr...|    1|
|The Timiskaming H...|    1|
|Cape Breton Regio...|    1|
|China has given t...|    1|
|As many Canadians...|    1|
|A doctor with the...|    2|
|No sweeping bans ...|    1|
|The Calgary Catho...|    1|
|G7 finance minist...|    1|
+--------------------+-----+
only showing top 20 rows



In [44]:
# groupby and count # of url shows duplicate descriptions
news_df.groupBy("url").count().show()

+--------------------+-----+
|                 url|count|
+--------------------+-----+
| Johnson said in ...|    1|
|      Dr. Mark Joffe|    1|
| the Manitoban sa...|    1|
|Whos the Canadian...|    1|
| the company said...|    1|
| he said. Theres ...|    1|
| the office of th...|    2|
|Much of Quebecs e...|    1|
| Trudeau urged ev...|    2|
| chief provincial...|    1|
|https://www.cbc.c...|    1|
| said Tyler Misla...|    2|
| said Myriam Laro...|    1|
|   said Jon Ola Sand|    1|
| the statement sa...|    1|
| it is not clear ...|    1|
|    Zarychanski said|    1|
| not focused on a...|    2|
| Hamiltons medica...|    1|
| aside from what ...|    1|
+--------------------+-----+
only showing top 20 rows



In [190]:
#Stop word removal code from DataCamp
# Create a baseRDD from the file path
baseRDD = sc.textFile("Desktop/news.csv")

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())

Total number of words in splitRDD: 2734101


In [191]:
stop_words = ['whence', 'here', 'show', 'were', 'why', 'n’t', 'the', 'whereupon', 'not', 'more', 'how', 'eight', 'indeed', 
              'i', 'only', 'via', 'nine', 're', 'themselves', 'almost', 'to', 'already', 'front', 'least', 'becomes', 
              'thereby', 'doing', 'her', 'together', 'be', 'often', 'then', 'quite', 'less', 'many', 'they', 'ourselves', 
              'take', 'its', 'yours', 'each', 'would', 'may', 'namely', 'do', 'whose', 'whether', 'side', 'both', 'what', 
              'between', 'toward', 'our', 'whereby', "'m", 'formerly', 'myself', 'had', 'really', 'call', 'keep', "'re", 
              'hereupon', 'can', 'their', 'eleven', '’m', 'even', 'around', 'twenty', 'mostly', 'did', 'at', 'an', 'seems', 
              'serious', 'against', "n't", 'except', 'has', 'five', 'he', 'last', '‘ve', 'because', 'we', 'himself', 'yet', 
              'something', 'somehow', '‘m', 'towards', 'his', 'six', 'anywhere', 'us', '‘d', 'thru', 'thus', 'which', 
              'everything', 'become', 'herein', 'one', 'in', 'although', 'sometime', 'give', 'cannot', 'besides', 'across', 
              'noone', 'ever', 'that', 'over', 'among', 'during', 'however', 'when', 'sometimes', 'still', 'seemed', 'get', 
              "'ve", 'him', 'with', 'part', 'beyond', 'everyone', 'same', 'this', 'latterly', 'no', 'regarding', 'elsewhere', 
              'others', 'moreover', 'else', 'back', 'alone', 'somewhere', 'are', 'will', 'beforehand', 'ten', 'very', 'most', 
              'three', 'former', '’re', 'otherwise', 'several', 'also', 'whatever', 'am', 'becoming', 'beside', '’s', 'nothing', 
              'some', 'since', 'thence', 'anyway', 'out', 'up', 'well', 'it', 'various', 'four', 'top', '‘s', 'than', 'under', 'might', 
              'could', 'by', 'too', 'and', 'whom', '‘ll', 'say', 'therefore', "'s", 'other', 'throughout', 'became', 'your', 'put', 'per', 
              "'ll", 'fifteen', 'must', 'before', 'whenever', 'anyone', 'without', 'does', 'was', 'where', 'thereafter', "'d", 'another', 
              'yourselves', 'n‘t', 'see', 'go', 'wherever', 'just', 'seeming', 'hence', 'full', 'whereafter', 'bottom', 'whole', 'own', 
              'empty', 'due', 'behind', 'while', 'onto', 'wherein', 'off', 'again', 'a', 'two', 'above', 'therein', 'sixty', 'those', 
              'whereas', 'using', 'latter', 'used', 'my', 'herself', 'hers', 'or', 'neither', 'forty', 'thereupon', 'now', 'after', 
              'yourself', 'whither', 'rather', 'once', 'from', 'until', 'anything', 'few', 'into', 'such', 'being', 'make', 'mine', 
              'please', 'along', 'hundred', 'should', 'below', 'third', 'unless', 'upon', 'perhaps', 'ours', 'but', 'never', 'whoever', 
              'fifty', 'any', 'all', 'nobody', 'there', 'have', 'anyhow', 'of', 'seem', 'down', 'is', 'every', '’ll', 'much', 'none', 
              'further', 'me', 'who', 'nevertheless', 'about', 'everywhere', 'name', 'enough', '’d', 'next', 'meanwhile', 'though', 
              'through', 'on', 'first', 'been', 'hereby', 'if', 'move', 'so', 'either', 'amongst', 'for', 'twelve', 'nor', 'she', 'always', 
              'these', 'as', '’ve', 'amount', '‘re', 'someone', 'afterwards', 'you', 'nowhere', 'itself', 'done', 'hereafter', 'within', 
              'made', 'ca', 'them', 'said', 'said.', '— has', 'says', '— has', '—']

In [192]:
# Convert the words in lower case and remove stop words from stop_words
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

# Create a tuple of the word and 1 
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count of the number of occurences of each word 
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

In [193]:
# Swap the keys and values
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
	print("{} has {} counts". format(word[1], word[0]))

COVID-19 has 12123 counts
people has 11326 counts
health has 8979 counts
coronavirus has 7697 counts
cases has 6950 counts
new has 5099 counts
public has 4953 counts
Health has 4927 counts
government has 4434 counts
Canada has 4396 counts


In [201]:
new_results_df = spark.createDataFrame(resultRDD_swap_sort)
new_results_df.createOrReplaceTempView("Word_Count")

In [207]:
data = new_results_df.select(col("_1").alias("count"), col("_2").alias("word"))

In [208]:
data.show()

+-----+-----------+
|count|       word|
+-----+-----------+
|12123|   COVID-19|
|11326|     people|
| 8979|     health|
| 7697|coronavirus|
| 6950|      cases|
| 5099|        new|
| 4953|     public|
| 4927|     Health|
| 4434| government|
| 4396|     Canada|
| 3874|     spread|
| 3701|       need|
| 3657|       home|
| 3563|  confirmed|
| 3328|      virus|
| 3302|      March|
| 3279|       it's|
| 3228|   province|
| 3006|  officials|
| 2966|       ""We|
+-----+-----------+
only showing top 20 rows



In [209]:
top_10 = data.take(10)
top_10

[Row(count=12123, word='COVID-19'),
 Row(count=11326, word='people'),
 Row(count=8979, word='health'),
 Row(count=7697, word='coronavirus'),
 Row(count=6950, word='cases'),
 Row(count=5099, word='new'),
 Row(count=4953, word='public'),
 Row(count=4927, word='Health'),
 Row(count=4434, word='government'),
 Row(count=4396, word='Canada')]

In [214]:
print("The column names of names_df are", data.columns)

The column names of names_df are ['count', 'word']


In [215]:
df_pandas = data.toPandas()

In [216]:
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
plt.clf()
pdDF = df_pandas
pdDF.plot(x='word', y='count', kind='bar', rot=45)
display()