# Hacker News Data Processing

In [1]:
# Here show picture of the inital code. Simple and pretty. 

In [2]:
# Then tell story of how it was beautiful but couldnt last. 

In [3]:
# Show Google BigQuery querys... 3.2gb... yikes! 

In [4]:
#!pip install google-cloud-bigquery
#!pip install textblob

from google.cloud import bigquery
from textblob import TextBlob
import bokeh
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import html 
import dask
import re
import dask.dataframe as dd
from tqdm import tqdm, tqdm_pandas

# Define the BigQuery Client
client = bigquery.Client.from_service_account_json("winterrose-nlp-49041459bd3c.json")

# A Google BigQuery Function
def querytodf(query):
    query_job = client.query(query)
    
    iterator = query_job.result(timeout=60)
    rows = list(iterator)

    # Transform the rows into a nice pandas dataframe
    df = pd.DataFrame(data=[list(x.values()) for x in rows], columns=list(rows[0].keys()))
    
    return df

## Submit query to Google BigQuery

In [5]:
%%time

# Using WHERE reduces the amount of data scanned / quota used
query = """
SELECT hnc.id, 
       hnc.by,
       hnc.author,
       hnc.text, 
       hnc.time, 
       hnc.ranking, 
       hnc.deleted, 
       hnc.dead, 
       hnc.parent as sid,
       hns.by as sauthor,
       hns.time as stime,
       hns.title as stitle,
       hns.deleted as sdeleted,
       hns.dead as sdead,
       hns.score as score,
       hns.text as stext,
       hns.url as surl
FROM `bigquery-public-data.hacker_news.comments` as hnc
INNER JOIN `bigquery-public-data.hacker_news.stories`as hns ON hns.id  = hnc.parent
"""
df = querytodf(query)

CPU times: user 3min 3s, sys: 11.4 s, total: 3min 15s
Wall time: 12min 47s


### Inspect shape of completed query. Verify that all rows are present. 

In [15]:
df.shape

(2620593, 17)

In [16]:
df.head()

Unnamed: 0,id,by,author,text,time,ranking,deleted,dead,sid,sauthor,stime,stitle,sdeleted,sdead,score,stext,surl
0,1935438,tocomment,tocomment,What should I take away from this?,1290547754,5,,,1935059,atularora,1290541321,What Android is,,,212.0,,http://www.tbray.org/ongoing/When/201x/2010/11...
1,102967,wallflower,wallflower,I see a trend that indicates flattening of rev...,1201116956,5,,,102843,kirubakaran,1201104980,Apple stock plunges $30. Sky IS falling.,,,25.0,,http://finance.google.com/finance?q=AAPL
2,8165541,sobkas,sobkas,This remainds me of sd card hacking:\n<a href=...,1407794353,6,,,8164766,thefreeman,1407784981,BadUSB – On accessories that turn evil [pdf],,,84.0,,https://srlabs.de/blog/wp-content/uploads/2014...
3,10373528,glormph,glormph,Someone mentioned that FB may want to be the w...,1444644248,6,,,10372964,aestetix,1444633632,Global coalition tells Facebook to kill its Re...,,,78.0,,https://boingboing.net/2015/10/06/global-coali...
4,4209452,vamsikv,vamsikv,....,1341606025,7,,,4209294,diminium,1341603367,"Ask HN: App Store Devs, Are you guys still mot...",,,9.0,We now know that the vast majority of apps mak...,


### Save query results to CSV.

In [17]:
%%time
df.to_csv('data/hn_commentors_all.csv')

CPU times: user 56.3 s, sys: 1.62 s, total: 57.9 s
Wall time: 58.1 s


### Read CSV back into new Dataframe.

In [19]:
%%time
ds2 = pd.read_csv('data/hn_commentors_all.csv')

CPU times: user 22 s, sys: 1.64 s, total: 23.6 s
Wall time: 23.6 s


### Inspect and verify that all rows are present.

In [21]:
print(ds2.shape)
display(ds2.head(3))

(2620593, 18)


Unnamed: 0.1,Unnamed: 0,id,by,author,text,time,ranking,deleted,dead,sid,sauthor,stime,stitle,sdeleted,sdead,score,stext,surl
0,0,1935438,tocomment,tocomment,What should I take away from this?,1290547754,5,,,1935059,atularora,1290541321,What Android is,,,212.0,,http://www.tbray.org/ongoing/When/201x/2010/11...
1,1,102967,wallflower,wallflower,I see a trend that indicates flattening of rev...,1201116956,5,,,102843,kirubakaran,1201104980,Apple stock plunges $30. Sky IS falling.,,,25.0,,http://finance.google.com/finance?q=AAPL
2,2,8165541,sobkas,sobkas,This remainds me of sd card hacking:\n<a href=...,1407794353,6,,,8164766,thefreeman,1407784981,BadUSB – On accessories that turn evil [pdf],,,84.0,,https://srlabs.de/blog/wp-content/uploads/2014...


