## Analysis of News Dataset 
Our main goal is to 
1. Cluster similar news articles. 
2. Extract important topics from a large corpus of news articles.

In [69]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import * 

spark = sparknlp.start()

### Loading the dataset using jdbc

In [70]:
df = spark.read.format("jdbc").options(url ="jdbc:sqlite:/home/sankethbk7777/News_dataset/all-the-news.db",  driver="org.sqlite.JDBC", dbtable = "longform").load()

In [71]:
# Loading the dataset 

#df = spark.read.option('header',True).csv('archive/articles2.csv')  # trial dataset 
df.printSchema()

root
 |-- id: decimal(38,18) (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- content: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- publication: string (nullable = true)
 |-- category: string (nullable = true)
 |-- digital: integer (nullable = true)
 |-- section: string (nullable = true)
 |-- url: string (nullable = true)



In [60]:
df.count()  # no of rows

204135

In [61]:
df.first()

Row(id=Decimal('1.000000000000000000'), title='Agent Cooper in Twin Peaks is the audience: once delighted, now disintegrating', author='\nTasha Robinson\n', date='2017-05-31', content='      And never more so than in Showtime’s new series revival Some spoilers ahead through episode 4 of season 3 of Twin Peaks. On May 21st, Showtime brought back David Lynch’s groundbreaking TV series Twin Peaks, and fulfilled a prophecy in the process. In the second season finale, back in 1991, the spirit of series-defining murder victim Laura Palmer told FBI special agent and series protagonist Dale Cooper, “I’ll see you again in 25 years.” That clip plays again in the first episode of Lynch’s Twin Peaks revival, as a reminder that decades have in fact gone by, Laura’s promise has been carried out, and a series canceled mid-story is back on the air.A lot has changed in 25 years. The original cast members, who are mostly back on board, have all aged heavily and visibly. Many of the characters have moved

In [72]:
df.select("*").show(5)    # we don't have url for all rows

+--------------------+--------------------+-----------------+----------+--------------------+----+-----+-----------+--------+-------+-------+----+
|                  id|               title|           author|      date|             content|year|month|publication|category|digital|section| url|
+--------------------+--------------------+-----------------+----------+--------------------+----+-----+-----------+--------+-------+-------+----+
|1.000000000000000000|Agent Cooper in T...| 
Tasha Robinson
|2017-05-31|      And never m...|2017|    5|      Verge|Longform|      1|   null|null|
|2.000000000000000000|   AI, the humanity!|     
Sam Byford
|2017-05-30|      AlphaGo’s v...|2017|    5|      Verge|Longform|      1|   null|null|
|3.000000000000000000|   The Viral Machine|
Kaitlyn Tiffany
|2017-05-25|      Super Delux...|2017|    5|      Verge|Longform|      1|   null|null|
|4.000000000000000000|How Anker is beat...|     
Nick Statt
|2017-05-22|      Steven Yang...|2017|    5|      Verge|Lo

### Number of Non null values in each column

In [63]:
from pyspark.sql.functions import col, count, isnan, lit, sum
  
def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True -> 1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

df.agg(*[count_not_null(c) for c in df.columns]).show()

+------+------+------+------+-------+------+------+-----------+--------+-------+-------+-----+
|    id| title|author|  date|content|  year| month|publication|category|digital|section|  url|
+------+------+------+------+-------+------+------+-----------+--------+-------+-------+-----+
|204135|204135|172000|191532| 191181|191532|191532|     196420|  168713| 193115|  74572|98796|
+------+------+------+------+-------+------+------+-----------+--------+-------+-------+-----+



we will later drop all those columns in which both title and content are empty

## Data Preprocessing

We only need id , title and content columns to perform our analysis so we are dropping rest of the columns

In [73]:
# we don't need all columns we just need 3 columns id, title, content

columns_to_drop = ['author', 'date', 'year', 'month', 'url','conent','publication','_c0','digital', 'section','category']
df = df.drop(*columns_to_drop)

df.printSchema()

root
 |-- id: decimal(38,18) (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)



In [74]:
# we want to combine title and cotents of title and content column 
#also we need to maintain id's of documents for later identification

import pyspark.sql.functions as F

df2 = df.select('id',F.concat(F.col('title'), F.lit(" "), F.col('content')))
df2 = df2.withColumnRenamed('concat(title,  , content)', 'text')

print("First title = ")
print(df.select('title').first())
print("First content = ")
print(df.select('content').first())
print("Concatenated first title and content")
print(df2.select('text').first().asDict())

First title = 
Row(title='Agent Cooper in Twin Peaks is the audience: once delighted, now disintegrating')
First content = 
Row(content='      And never more so than in Showtime’s new series revival Some spoilers ahead through episode 4 of season 3 of Twin Peaks. On May 21st, Showtime brought back David Lynch’s groundbreaking TV series Twin Peaks, and fulfilled a prophecy in the process. In the second season finale, back in 1991, the spirit of series-defining murder victim Laura Palmer told FBI special agent and series protagonist Dale Cooper, “I’ll see you again in 25 years.” That clip plays again in the first episode of Lynch’s Twin Peaks revival, as a reminder that decades have in fact gone by, Laura’s promise has been carried out, and a series canceled mid-story is back on the air.A lot has changed in 25 years. The original cast members, who are mostly back on board, have all aged heavily and visibly. Many of the characters have moved on in life, getting new jobs, forming families,

In [79]:
def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True -> 1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

df2.agg(*[count_not_null(c) for c in df2.columns]).show()

+------+------+
|    id|  text|
+------+------+
|204135|191181|
+------+------+



We see that we have some null values in text column so we will drop them

In [83]:
df2 = df2.na.drop("any")

In [84]:
def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True -> 1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

df2.agg(*[count_not_null(c) for c in df2.columns]).show()

+------+------+
|    id|  text|
+------+------+
|191181|191181|
+------+------+



In [75]:
df2.show(5)

+--------------------+--------------------+
|                  id|                text|
+--------------------+--------------------+
|1.000000000000000000|Agent Cooper in T...|
|2.000000000000000000|AI, the humanity!...|
|3.000000000000000000|The Viral Machine...|
|4.000000000000000000|How Anker is beat...|
|5.000000000000000000|Tour Black Panthe...|
+--------------------+--------------------+
only showing top 5 rows



### Setting up the Data preprocessing Pipeline 
1. Convert the raw text into sparknlp's Document type using DocumentAssembler 
2. SentenceDetector - splits the document into sentences. 
3. Tokenizer - splits a document into tokens (words).
4. Normalizer - Removes all dirty characters from text following a regex pattern and transforms words based on a provided dictionary
4. StopWordsRemover - Removes all the stop words. 
5. Lemmatizer - converts words to their base forms. eg: studying , studies, studied all are converted to study. 

In [76]:
from pyspark.ml import Pipeline

document_assembler = DocumentAssembler() \
                        .setInputCol('text') \
                        .setOutputCol('document') 

sentenceDetector = SentenceDetector() \
                        .setInputCols(['document']) \
                        .setOutputCol('sentences')

tokenizer = Tokenizer() \
                .setInputCols(['sentences']) \
                .setOutputCol('token') \
                .setContextChars(['(', ')', '?', '!','"','-',"'",' ']) 

normalizer = Normalizer() \
                .setInputCols(['token']) \
                .setOutputCol('normal')



stop_words = StopWordsCleaner() \
        .setInputCols(["token"]) \
        .setOutputCol("cleanTokens")

lemmatizer = LemmatizerModel.pretrained(name="lemma_antbnc", lang="en") \
                .setInputCols(['cleanTokens']) \
                .setOutputCol('lemma') 
                

finisher = Finisher() \
    .setInputCols(["lemma"]) \
    .setIncludeMetadata(False)


preprocess_pipeline = Pipeline(stages = [document_assembler, sentenceDetector, tokenizer,normalizer, stop_words, lemmatizer,finisher])



lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [9]:
preprocess_model = preprocess_pipeline.fit(df2)

transformed_df = preprocess_model.transform(df2)


In [131]:
#transformed_df.printSchema()

# Now we can notice the difference between preprocessed text and orevious text

transformed_df.show(10)

+-----+--------------------+--------------------+
|   id|                text|      finished_lemma|
+-----+--------------------+--------------------+
|73471|Patriots Day Is B...|[Patriots, Day, B...|
|73472|A Break in the Se...|[Break, Search, O...|
|73474|Obama’s Ingenious...|[Obama’s, Ingenio...|
|73475|Donald Trump Meet...|[Donald, Trump, M...|
|73476|Trump: ’I Think’ ...|[Trump:, ’I, Thin...|
|73477|Seth Meyers Quest...|[Seth, Meyers, Qu...|
|73478|Obama Frames His ...|[Obama, Frames, E...|
|73479|The Trump Adminis...|[Trump, Administr...|
|73484|The Longstanding ...|[Longstanding, Cr...|
|73485|The Atlantic Dail...|[Atlantic, Daily:...|
+-----+--------------------+--------------------+
only showing top 10 rows



In [58]:
transformed_df.select('finished_lemma').first()

Row(finished_lemma=['Patriots', 'Day', 'Best', 'Digs', 'Past', 'Heroism', 'Patriots', 'Day,', 'Peter', 'Berg’s', 'new', 'thriller', 'recreate', '2013', 'Boston', 'Marathon', 'bomb', 'ensuing', 'manhunt', 'follow', 'it,', 'surprisingly', 'oblique,', 'morally', 'ambiguous', 'movie', 'typically', 'straightforward', 'filmmaker.', 'Patriots', 'Day', 'take', 'unexpectedly', 'cynical', 'view', 'chaos,', 'rash', 'bureaucratic', 'infighting', 'follow', 'bomb', '—', 'question', 'whether', 'Berg’s', 'intend', 'message.', 'grim', 'run', 'time,', 'movie', 'celebrate', 'man', 'ground', 'help', 'bring', 'bomber', 'justice,', 'it’s', 'glimpse', 'something', 'complicated', 'jingoism', 'really', 'linger.', 'scene', 'good', 'illustrate', 'dichotomy', 'come', 'late', 'Patriots', 'Day.', 'search', 'one', 'bombers,', 'Dzhokhar', 'Tsarnaev,', 'intensifies,', 'brother’s', 'wife', 'Katherine', '(', 'Melissa', 'Benoist', ')', 'bring', 'interrogation', 'connection', 'case.', 'ask', 'lawyer,', 'protest', '“has', 

### Since considering all words for the training would be expensive we will consider only first 100 words from each article 
We are making an assumption that the main content of the article is somewhat understood by first 100 words.
We are considering words of length greater than or equal to 3 as words of length less than 3 may not be useful

In [87]:
def filter100words(row):
    wordList = row
    newWordList = []
    i = 0

    while (len(newWordList) < 100):
        
        if i >= len(wordList):
            break
        if len(wordList[i]) >= 3:
            newWordList.append(wordList[i])
        i += 1 
        
    return newWordList
    


In [88]:
from pyspark.sql.types import *

udfSomeFunc = F.udf(filter100words, ArrayType(StringType()))
final_df = transformed_df.withColumn("final_words", udfSomeFunc("finished_lemma"))

In [89]:
final_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- finished_lemma: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- final_words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [90]:
final_df = final_df.drop("text", "finished_lemma")

In [91]:
final_df.show()

+-----+--------------------+
|   id|         final_words|
+-----+--------------------+
|73471|[Patriots, Day, B...|
|73472|[Break, Search, O...|
|73474|[Obama’s, Ingenio...|
|73475|[Donald, Trump, M...|
|73476|[Trump:, Think’, ...|
|73477|[Seth, Meyers, Qu...|
|73478|[Obama, Frames, E...|
|73479|[Trump, Administr...|
|73484|[Longstanding, Cr...|
|73485|[Atlantic, Daily:...|
|73486|[Pledge, Pentagon...|
|73487|[Atlantic, Politi...|
|73488|[Contradictions, ...|
|73489|[Sanctions, Skept...|
|73490|[Baltimore, Polic...|
|73491|[Young, Pope, Ima...|
|73492|[DeVos, Hearings:...|
|73493|[Donald, Trump, S...|
|73494|[‘One-Stop, Shop’...|
|73495|[Rich, Students, ...|
+-----+--------------------+
only showing top 20 rows



In [92]:
final_df.first()

Row(id='73471', final_words=['Patriots', 'Day', 'Best', 'Digs', 'Past', 'Heroism', 'Patriots', 'Day,', 'Peter', 'Berg’s', 'new', 'thriller', 'recreate', '2013', 'Boston', 'Marathon', 'bomb', 'ensuing', 'manhunt', 'follow', 'it,', 'surprisingly', 'oblique,', 'morally', 'ambiguous', 'movie', 'typically', 'straightforward', 'filmmaker.', 'Patriots', 'Day', 'take', 'unexpectedly', 'cynical', 'view', 'chaos,', 'rash', 'bureaucratic', 'infighting', 'follow', 'bomb', 'question', 'whether', 'Berg’s', 'intend', 'message.', 'grim', 'run', 'time,', 'movie', 'celebrate', 'man', 'ground', 'help', 'bring', 'bomber', 'justice,', 'it’s', 'glimpse', 'something', 'complicated', 'jingoism', 'really', 'linger.', 'scene', 'good', 'illustrate', 'dichotomy', 'come', 'late', 'Patriots', 'Day.', 'search', 'one', 'bombers,', 'Dzhokhar', 'Tsarnaev,', 'intensifies,', 'brother’s', 'wife', 'Katherine', 'Melissa', 'Benoist', 'bring', 'interrogation', 'connection', 'case.', 'ask', 'lawyer,', 'protest', '“has', 'right

### Making a countvectorizer 

**Countvectorizer is a sparse matrix compression technique which works by creating a vocabulary of words and giving numerical indexes to them. The numerical index is used to replace the actual word.**
Countvectorizer is matrix with all unique words in rows and all documents in columns with each cell representing the frequency of that word in that document. 
Since this is a sparse matrix , we use library functions which are optimized to store spaarse matrices. 

In [96]:
from pyspark.ml.feature import CountVectorizer , CountVectorizerModel

cv = CountVectorizer(inputCol='final_words', outputCol='features', vocabSize=10000, minDF=3.0)

During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.

In [97]:
# train the model 

cv_model = cv.fit(final_df.limit(10000))

# transform the data. Output column name will be features.

vectorized_tokens = cv_model.transform(final_df.limit(10000))

## LDA - Latent Dirichlet Allocation
he latent Dirichlet allocation (LDA) is a generative statistical model that allows sets of observations to be explained by unobserved groups that explain why some parts of the data are similar. and

We have to input the number of topics it categorizes some relevant keywords into each topic. 
This could be useful in cases like : we want to know about all the important events happened in 2018 in India from a large bunch of news articles. 

Eg output: 
- topic 1 : President, parliament, prime minister, election, budget. 
- topic 2 : Criclet, World cup, India, lost, ... 


In [98]:
from pyspark.ml.clustering import LDA
num_topics = 10
lda = LDA(k=num_topics, maxIter=100)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -4695365.008124288
The upper bound on perplexity: 8.155200786666958


In [99]:
# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()   
topics_rdd = topics.rdd
topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    print(topic)
    print("*"*25)

topic: 0
*************************
['student', 'school', 'college', 'University', 'percent', 'high', 'year', 'education', 'new', 'black']
*************************
topic: 1
*************************
['Please', 'ads.', 'write', 'stories,', 'follow', 'continue', 'step', 'block', 'great', 'display']
*************************
topic: 2
*************************
['Cohen', 'discussions,', 'Fueling', 'Capsule”', 'Activists', '“Trump', 'Trump?”', 'transition', '“How', 'Much']
*************************
topic: 3
*************************
['Trump', 'want', 'say', 'one', 'receive', 'update', 'partner', 'make', 'sponsors.', 'Donald']
*************************
topic: 4
*************************
['generic', 'Affleck', 'FDA', 'petition', 'pharmaceutical', 'Flynn’s', 'Cohen', 'frequent', 'mechanical', 'Florida']
*************************
topic: 5
*************************
['Sims', 'David', 'Stewart', 'film', 'Week', 'Spencer', 'Samantha', 'Bee', 'Kornhaber', 'new']
*************************
topic: 6
***

## K-Means

In [100]:
from pyspark.ml.feature import HashingTF, IDF


hashingTF = HashingTF(inputCol = "final_words", outputCol = "rawFeatures", numFeatures=2000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

In [101]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=10).setMaxIter(200)

In [102]:
kmeansPipeline = Pipeline(stages = [hashingTF, idf, kmeans])

model = kmeansPipeline.fit(final_df.limit(10000))

results = model.transform(final_df.limit(10000))

In [103]:
results.printSchema()

root
 |-- id: string (nullable = true)
 |-- final_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



In [104]:
results.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         0| 9578|
|         7|  405|
|         3|    1|
|         4|    1|
|         2|   10|
|         1|    1|
|         5|    1|
|         8|    1|
|         6|    1|
|         9|    1|
+----------+-----+



In [106]:
import pandas as pd
import os
import nltk
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords

import numpy as np

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans, MiniBatchKMeans 

vectorizer = TfidfVectorizer(max_features=200000, lowercase=True,
                             min_df=5, stop_words='english',
                             use_idf=True)


In [107]:
py_df = final_df.limit(10000).toPandas()

In [110]:
py_df.head()

Unnamed: 0,id,final_words
0,73471,"[Patriots, Day, Best, Digs, Past, Heroism, Pat..."
1,73472,"[Break, Search, Origin, Complex, Life, Norse, ..."
2,73474,"[Obama’s, Ingenious, Mention, Atticus, Finch, ..."
3,73475,"[Donald, Trump, Meets,, Assails,, Press, Updat..."
4,73476,"[Trump:, Think’, Hacking, Russian, Updated, 12..."


In [111]:
py_df.shape

(10000, 2)

In [112]:
vectorizer = TfidfVectorizer(max_features=5000,
                             min_df=5, 
                             use_idf=True)

s = list(py_df["final_words"])
docs = [" ".join(i) for i in s]
X = vectorizer.fit_transform(docs)

#Apply K-means to create cluster
from time import time

model = KMeans(n_clusters=10, init='k-means++', max_iter=200, n_init=1)


In [113]:
model.fit(X)

KMeans(max_iter=200, n_clusters=10, n_init=1)

In [114]:
labels = model.labels_
ids = np.array(py_df['id'])

In [115]:
result_labels = pd.DataFrame(list(zip(ids, labels)), columns = ["ids", "labels"])

In [116]:
result_labels["labels"].value_counts()

1    7832
2    1743
3     405
5       6
7       5
6       4
4       2
9       1
8       1
0       1
Name: labels, dtype: int64