### Milestone2: Data Warehouse (HIVE)
WQD180102 Ng Wei Xin
WQD180104 Tan Bing Shien

In [1]:
import time
import pandas as pd

import requests
from bs4 import BeautifulSoup

from impala.dbapi import connect
from hdfs import InsecureClient

# using requests to fetch pages
# Note: Python Selenium library can be used for javascript pages (by imitating browser behaviour)
# bs4: Html-Parser (or XML), to navigate through html documents

# using impala to connect to HIVE (for Windows)
# using HDFS to connect to  Hadoop HDFS

In [3]:
# session connections 
class SessionConnections: 
    def __init__(self): 
        
        # Connection to HIVE: localhost, default port 10000, default user/password = hive
        conn = connect(host='localhost', port=10000, database='datamining', user='hive', password='hive', auth_mechanism="PLAIN")
        self.hivecursor = conn.cursor()

        # Connecting to Webhdfs: localhost, default port 50070
        self.hdfsclient = InsecureClient('http://localhost:50070')

In [8]:
## CRAWLING Function

## Crawling Source: TheStar Latest News

def crawl(objDF):
    page = requests.get('https://www.thestar.com.my/news/latest')
    contents = page.content

    soup = BeautifulSoup(contents, 'html.parser')

    for tag in soup.find_all(class_= "timeline-content", limit = 15):
        for subtag in tag.find('h2'):
            
            # News-ID
            idNews = "".join(line.strip() for line in subtag['data-content-id'].split("\n"))
                        
            # Check for duplication, proceed only for new item
            # else skip to save computation
            if not df['NewsID'].str.contains(idNews).any():
                
                # Headline
                strNews = "".join(line.strip() for line in subtag.string.split("\n"))
                
                # Category
                catNews = "".join(line.strip() for line in subtag['data-content-category'].split("\n"))
                
                # Timestamp (spider on respective news page)
                newsHref = subtag['href']
                subPage = requests.get(newsHref)
                subContent = subPage.content
                subSoup = BeautifulSoup(subContent, 'html.parser')
                try:
                    dateNews = "".join(line.strip() for line in subSoup.find(class_="date").string.split("\n"))
                    dateNews = dateNews.replace(",", " ")
                except:
                    dateNews = "-"
                try:
                    timeNews = "".join(line.strip() for line in subSoup.find(class_="timestamp").string.split("\n"))
                    timeNews = timeNews.replace(",", " ")
                except:
                    timeNews = "-"
                # Add info to list
                objDF = objDF.append(pd.Series([idNews, dateNews, timeNews, catNews, strNews, newsHref], index=objDF.columns), ignore_index=True)

    return objDF


In [None]:
# Create Empty DataFrame:
df = pd.DataFrame(columns=["NewsID", "PublishedDate", "PublishedTime", "Category", "Headline", "URL"])
df_prev = df # create temporary df

# Test: using i as counter, for recurring crawl.
# for endless loop, use while-True loop.
i = 2 
# while i>0:
while True:
    
    
    # Re-Crawl Latest News
    df_current = crawl(df_prev)
    #print(df)
    
    # get only unique values:
    df = pd.concat([df_current, df_prev])
    df = df.drop_duplicates(keep=False, inplace=False)

    # Proceed only if there is any new entry:
    if df.shape[0] != 0 :
        
        # save only last 20 crawls in df_prev (buffer for next crawl)
        df_prev = df_current.iloc[-20:, :]

        # Export dataframe to csv file (save to  LOCAL DIRECTROY)
        ## Use current DateTime to store unique csv file 
        curDateTime = time.strftime(r"%Y%m%d_%H%M", time.localtime())
        # df.to_csv(r'C:\Users\FORGE-15 I7\OneDrive - AsiaPay Limited\Sem 3\WQD7005 DATA MINING\dataset.csv', index = False)
        df.to_csv(r'C:\Users\ngwei\Desktop\Crawling\dataset_'+curDateTime+'.csv', index = False)

        # Create SESSION (hive / hdfs connection, to avoid disconnection from server)
        mySession = SessionConnections()

        # Writing Dataframe to hdfs, as csv
        with mySession.hdfsclient.write('/user/admin/datamining/tmpcsv.csv', encoding = 'utf-8') as writer:
            df.to_csv(writer, index=False)

        # load csv data into hive-table
        insertSQL = "LOAD DATA INPATH '/user/admin/datamining/tmpcsv.csv' INTO TABLE mynews"
        mySession.hivecursor.execute(insertSQL)
    
    # temporary counter-holder
    i=i-1
    
    # Re-Crawl after every x seconds
    time.sleep(300)


In [None]:
# ## ----------------------------------------------------------------
# ## WQD7005: Milestone 2, HIVE Data Warehouse
# ## Tan Bing Shien WQD180104, Ng Wei Xin WQD180102
# ## ----------------------------------------------------------------
# ## Writing Data into HIVE
# ## Step1: Write DataFrame (windows, local) into hdfs (HDP, VirtualBox)
# ## Step2: Load data (csv) into HIVE (database)
# ## ----------------------------------------------------------------

# # Re-Creating Session to avoid disconnection:
# mySession = SessionConnections()

# # Writing Dataframe to hdfs
# with mySession.hdfsclient.write('/user/admin/datamining/tmpcsv.csv', encoding = 'utf-8') as writer:
#     df.to_csv(writer, index=False)

# # insert csv data into temporary table
# insertSQL = "LOAD DATA INPATH '/user/admin/datamining/tmpcsv.csv' INTO TABLE mynews"
# mySession.hivecursor.execute(insertSQL)
# ## Note: Table created with skip.header.line.count=1 property, would skip first line (header) of csv file.

# # verify result
# showSQL = "SELECT * FROM mynews"
# mySession.hivecursor.execute(showSQL)
# print(mySession.hivecursor.fetchall())


In [None]:
# ------------------------
# # CREATE TABLE
# ------------------------
# create mynews table
# createSQL = "CREATE TABLE IF NOT EXISTS mynews \
#             (NewsID STRING, PublishedDate STRING, PublishedTime STRING, Category STRING, Headline STRING, URL STRING) \
#             ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE tblproperties ('skip.header.line.count'='1') \
#             "
# cur.execute(createSQL)


# ------------------------
# # SELECT & INSERT DATA
# ------------------------

# # Select 
# cur.execute('SELECT * FROM mynews WHERE newsid > 50 LIMIT 1, 10')
# print(cur.fetchall())

# # Insert 
# # -- Note: very laggy on HIVE
# cur.execute('INSERT INTO testdata (test_id, name, item1, item2) VALUES (52, "myName", "myItem1", "myItem2")')
# print(cur.fetchall())


# ------------------------
# ## Build DataFrame 
# ------------------------
# data = pd.read_csv(r"C:\Users\ngwei\Desktop\testdata2.csv")
# print(data)

# ------------------------
# ## Create Connections to HIVE / HDFS 
# ------------------------
# # Connection to HIVE: localhost, default port 10000, default user/password = hive
# conn = connect(host='localhost', port=10000, database='datamining', user='hive', password='hive', auth_mechanism="PLAIN")
# cur = conn.cursor()

# # Connecting to Webhdfs: localhost, default port 50070
# client_hdfs = InsecureClient('http://localhost:50070')
# ------------------------