### Remove all  `author` and `text` NaN rows from Dataframe. 

In [22]:
nans = ds2.text.isna().sum()
print('This many nans:', nans)
ds2 = ds2.dropna(subset=['author', 'text'])
print('New Shape after nan removal:', ds2.shape)

This many nans: 80486
New Shape after nan removal: (2540107, 18)


## Sentiment Analysis and Text Cleaning

### Define utility functions

In [13]:
def encode_decode(text):
    """
    Utility function to clean text by decoding HTML text.
    """
    unescaped = html.unescape(text)
    return unescaped

def noHTML(text):
    """
    Utility function to clean text by removing HTML flags.
    """
    cleanr = re.compile('<.*?>')
    cleantext = re.sub(cleanr, ' ', text)
    return cleantext

def noURLS(text):
    """
    Utility function to clean text by removing links
    using simple regex statements.
    """
    return ''.join(re.sub(r"http\S+", "", text))

def get_sentiment(text):
    """
    Utility function to classify sentiment of passed text
    using textblob's sentiment method. Return the polarity
    score as a float within the range [-1.0, 1.0]
    """
    return TextBlob(text).sentiment.polarity

### Apply text cleaning to comment texts and create new column in Dataframe

In [14]:
tqdm.pandas(tqdm())
ds2['cleaned_comment'] = ds2.text.progress_apply(lambda x: noURLS(noHTML(encode_decode(x))))

0it [00:00, ?it/s]

AttributeError: 'Series' object has no attribute 'progress_apply'

### Apply sentiment analysis (TextBlob.polarity) to each cleaned Comment text. 

In [None]:
ds2['comment_sentiment'] = ds2['cleaned_comment'].apply(lambda x: get_sentiment(x))

### Remove `Unnamed: 0` Column

In [None]:
ds3 = ds2.loc[:, ~ds2.columns.str.match('Unnamed')]
ds3.head()

### Save to CSV 

In [None]:
ds3.to_csv('data/hn_all_w_sentiment_cleaned.csv',index=False)

## Load cleaned / analyzed data back into dataframe from CSV

In [None]:
%%time
# IMPORT FROM CSV's
ds4 = pd.read_csv('data/hn_all_w_sentiment_cleaned.csv')
print(ds4.shape)

In [None]:
ds4 = d4.loc[:, ~d4.columns.str.match('Unnamed')]

In [None]:
commentorList = ds4.by.unique().tolist()
print("There are this many unique commentors:", len(commentorList))
c_list = pd.DataFrame(commentorList)
c_list.columns = ['commentor']
display(c_list.head())

## Aggregate commentors' sentiment statistics and make final dataframe.

### Define aggreation function. 

In [None]:
def loopSentimentAggegator(i):  
    """
    Utility function that groups commment rows by 
    commentor and returns sentiment statistics and samples
    """
    # Select subdf for the selected author
    subdf = ds3[ds3['by'].values == i]
    # Commentor Name
    commentor = i
    # Create a float indicating commentor's mean sentiment score
    commentor_sentiment = subdf['comment_sentiment'].mean() 
    # Upvotes Mean
    commentor_upvotes_mean = subdf['ranking'].mean() 
    # Upvotes Total
    commentor_upvotes_total = subdf['ranking'].sum()
    # Total Happiness
    commentor_total_happyness = subdf[subdf['comment_sentiment'] > 0.0].comment_sentiment.sum() 
    # Total Saltiness
    commentor_total_saltiness = subdf[subdf['comment_sentiment'] < 0.0].comment_sentiment.sum() 
    # Third output, total number of commments
    total_comments = len(subdf.index)
    # Total salty comments
    qty_salty_comments = (subdf.comment_sentiment < 0.0).sum()
    # Total non-salty comments
    qty_non_salty_comments = (subdf.comment_sentiment > 0.0).sum()
    # Create the second output, a list of the commentor's saltiest comments. 
    salty_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']][0:9].to_json(orient='records')
    # Ten most positive comments
    sweet_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']].tail(10).to_json(orient='records')
    outputDF = pd.DataFrame([{ 'commentor': commentor, 
                                            'commentor_sentiment': commentor_sentiment, 
                                            'commentor_upvotes_mean': commentor_upvotes_mean,
                                            'commentor_upvotes_total': commentor_upvotes_total,
                                            'commentor_total_happyness': commentor_total_happyness,
                                            'commentor_total_saltiness': commentor_total_saltiness,
                                            'total_comments': total_comments,
                                            'qty_salty_comments': qty_salty_comments, 
                                            'qty_non_salty_comments': qty_non_salty_comments,
                                            'salty_comments': salty_comments, 
                                            'sweet_comments': sweet_comments} ])
    return outputDF

