# Reputation First
## Public Opinion Research Based on Media Articles

## Distributed Internet Crawler
In this project, we use a distributed Internet spider to crawl media articles from different sources. However, crawling historical data directly from media websites is difficult for the following reasons.

+ The websites list not only article links but also advertisement links, recommendation links, and so on.
+ The website content is customized to each user’s habits, so it varies from user to user on every view.
+ Popular websites deploy anti-crawler measures to prevent excessive network traffic.

Based on these observations, we gave up on fetching content from the original websites.

However, some popular newspapers and magazines publicly provide an **archive database**, which usually holds a printable, readable version of each article. Compared with the regular website content, an archive has the following features:
+ It lists all published articles without per-user customization.
+ It contains few or no advertisements and other unrelated links.
+ The content can be fetched directly from the HTML page with Beautiful Soup.
+ The content is usually marked with clear timestamps.

The archives of several popular media outlets are listed below, using the content from May 1, 2018 as an example. Note that usually only traditional media provide an archive summarizing past articles; Internet or web media generally do not offer a similar service.

**TypeA: Newspapers & Magazines**

*New York Times*
  + http://spiderbites.nytimes.com/2018/articles_2018_05_00000.html
  + The database is officially updated every day with the published articles. It can be crawled.

*Wall Street Journal*
  + http://www.wsj.com/public/page/archive-2018-5-01.html
  + The database is officially updated every day with the published articles. It can be crawled.

*Washington Post*
  + http://www.washingtonpost.com/wp-adv/archives/copyright.htm
  + The archive is not organized by timestamp; instead, it is keyword- or topic-based.

**TypeB: News Agency**

*BBC*
  + http://dracos.co.uk/made/bbc-news-archive/2018/05/01/
  + Collected by a third-party organization, so it is less reliable. It also does not include all of the outlet's content.

*Reuters*
  + https://uk.reuters.com/resources/archive/uk/20180501.html
  + The database is officially updated every day with the published articles. It can be crawled.

*CNN*
  + CNN provides a clear, well-formatted archive for articles before 2001; however, the later material is disorganized and lacks a specification.
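
Since the WSJ and Reuters archives are indexed by day, the list of archive pages to crawl can be generated from a date range. The following is a minimal sketch, not the project's own code: the function names are hypothetical, and the URL patterns are inferred only from the two example links above, so whether every other date follows the same pattern is an assumption.

```python
from datetime import date, timedelta

def wsj_archive_url(day):
    # The example URL ends in "archive-2018-5-01": month unpadded, day zero-padded.
    # Assumption: other dates follow the same pattern.
    return "http://www.wsj.com/public/page/archive-%d-%d-%02d.html" % (
        day.year, day.month, day.day)

def reuters_archive_url(day):
    # The example URL ends in "20180501": a compact YYYYMMDD date.
    return "https://uk.reuters.com/resources/archive/uk/%s.html" % day.strftime("%Y%m%d")

def daily_archive_urls(start, end, build_url):
    """Return one archive URL per day from start to end, inclusive."""
    urls = []
    day = start
    while day <= end:
        urls.append(build_url(day))
        day += timedelta(days=1)
    return urls
```

For example, `daily_archive_urls(date(2018, 5, 1), date(2018, 5, 31), reuters_archive_url)` yields the 31 Reuters archive pages for May 2018, which could be written one per line into a source list file.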


The newspaper crawler is shown below:

In [2]:
import requests
from bs4 import BeautifulSoup
import re
import json
import threading
from newspaper import Article

In [None]:
'''
NewsParser: The Newspaper Crawler Class
'''
class NewsParser(threading.Thread):

    def __init__(self,source):
        '''
        Init Function:
        1. Initialization as a python thread
        2. Define the source used in the parser
        '''
        threading.Thread.__init__(self)
        self.source = source

    def _fetch_links(self,url):
        '''
        1. Fetch the url links from archive database
        2. Parse the database page to get the article links
        '''
        links = []
        r = requests.get(url)
        if r.status_code != 200:
            # Without a valid archive page there is nothing to parse.
            print(url + ": Not Correct Response")
            return links
        soup = BeautifulSoup(r.text, "html.parser")
        for link in soup.find_all('a'):
            rawlink = link.get('href')
            # Skip anchors without an href; keep only NYT article links.
            if rawlink and re.match(r"(\S)+://www.nytimes.com/20(\d)+/(\d)+/(\d)+/(\S)+\.html", rawlink):
                links.append(rawlink)
        return links

    def _fetch_data(self,link):
        '''
        1. Get the articles and parse the articles by "newspaper" lib
        2. Add the related fields into the datum struct
        '''
        datum = {}
        article = Article(link)
        try:
            article.download()
        except Exception:
            print(link+": Cannot be downloaded!")
            return datum
        try:
            article.parse()
        except Exception:
            print(link+": Cannot be parsed!")
            return datum
        datum["authors"] = article.authors
        datum["date"]    = str(article.publish_date)
        datum["text"]    = str(article.text)
        datum["title"]   = str(article.title)
        return datum
    
    def parse(self,source):
        '''
        Main Function to use the crawling function
        '''
        links = self._fetch_links(source)
        count = 0
        # Append one JSON object per line; "with" closes the file even on errors.
        with open("data_"+source.split("/")[-1],"a") as f:
            for link in links:
                datum = self._fetch_data(link)
                if len(datum)==0:
                    continue
                count += 1
                f.write(json.dumps(datum)+'\n')
                print("Fetch news from " + source + " : "+str(count)+"/"+str(len(links)))

    def run(self):
        '''
        Provide the interactive interface as a python thread
        '''
        self.parse(self.source)
             

The program above is a single-threaded Internet crawler, which is too slow to crawl all newspaper articles. The following code extends it into a multi-threaded crawler.

In [None]:
def _get_targets():
    '''
    Read the "sourcelist" file to get the crawling resources
    '''
    with open("sourcelist") as f:
        lines = f.readlines()
    result = []
    for line in lines:
        # Strip first: a blank line still contains "\n" and would pass a truthiness test.
        source = line.strip()
        if source:
            result.append(source)
    return result

def real_main():
    '''
    1. Initialize one thread per source
    2. Start all threads and wait for the worker threads to finish.
    '''
    sources = _get_targets()
    threads = []
    for source in sources:
        threads.append(NewsParser(source))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

real_main()
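
Starting one thread per source means that the number of simultaneous connections grows with the source list. As a hedged alternative, and not the approach used in this project, the sketch below caps concurrency with the standard library's `ThreadPoolExecutor`. The names `crawl_all`, `crawl_one` and `max_workers` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def crawl_all(sources, crawl_one, max_workers=4):
    """Run crawl_one(source) for every source with at most max_workers threads."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which source each future belongs to.
        futures = {pool.submit(crawl_one, source): source for source in sources}
        for future in as_completed(futures):
            source = futures[future]
            try:
                results[source] = future.result()
            except Exception as exc:
                # One failing source should not stop the others.
                results[source] = exc
    return results
```

With the class above, a call such as `crawl_all(_get_targets(), lambda s: NewsParser(s).parse(s))` would crawl every source while keeping at most four downloads in flight.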

## Data Cleaning by Spark (ETL)

In the next step, we use Spark to clean the raw dataset fetched by the crawler, following these steps:

*Step1.* Filter out all articles that **do not include the company name**. Note that we match the company name with a Python regular expression in a case-insensitive manner.

*Step2.* For the remaining articles, we parse each article to get the **title**, **summary** and **tag** fields. The method used to extract the summary from the original text is described in the next chapter.

*Step3.* Filter out articles whose title, summary and tag fields do not include the company name.

It is important to note that:
+ A company name can have other meanings. For example, *"Adobe"* also denotes a type of house and appears in some articles about earthquakes. Therefore, we use the summary to decide whether an article is actually related to the company.
+ Summary extraction is a time-consuming task with a complex algorithm. Hence, we use Step1 as a coarse filter to reduce the overall workload.
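
The case-insensitive matching mentioned in Step1 can be illustrated with a short sketch. It is only an assumption about how such a pattern might be built: the helper names and the sample company names are hypothetical. Note that a regular expression alone cannot tell the company Adobe from an adobe house, which is why the later summary-based step is still needed.

```python
import re

def build_company_pattern(names):
    """Compile one case-insensitive pattern that matches any name as a whole word."""
    # Longer names first, so that a longer name wins over a shorter prefix.
    escaped = sorted((re.escape(name) for name in names), key=len, reverse=True)
    return re.compile(r"\b(?:%s)\b" % "|".join(escaped), re.IGNORECASE)

def mentions_company(text, pattern):
    """Return True if the text contains at least one company name."""
    return pattern.search(text) is not None
```

For instance, with `pattern = build_company_pattern(["Adobe", "Google"])`, the call `mentions_company("ADOBE reports earnings", pattern)` is true, while a text containing only "adobes" does not match because of the word boundaries.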

### Step1: Coarse-Grained Data Filter

The coarse data filter removes articles that are not related to any company in the company list. First, we use nltk to tokenize the text into a list of tokens. Then, we compare the tokens with the company list one by one. Finally, we add the tag names to the result struct.

In [None]:
import pyspark
import json
import nltk
from nltk import word_tokenize

def _init_line(line):
    name = line.lower().split()[0]
    return (name,line.lower().split())

def _init_list(sc):
    # Map every alias on a company-list line to the first name on that line.
    results = {}
    companyRDD = sc.textFile("gs://group688/companylist")
    coms = companyRDD.map(_init_line).collect()
    for com in coms:
        for name in com[1]:
            results[name] = com[0]
    return results

def _data_filter(lines,company,source):
    import nltk
    nltk.download('punkt',download_dir='./nltk_data')
    nltk.data.path.append("./nltk_data")
    results = []
    for datum in lines:
        data    = json.loads(datum)
        authors = data["authors"]
        date    = data["date"]
        text    = data["text"]
        title   = data["title"]
        tokens_text  = word_tokenize(text.lower())
        tokens_title = word_tokenize(title.lower())
        tags = []
        for word in text.lower().split():
            # Hashtag-style words are collected as tags.
            if word.startswith("#"):
                tags.append(word)
        #Stat is a dictionary, key is the company name, and value is the attribute
        #attributes: [in_title,title_count,total_count]
        stat  = {}
        for token in tokens_title:
            if token in company:
                if company[token] in stat:
                    stat[company[token]][0] = True
                    stat[company[token]][1] += 1
                else:
                    stat[company[token]] = [True,1,0]
        for token in tokens_text:
            if token in company:
                if company[token] in stat:
                    stat[company[token]][2] += 1
                else:
                    stat[company[token]] = [False,0,1]
        for name in stat:
            # Emit one record per matched company.
            result = {}
            if (source=="wsj"):
                # Assumes WSJ dates carry a single-digit month (e.g. "2018-5-01"); pad it to two digits.
                result["date"]      = date[:5] + '0' + date[5:9]
            else:
                result["date"]      = date[:10]
            result["text"]        = text
            result["tokens"]      = tokens_text
            result["company"]     = name
            result["source"]      = source
            result["in_title"]    = stat[name][0]
            result["title_count"] = max(stat[name][1],title.lower().count(name))
            result["total_count"] = max(stat[name][2],text.lower().count(name))
            result["title"]       = title
            result["authors"]     = authors
            result["tags"]        = tags
            results.append((name,json.dumps(result)))
    return results
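
To make the data structures in the cell above concrete, the following sketch reproduces the alias map and the `stat` dictionary in plain Python, without Spark or nltk. The function names and the sample company lines are hypothetical; the only thing taken from the code above is the layout: the first word of each company-list line is the canonical name, every word on the line is an alias, and each `stat` value is `[in_title, title_count, total_count]`.

```python
def build_alias_map(lines):
    """Map every alias on a line to the first (canonical) name on that line."""
    alias_to_name = {}
    for line in lines:
        parts = line.lower().split()
        if not parts:
            continue  # ignore blank lines
        for alias in parts:
            alias_to_name[alias] = parts[0]
    return alias_to_name

def count_mentions(title_tokens, text_tokens, alias_to_name):
    """Return {company: [in_title, title_count, total_count]} for matched tokens."""
    stat = {}
    for token in title_tokens:
        name = alias_to_name.get(token)
        if name is not None:
            entry = stat.setdefault(name, [False, 0, 0])
            entry[0] = True
            entry[1] += 1
    for token in text_tokens:
        name = alias_to_name.get(token)
        if name is not None:
            stat.setdefault(name, [False, 0, 0])[2] += 1
    return stat
```

An article whose title mentions only an alias is still counted under the canonical name, so each output record is keyed by one company regardless of which alias appeared.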

As noted above, even the coarse data filter is time-consuming. In our experiment, a single-threaded data filter running on a 300 MB raw dataset with 255k articles took more than 2 hours to complete. To speed up processing on a GB-level dataset, we use the Spark+YARN platform to execute the program in parallel on a 5-instance cluster.
The distributed cluster configuration is:
+ **1 Master Node**
  1. 4   CPUs
  2. 16  GB memory
  3. 100 GB storage

+ **4 Worker Nodes**
  1. 2  CPUs
  2. 12 GB memory
  3. 80 GB storage

The Spark program is as follows:

In [None]:
def real_main():
    sc = pyspark.SparkContext()
    company = _init_list(sc)
    dataRDD1 = sc.textFile("gs://group688/nytimes",5)
    dataRDD1 = dataRDD1.mapPartitions(lambda x:_data_filter(x,company,"nytimes"))
    dataRDD2 = sc.textFile("gs://group688/wsj",10)
    dataRDD2 = dataRDD2.mapPartitions(lambda x:_data_filter(x,company,"wsj"))
    dataRDD3 = sc.textFile("gs://group688/reuters.dat",10)
    dataRDD3 = dataRDD3.mapPartitions(lambda x:_data_filter(x,company,"reuters"))
    dataRDD  = dataRDD3.union(dataRDD2).union(dataRDD1)
    dataRDD.sortByKey().map(lambda x:x[1]).saveAsTextFile("gs://group688/688v1")

real_main()

To support Spark execution on a cloud platform, we use the **Google Cloud Dataproc** service to deploy the Spark cluster efficiently. The following script submits the Spark program to the Dataproc cluster.

In [None]:
gsutil rm -r gs://group688/688v1
gcloud dataproc jobs submit pyspark \
--cluster spark688 \
--region us-east1 \
etl.py

### Step2: Generate Article Summary

The second step generates a summary of each article, which helps further data cleaning. The principle and implementation of summary extraction are introduced in the next chapter. This step also uses the Spark cluster to speed up execution.

In [None]:
from summa import summarizer
from summa import keywords
def generate_summary(text):
    # Extract a summary of the text with summa's summarizer.
    abstract  = summarizer.summarize(text)
    return abstract

### Step3: Fine-Grained Data Filter

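The fine-grained data filter is based on the summary generated in Step2: an article is kept only if the company name appears in its summary, title, or tags. Articles for which no summary could be generated are kept as well.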

In [None]:
import time
import json
import datetime
import pyspark

def get_result_list(lines):
    from summa import summarizer
    from summa import keywords
    import nltk
    nltk.download('punkt',download_dir='./nltk_data')
    nltk.data.path.append("./nltk_data")
    result_list = []
    for line in lines:
        json_data = json.loads(line)
        text      = json_data["text"]
        abstract  = summarizer.summarize(text)
        keyword   = keywords.keywords(text, split=True)
        name = json_data['company']
        tags_words = list(map(lambda x: x[1:], json_data['tags']))
        abstract_words = list(map(lambda x: x.lower(), nltk.tokenize.word_tokenize(abstract)))
        title_words = list(map(lambda x: x.lower(), nltk.tokenize.word_tokenize(json_data['title'])))
        if abstract != '' and name not in abstract_words and name not in title_words and name not in tags_words:
            continue
        json_data['abstract'] = abstract
        json_data['keywords'] = keyword
        result_list.append(json.dumps(json_data))
    return result_list

if __name__=="__main__":
    sc = pyspark.SparkContext()
    dataRDD = sc.textFile("gs://group688/688v2.dat",20)
    dataRDD.mapPartitions(get_result_list).saveAsTextFile("gs://group688/688v3")
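
The keep-or-drop rule inside `get_result_list` can be isolated as a small predicate, which makes it easy to check by hand. This is a sketch for illustration: the function name `keep_article` and the sample inputs are hypothetical, and the word lists are assumed to be lowercased already, as in the cell above.

```python
def keep_article(name, abstract, abstract_words, title_words, tag_words):
    """Mirror the filter condition: drop only if a summary exists and the name is nowhere."""
    if abstract == "":
        # No summary could be generated, so the article is not filtered out.
        return True
    return name in abstract_words or name in title_words or name in tag_words

# An earthquake article that mentions adobe houses only in the body is dropped,
# because the name is absent from its summary, title and tags.
dropped = keep_article(
    "adobe",
    "the earthquake damaged many houses",
    ["the", "earthquake", "damaged", "many", "houses"],
    ["earthquake", "hits", "village"],
    [],
)
```

Here `dropped` evaluates to `False`, whereas the same call with `"adobe"` in the title words would return `True`.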
