# Interactive news Analyzer
## Advanced Data Science Capstone project
### by Anton Dziavitsyn 2019

## Part 1: Data extraction

### Stage 1: download RAW data from newsapi

**SETUP stage**  
*Create a Spark session connected to the Cloudant database*

In [1]:
from pyspark.sql import SparkSession

# Cloudant credentials
cloudant_credentials = {
    "host": "REMOVED ON PUBLISHING",
    "custom_url": "REMOVED ON PUBLISHING",
    "username": "REMOVED ON PUBLISHING",
    "password": "REMOVED ON PUBLISHING"
}

# Spark session with attached cloudant storage
spark = SparkSession\
    .builder\
    .appName("Interactive news Analyzer")\
    .config("cloudant.host",cloudant_credentials['custom_url'].split('@')[1])\
    .config("cloudant.username", cloudant_credentials['username'])\
    .config("cloudant.password",cloudant_credentials['password'])\
    .config("createDBOnSave","true")\
    .config("jsonstore.rdd.partitions", 1)\
    .getOrCreate()
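
The `cloudant.host` setting above takes everything after `@` in `custom_url`, which suggests a URL of the form `https://user:password@host`. As a sketch under that assumption (the URL below is hypothetical, not a real account), the standard library's `urlsplit` extracts the host without also picking up a trailing path:

```python
from urllib.parse import urlsplit

def cloudant_host(custom_url):
    """Return the host of a credentials URL (assumed form: scheme://user:password@host)."""
    return urlsplit(custom_url).hostname

# Hypothetical URL, used only for illustration
example_url = "https://user:secret@account.cloudant.example.com/some_db"

host_by_split = example_url.split('@')[1]   # keeps the trailing path
host_by_parse = cloudant_host(example_url)  # host only
```

Both approaches agree when the URL has no path or port after the host; they differ only when something follows it.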

**DOWNLOAD headlines stage**  
*Download news sources and headlines from NewsAPI, and create DataFrames to save them to the Cloudant DB*

In [2]:
# imports
from pyspark.sql import Row
import requests

# Key to news API
NEWS_API_KEY = "REMOVED ON PUBLISHING"

# Helpers

# function to create DF from list of dicts
def create_DF(list_of_dicts):
    return spark.createDataFrame(Row(**x) for x in list_of_dicts)

# Get all news sources and categories for English language news
source_url = "https://newsapi.org/v2/sources?language=en&apiKey={0}".format(NEWS_API_KEY)
response = requests.get(source_url).json()
#create dataframe
df_raw_sources = create_DF(response['sources'])
df_raw_sources.cache()
# Write news sources data to Cloudant DB
df_raw_sources.write.format("com.cloudant.spark").save("raw_news_sources")
# Show info about data
print("News sources downloaded: {0}".format(df_raw_sources.count()))
print("First 10 sources:")
df_raw_sources.show(10)

# Then we get TOP headline news for every source
# function to get headlines by source
def get_headlines(source):
    try:
        source_url = "https://newsapi.org/v2/top-headlines?sources={0}&apiKey={1}".format(source.id, NEWS_API_KEY)
        response = requests.get(source_url).json()
        return response['articles']
    except Exception:
        # any network, JSON, or missing-key failure: skip this source
        return None

# map function to get list of headlines
list_headlines = []
for news in df_raw_sources.rdd.map(get_headlines).collect():
    if news is not None:
        list_headlines.extend(news)
#create dataframe
df_raw_headlines = create_DF(list_headlines)
df_raw_headlines.cache()
# Show info about data
print("News headlines downloaded: {0}".format(df_raw_headlines.count()))
print("First 10 headlines:")
df_raw_headlines.show(10)

# Write headlines data to Cloudant DB
#df_raw_headlines.write.format("com.cloudant.spark").save("raw_news_headlines")

News sources downloaded: 91
First 10 sources:
+----------+-------+--------------------+--------------------+--------+--------------------+--------------------+
|  category|country|         description|                  id|language|                name|                 url|
+----------+-------+--------------------+--------------------+--------+--------------------+--------------------+
|   general|     us|Your trusted sour...|            abc-news|      en|            ABC News|https://abcnews.g...|
|   general|     au|Australia's most ...|         abc-news-au|      en|       ABC News (AU)|http://www.abc.ne...|
|   general|     us|News, analysis fr...|  al-jazeera-english|      en|  Al Jazeera English|http://www.aljaze...|
|technology|     us|The PC enthusiast...|        ars-technica|      en|        Ars Technica|http://arstechnic...|
|   general|     us|The AP delivers i...|    associated-press|      en|    Associated Press| https://apnews.com/|
|  business|     au|The Australian Fi...|a

**Check RAW data from newsapi**

In [3]:
from pyspark.sql.functions import countDistinct

# get distinct authors from raw headlines data
print("Distinct authors:")
df_raw_headlines.agg(countDistinct("author")).show()

Distinct authors:
+----------------------+
|count(DISTINCT author)|
+----------------------+
|                   464|
+----------------------+



**Conclusions:**
+ We have a collection of headlines, with descriptions and links to the articles
+ We have a collection of sources, with information about the publishers
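
The per-source download in Stage 1 returns `None` on any failure, so an API-level error looks the same as a network error. A minimal sketch of separating URL construction from response parsing is shown here. The helper names are hypothetical, and the check assumes the decoded response carries a `status` field equal to `"ok"` on success, which should be verified against the NewsAPI documentation:

```python
from urllib.parse import urlencode

NEWS_API_BASE = "https://newsapi.org/v2"

def build_headlines_url(source_id, api_key):
    """Build the top-headlines URL with properly encoded query parameters."""
    query = urlencode({"sources": source_id, "apiKey": api_key})
    return "{0}/top-headlines?{1}".format(NEWS_API_BASE, query)

def parse_articles(payload):
    """Return the article list from a decoded response, or [] if it is not usable.

    Assumption: a successful response is a dict with status == "ok".
    """
    if not isinstance(payload, dict) or payload.get("status") != "ok":
        return []
    return payload.get("articles") or []
```

Returning an empty list instead of `None` would let the caller extend the headline list without a separate `None` check.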

### Stage 2: download articles HTML and extract publication text

**DOWNLOAD and CLEAN articles data**
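
Before the full cell, here is a toy illustration of the line-level cleaning it applies after the HTML has been reduced to text: strip each line, split on double spaces, drop blanks, and keep only the first occurrence of each line. The function name and the sample text are made up for this sketch, and it leaves out the HTML parsing done with BeautifulSoup:

```python
def clean_lines(text):
    """Split text into stripped, non-empty, de-duplicated lines (order preserved)."""
    lines = (line.strip() for line in text.splitlines())
    # a double space usually separates items that were adjacent page elements
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    seen = set()
    result = []
    for chunk in chunks:
        if chunk and chunk not in seen:
            seen.add(chunk)
            result.append(chunk)
    return result

# Made-up sample text standing in for soup.get_text() output
sample = "  Home  Subscribe \n\nHome\nPG&E files for bankruptcy\n"
cleaned = clean_lines(sample)
```

Keeping the result as a list of lines, rather than joining it back into one string, is what makes the per-site comparison in the next step possible.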

In [4]:
# imports
from bs4 import BeautifulSoup
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark.sql.functions as F

# remove duplicate lines and return Clean Text as list of lines
def remove_dupe_lines(text):
    lines_seen = set()
    result = []
    for line in text.split('\n'):
        if line not in lines_seen:
            result.append(line)
            lines_seen.add(line)
    return result

# function to intersect 2 arrays (lists) and return the intersection
def textIntersection(arr1, arr2):
    return list(filter(lambda x: x in arr1, arr2))

# get and clean article text
def get_article_text(article_url):
    try:
        html = requests.get(article_url).text
        soup = BeautifulSoup(html, "lxml")

        # kill all script and style elements
        for script in soup(["script", "style"]):
            script.extract()

        # get text
        text = soup.get_text()

        # break into lines and remove leading and trailing space on each
        lines = (line.strip() for line in text.splitlines())
        # break multi-headlines into a line each
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        # drop blank lines
        text = '\n'.join(chunk for chunk in chunks if chunk)
        # remove duplicate lines
        text_lines = remove_dupe_lines(text)
        # remove linebreaks in lines
        text_lines = [line.replace('\n', ' ').replace('\r', ' ').replace('  ', ' ') for line in text_lines]
        
        return text_lines
    except Exception:
        # download or parsing failed: return no lines so the row is filtered out
        return []

# Download and clean articles HTML to get clear text (list of lines)
udfGetArticleText = udf(get_article_text, ArrayType(StringType()))
df_raw_articles = df_raw_headlines.withColumn("article", udfGetArticleText("url")).filter(F.size(F.col("article")) > 0)
df_raw_articles.cache()

# Write articles data to Cloudant DB
#df_raw_articles.write.format("com.cloudant.spark").save("raw_headlines_with_articles")

# Show info about data
print("News articles downloaded: {0}".format(df_raw_articles.count()))
print("First 10 articles records:")
df_raw_articles.show(10)

News articles downloaded: 852
First 10 articles records:
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|              author|             content|         description|         publishedAt|              source|               title|                 url|          urlToImage|             article|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    Lucien Bruggeman|President Donald ...|The longtime Trum...|2019-01-29T00:00:00Z|Map(name -> ABC N...|Roger Stone expec...|https://abcnews.g...|https://s.abcnews...|[Roger Stone expe...|
|The Associated Press|California regula...|PG&E faces billio...|2019-01-29T00:00:00Z|Map(name -> ABC N...|California regula...|https://abcnews.g...|https://s.abcnews...|[PG&E file

**Removing site-related text**  
*The article text still contains site-related text (headers, buttons, etc.).*  
*To remove it, we intersect the lines of all articles from the same site to find the text they share, and drop those lines from each article. This is why articles are stored as lists of lines.*
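
The idea can be tried on toy data first. The following is a plain-Python sketch with made-up article lines, independent of Spark; the function names are hypothetical and are not the ones used in the cell that follows:

```python
def common_lines(articles):
    """Lines present in every article of one site, in first-article order."""
    if len(articles) < 2:
        return []  # nothing to compare against
    common = list(articles[0])
    for article in articles[1:]:
        present = set(article)
        common = [line for line in common if line in present]
    return common

def strip_site_lines(article, site_lines):
    """Join the article lines that are not shared site text."""
    boilerplate = set(site_lines)
    return " ".join(line for line in article if line not in boilerplate)

# Made-up articles from one site: navigation lines repeat, story text does not
site_articles = [
    ["Home", "Subscribe", "Story one text", "Contact us"],
    ["Home", "Subscribe", "Story two text", "Contact us"],
    ["Home", "Subscribe", "Story three text", "Contact us"],
]
shared = common_lines(site_articles)
cleaned = [strip_site_lines(a, shared) for a in site_articles]
```

One limitation of this approach is that site text appearing in only some articles of a source survives the intersection, and a source with a single article has nothing removed at all.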

In [5]:
from functools import reduce

# function for List intersection
def intersection(lst1, lst2): 
    lst3 = [value for value in lst1 if value in lst2] 
    return lst3

# function to intersect all articles in a list
def intersect_lists(lst):
    if len(lst) > 1:
        return reduce((lambda x, y: intersection(x, y)), lst)
    else:
        # a single article gives nothing to compare against: no site lines
        return []

udfItersectLists = udf(intersect_lists, ArrayType(StringType()))
# intersect all articles, grouping by article source
df_site_related = df_raw_articles.groupBy('source.id').agg(F.collect_list('article').alias('site_lines')).withColumn('site_lines', udfItersectLists('site_lines'))
df_site_related.cache()

# function to return clean article text (only rows which are not in site data)
# returns clean text without line breaks
def get_clean_article(article, site_data):
    # build the lookup set once; site_data may be null for an unmatched join row
    site_lines = set(site_data or [])
    return ' '.join(row for row in article if row not in site_lines)

udfGetCleanText = udf(get_clean_article, StringType())

df_clean_articles = df_raw_articles.join(df_site_related, df_raw_articles.source.id == df_site_related.id, how='left').withColumn('article', udfGetCleanText('article', 'site_lines')).drop('site_lines', 'df_site_related.id')
df_clean_articles.cache()
# Write articles data to Cloudant DB
df_clean_articles.write.format("com.cloudant.spark").save("clean_articles")

# Show info about data
print("Clean articles: {0}".format(df_clean_articles.count()))
print("First 10 articles records:")
df_clean_articles.show(10)

Clean articles: 852
First 10 articles records:
+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          author|             content|         description|         publishedAt|              source|               title|                 url|          urlToImage|             article|                  id|
+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| Katherine Blunt|PG&amp;E Corp. fi...|California utilit...|2019-01-29T08:22:00Z|Map(name -> The W...|PG&E Files for Ba...|https://www.wsj.c...|https://images.ws...|PG&E Files for Ba...|the-wall-street-j...|
| Robert McMillan|Apple Inc. scramb...|Apple scrambled t...|2019-01-29T05:50:00Z|Map(name -> The W...|Apple Bug Enables..

**Let us look at the text of one article: it is clean enough for processing**

In [6]:
df_clean_articles.limit(1)[['article']].collect()[0][0]

"PG&E Files for Bankruptcy Following California Wildfires - WSJ Dow Jones, a News Corp companyNews Corp is a network of leading companies in the worlds of diversified media, news, education, and information servicesDow JonesBarron'sBigChartsDJXDow Jones NewswiresFactivaFinancial NewsMansion GlobalMarketWatchNewsPlusPrivate MarketsRisk & ComplianceWSJ ConferenceWSJ ProWSJ VideoWSJ.comNews CorpBig DecisionsBusiness SpectatorCheckout51Harper CollinsHousingMakaanNew York PostNews America MarketingPropTigerREArealtor.comStoryfulThe AustralianThe SunThe TimesUnrulySubscribeSign InThe Wall Street JournalEurope EditionU.S.AsiaEuropeIndia中国 (China)日本 (Japan)January 28, 2019Print EditionVideoHomeWorldRegionsAfricaAsiaCanadaChinaEuropeLatin AmericaMiddle EastSectionsEconomyMoreWorld VideoU.S.SectionsEconomyLawNew YorkPoliticsColumns & BlogsReal Time EconomicsWashington WireMoreJournal ReportU.S. VideoWhat's News PodcastPoliticsBlogsWashington WireMorePolitics VideoWSJ/NBC News PollEconomyBlogsRea

**Now we have: Headline articles collection with clean article text, and info about publisher**

## END of Part 1: Data extraction