### Test function with single commentor to ensure output is good. 

In [None]:
testingDF = loopSentimentAggegator('eli')
display(testingDF.head())

### Apply aggregation functions by commentor to entire dataframe.

In [None]:
results = []
for j in tqdm(commentorList):
    newDF = loopSentimentAggegator(j)
    results.append(newDF)

### Concatenate aggregation outputs (list of dfs) into a single final dataframe.

In [None]:
finalTableResults = pd.concat(results)
print(finalTableResults.shape)
display(finalTableResults.head())

### Save final results to CSV.

In [None]:
finalTableResults.to_csv('data/hn_commentor_data.csv',index=False)

### Save final results to AVRO (just to be safe) :). 

In [None]:
pdx.to_avro('data/hn_commentor_data.avro', finalTableResults)

### Check lengths & tail to make sure it looks right. 

In [None]:
csvsaved = df.read_csv('data/hn_commentor_data.csv')
avrosaved = pdx.read_avro('data/hn_commentor_data.avro')

In [None]:
print('Saved csv shape:', csvsaved.shape)
print('Saved avro shape:', avrosaved.shape)

In [None]:
display(avrosaved.tail(3))
display(csvsaved.tail(3))

# SUCCESS!  Now just need to get it into a AWS RDS PostgreSQL instance. : )

### Bonus Material: The Graveyard - Ideas that didn't work.

* This didn't work because df.to_sql() is sloooooooow. Just send the CSV straight to PostgreSQL. 

``` python
def verify_output(pgres_engine, table_name):
    # ______  verify output-table contents ____
    query = 'SELECT * FROM ' + table_name + ' LIMIT 10;'
    for row in pgres_engine.execute(query).fetchall():
        print(row)
    return

def run_conversion(pgres_engine):
    # ___ process tables ____
    df = pdx.read_avro('data/hn_commentors_db.avro')
    schema_name = 'lambdaRPG'
    tables = ['commentor_data']
    df.to_sql(table_name,
              if_exists='replace',
              con=pgres_engine,
              schema=schema_name,
              chunksize=10)
    verify_output(pgres_engine, table_name)
    return

def runARVOtoSQL():
    # __ Connect to postgres (SQLalchemy.engine) ____
    dbname = ''
    user = ''
    host = ''
    password = ''
    file = open('aws.pwd', 'r')
    ctr = 1
    for line in file:
        line = line.replace('\n', '')
        if ctr == 1: dbname = line
        if ctr == 2: user = line
        if ctr == 3: host = line
        if ctr == 4: passw = line
        ctr = ctr + 1
    pgres_str = 'postgresql+psycopg2://'+user+':'+passw+'@'+host+'/'+dbname
    pgres_engine = create_engine(pgres_str)
    run_conversion(pgres_engine)
    print('Conversion successful.....')
    return

```

* This didn't work because I need to learn more dask. 
``` python
ds3['comment_sentiment_dask'] = ds3['cleaned_comment'].apply(lambda x: get_sentiment(x)).compute(scheduler='threads')```



* This also didn't work. Same reason.
``` python
dsr3 = ds2
dsr3['cleaned_comment'] = dsr3.text.apply(lambda x: noURLS(noHTML(encode_decode(x)))).compute()```

* This dask layout worked for a few parts but didn't want to thread. And it kept failing because of a deep error. 

``` python
dsr3 = dd.from_pandas(ds2, npartitions=2000)

finalDF = dsr2
def fin (daskDataframe):
    daskDataframe['comment_sentiment'] = daskDataframe.text.apply(lambda x: get_sentiment(noURLS(noHTML(encode_decode(x)))))
    daskDataframe['cleaned_comment'] = daskDataframe.text.apply(lambda x: noURLS(noHTML(encode_decode(x))))
    return finalDF

with ProgressBar():
    res = fin(dsr2).compute()``` 

* This is helpful. 

``` console
Where to find the dask distributed Bokeh dashboard on aws. 

URL of accessing Dask Dashboard will be:
https://myinstance.notebook.us-east-1.sagemaker.aws/proxy/8787/```

# Thanks for reading! 
