#### Batch vs. Streaming Processing
*[Source](https://thenewstack.io/the-big-data-debate-batch-processing-vs-streaming-processing/)*
## Definitions
- A batch is a collection of data points grouped together within a specific time interval. This is also often called a window of data.
- Streaming processing deals with continuous data and is key to turning big data into fast data. Both models are valuable, and each addresses different use cases.

While the batch processing model requires a set of data collected over time, streaming processing requires data to be fed into an analytics tool in real time. Batch processing is often used when dealing with large volumes of data or with data sources from legacy systems, where it’s not feasible to deliver data in streams. By definition, batch processing also requires all the data for the batch to be loaded into some type of storage, such as a database or file system, before it is processed.

Data streams can also be involved in processing large quantities of data, but batch works best when you don’t need real-time analytics. 
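The difference between the two models can be illustrated with a minimal sketch. The function names and the sample data points below are assumptions made for illustration only: the batch version waits for the complete window, while the streaming version updates its result as each data point arrives.

```Python
from typing import Iterable, Iterator, List


def batch_mean(readings: List[float]) -> float:
    """Batch: the whole window is stored first, then processed once."""
    return sum(readings) / len(readings)


def streaming_mean(readings: Iterable[float]) -> Iterator[float]:
    """Streaming: update the running result as each data point arrives."""
    total, count = 0.0, 0
    for value in readings:
        total += value
        count += 1
        yield total / count


window = [2.0, 4.0, 6.0]  # hypothetical data points

print(batch_mean(window))            # one result, after the window is complete
print(list(streaming_mean(window)))  # one result per incoming data point
```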


#### Mining Newsfeed data
## 1. API setup
1. Create an account at: https://newsapi.org/register
2. Save API Key
  - Optional: save as environment variable.
      - Linux/macOS: https://www.digitalocean.com/community/tutorials/how-to-read-and-set-environmental-and-shell-variables-on-linux
          - Command line
          - Add to the file ~/.bashrc (`export` makes the variable visible to child processes such as Python):
              - `export NEWS_API_KEY="<Your Key>"`
          - Run `source ~/.bashrc`
      - Windows: http://codevba.com/office/environ.htm#.YAkatZP0lhE
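Once the variable is set, it can be read from Python. The helper below is a minimal sketch, not part of the News API client: `load_api_key` is a hypothetical name, and the explicit `env` argument exists only so the function can be tried without touching the real environment.

```Python
import os
from typing import Mapping, Optional


def load_api_key(env: Optional[Mapping[str, str]] = None,
                 name: str = "NEWS_API_KEY") -> str:
    """Return the API key, failing early with a clear message if it is missing."""
    env = os.environ if env is None else env
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(f"Environment variable {name} is not set")
    return key


# Demonstration with an explicit mapping instead of the real environment.
print(load_api_key({"NEWS_API_KEY": "<Your Key>"}))
```

Failing early here avoids sending requests with an empty `apiKey` parameter later on.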

First, let's understand how the API works: https://newsapi.org/docs/get-started

Data example for `sources` endpoint
```JSON
{
    "status": "ok",
    "sources": [
        {
            "id": "abc-news",
            "name": "ABC News",
            "description": "Your trusted source for breaking news, analysis, exclusive interviews, headlines, and videos at ABCNews.com.",
            "url": "https://abcnews.go.com",
            "category": "general",
            "language": "en",
            "country": "us"
        },
        {
            "id": "abc-news-au",
            "name": "ABC News (AU)",
            "description": "Australia's most trusted source of local, national and world news. Comprehensive, independent, in-depth analysis, the latest business, sport, weather and more.",
            "url": "http://www.abc.net.au/news",
            "category": "general",
            "language": "en",
            "country": "au"
        },
        ...
    ]
}
```
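A response shaped like the sample above can be explored with the standard `json` module. This is only a sketch: the payload is a trimmed copy of the two sources shown (descriptions and URLs omitted), and `source_ids` is a hypothetical helper, not part of the API.

```Python
import json

# Trimmed copy of the sample response (descriptions and URLs omitted).
sample = """
{
  "status": "ok",
  "sources": [
    {"id": "abc-news", "name": "ABC News", "category": "general",
     "language": "en", "country": "us"},
    {"id": "abc-news-au", "name": "ABC News (AU)", "category": "general",
     "language": "en", "country": "au"}
  ]
}
"""
payload = json.loads(sample)


def source_ids(payload, country=None):
    """Return the source ids, optionally keeping only one country."""
    if payload.get("status") != "ok":
        return []
    return [source["id"] for source in payload.get("sources", [])
            if country is None or source["country"] == country]


print(source_ids(payload))
print(source_ids(payload, country="au"))
```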

---

## 2. Let's start defining our class
```Python
class NewsConsumer:
    # Class-level constants shared by every instance.
    NEWS_API_KEY_NAME = "NEWS_API_KEY"
    BASE_URL = "https://newsapi.org/v2/everything?"

    def __init__(self):
        # Class attributes are reached as NewsConsumer.<NAME>; no `global` is needed.
        self.num_requests = 0
```

---

### Questions to ask
- What is my API limit? Can I find it in the documentation?
- Should I establish a limit? How do I determine it? Can I pass the limit to the API?
- Is it going to be a flow or batches?

Let's also check: https://newsapi.org/pricing

Let's set up a limit of 50 pages.
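One way to enforce such a page limit is a small pagination loop. The sketch below assumes a `fetch_page` callable that returns the list of articles for a given page; `paginate` and `fake_fetch` are hypothetical names, and the fake fetcher only stands in for a real API call.

```Python
from typing import Callable, Iterator, List

MAX_PAGES = 50  # the page limit chosen above


def paginate(fetch_page: Callable[[int], List[dict]],
             max_pages: int = MAX_PAGES) -> Iterator[List[dict]]:
    """Yield one batch of articles per page until a page is empty or the limit is hit."""
    for page in range(1, max_pages + 1):
        batch = fetch_page(page)
        if not batch:
            break
        yield batch


def fake_fetch(page: int) -> List[dict]:
    """Hypothetical stand-in for an API call: three pages with one article each."""
    return [{"title": f"article {page}"}] if page <= 3 else []


print(sum(len(batch) for batch in paginate(fake_fetch)))               # 3
print(sum(len(batch) for batch in paginate(fake_fetch, max_pages=2)))  # 2
```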

## 3. Make a request

```Python
import os
import requests
import urllib.parse as urlparse
from typing import Any, Dict
from urllib.parse import urlencode

class NewsConsumer:

    NEWS_API_KEY_NAME = "NEWS_API_KEY"
    BASE_URL = "https://newsapi.org/v2/everything?"
    REQUESTS_LIMIT = 100

    def __init__(self):
        self.num_requests = 0

    def makeRequest(self, q: str, page: int, language: str = "en", page_size: int = 100) -> Dict[str, Any]:
        # Stop calling the API once the request budget is used up.
        if self.num_requests >= NewsConsumer.REQUESTS_LIMIT:
            return {}
        assert page_size > 0, "page_size must be greater than 0"
        assert page > 0, "page must be greater than 0"
        url_parts = list(urlparse.urlparse(NewsConsumer.BASE_URL))
        query = dict(urlparse.parse_qsl(url_parts[4]))  # index 4 is the query string
        query.update({'q':q, 'language':language, 'pageSize':page_size, 'page':page, 'apiKey':os.getenv(NewsConsumer.NEWS_API_KEY_NAME)})
        url_parts[4] = urlencode(query)
        self.num_requests += 1  # count one request per call, not one per article
        # Return the decoded JSON body so callers can index it, e.g. result['articles'].
        return requests.get(urlparse.urlunparse(url_parts)).json()
```
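The URL construction can be checked in isolation, without sending a request. The following is a minimal sketch using only the standard library; `build_url` is a hypothetical helper, and the query and key values are placeholders.

```Python
from urllib.parse import parse_qs, urlencode, urlparse

BASE_URL = "https://newsapi.org/v2/everything"


def build_url(q: str, page: int, api_key: str,
              language: str = "en", page_size: int = 100) -> str:
    """Assemble the request URL; urlencode escapes spaces and special characters."""
    params = {"q": q, "language": language, "pageSize": page_size,
              "page": page, "apiKey": api_key}
    return f"{BASE_URL}?{urlencode(params)}"


url = build_url("covid vaccine", page=2, api_key="<Your Key>")
print(url)

# Decoding the query string again recovers the original values.
print(parse_qs(urlparse(url).query)["q"])
```

`requests.get` also accepts a `params` dictionary and performs the same encoding, which is a shorter alternative when no custom URL handling is needed.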

# *Pickle
The Python pickle module serializes and de-serializes Python object structures. Most Python objects (lists, dicts, class instances, etc.) can be pickled so that they can be saved to disk. Pickling converts the object into a byte stream before writing it to a file. The idea is that this byte stream contains all the information necessary to reconstruct the object in another Python script.

[Examples](https://www.geeksforgeeks.org/understanding-python-pickling-example/)
```Python
import pickle
consumer = NewsConsumer()
articles = consumer.makeRequest("vaccine", 1)
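# "wb" opens the file for writing in binary mode.
# more on modes: https://www.w3schools.com/python/ref_func_open.asp
with open("save.p", "wb") as f:  # the file is closed when the block exits
    pickle.dump(articles, f)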
```
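Reading the object back uses `pickle.load` with the file opened in `"rb"` mode. The round trip below is a small sketch: the dictionary is made-up stand-in data rather than a real API response, and a temporary directory is used so nothing is left behind in the working folder.

```Python
import os
import pickle
import tempfile

# Stand-in data; a real response would hold many more fields.
articles = {"status": "ok", "articles": [{"title": "hypothetical headline"}]}

path = os.path.join(tempfile.mkdtemp(), "save.p")

with open(path, "wb") as f:   # serialize to disk
    pickle.dump(articles, f)

with open(path, "rb") as f:   # reconstruct the object
    restored = pickle.load(f)

print(restored == articles)  # True
```

Only unpickle files you trust: loading a pickle can execute arbitrary code.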

## 4. Schema Definition

- What information is relevant?
- Pre-processing vs. Raw Information
- What is the objective?
- What are the restrictions? Memory restrictions, Network restrictions, etc. 

---

Columns:
- Id: good practice to include an identifier
- text: raw text 

Install the library: `!pip3 install psycopg2-binary`

Script
```Python
import psycopg2 
conn = psycopg2.connect("dbname=postgres user=aaronsapa")
cur = conn.cursor()
cur.execute("""
CREATE TABLE documents_raw(
   id SERIAL PRIMARY KEY,
   text VARCHAR NOT NULL
);
""")
conn.commit()  # persist the new table
cur.close()
conn.close()   # release the database connection
```

## 5. Batch insert

```Python
import psycopg2.extras
from typing import Any, Dict, List
# Let's move this later to a util file.
def saveTextBatch(connection, articles: List[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        # One parameter dict per article; only the title is stored as text.
        iter_articles = ({'text': article['title']} for article in articles)
        psycopg2.extras.execute_batch(cursor, """
        INSERT INTO documents_raw(text) VALUES (%(text)s);
        """, iter_articles)
    connection.commit()
```
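`execute_batch` groups the statements into pages (its `page_size` parameter defaults to 100) so that fewer round trips to the server are needed. The grouping idea itself can be sketched in plain Python; `chunked` is a hypothetical helper written for illustration and is not how psycopg2 is implemented internally.

```Python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], page_size: int) -> Iterator[List[T]]:
    """Split any iterable into consecutive lists of at most page_size items."""
    if page_size <= 0:
        raise ValueError("page_size must be greater than 0")
    iterator = iter(items)
    while True:
        page = list(islice(iterator, page_size))
        if not page:
            return
        yield page


titles = (f"title {i}" for i in range(5))  # a generator, consumed lazily
print([len(page) for page in chunked(titles, page_size=2)])  # [2, 2, 1]
```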

Let's test it!

```Python
consumer = NewsConsumer()
articles = consumer.makeRequest("vaccine", 3, page_size=10)
connection = psycopg2.connect("dbname=postgres user=aaronsapa")
saveTextBatch(connection, articles['articles'])
cur = connection.cursor()
# Never fetch all into local! Shown here only as a quick check.
cur.execute("SELECT * FROM documents_raw;")
cur.fetchall()
cur.close()
connection.close()
```
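An alternative to `fetchall()` is to read the result in chunks with `fetchmany`, which is part of the Python DB-API that psycopg2 cursors also follow. The sketch below uses the standard-library `sqlite3` module as a stand-in so that it runs without a PostgreSQL server; the table contents are made up and `iter_rows` is a hypothetical helper.

```Python
import sqlite3

# In-memory SQLite database standing in for the PostgreSQL table.
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE documents_raw (id INTEGER PRIMARY KEY, text TEXT NOT NULL)")
connection.executemany(
    "INSERT INTO documents_raw(text) VALUES (?)",
    [(f"title {i}",) for i in range(5)])


def iter_rows(cursor, chunk_size=2):
    """Yield rows one by one while fetching only chunk_size rows at a time."""
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield from rows


cursor = connection.cursor()
cursor.execute("SELECT id, text FROM documents_raw ORDER BY id")
rows = list(iter_rows(cursor))  # list() only to show the result here
print(len(rows))  # 5
connection.close()
```

With psycopg2, a default cursor still transfers the whole result to the client when the query executes, so for large tables a named (server-side) cursor would typically be combined with this pattern.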

```Python
import pandas as pd

def getTextDF() -> pd.DataFrame:
    connection = psycopg2.connect("dbname=postgres user=aaronsapa")
    cur = connection.cursor()
    cur.execute("SELECT * FROM documents_raw;")
    df = pd.DataFrame(cur.fetchall(), columns=['id','text'])
    df.set_index(['id'], inplace=True)
    cur.close()
    connection.close()
    return df
```

## Some code refactoring

```Python
import psycopg2
import psycopg2.extras
from typing import Any, Dict, List
import pandas as pd

class Connector():
    def __init__(self, db_name: str, user: str):
        self.db_name = db_name
        self.user = user

    def queryTransaction(self, query: str) -> None:
        if not query:
            return
        connection = psycopg2.connect(f"dbname={self.db_name} user={self.user}")
        with connection.cursor() as cursor:  # the cursor is closed when the block exits
            cursor.execute(query)
        connection.commit()
        connection.close()

    def saveTextBatch(self, articles: List[Dict[str, Any]]) -> None:
        if not articles:
            return
        connection = psycopg2.connect(f"dbname={self.db_name} user={self.user}")
        with connection.cursor() as cursor:
            # One parameter dict per article; only the title is stored as text.
            iter_articles = ({'text': article['title']} for article in articles)
            psycopg2.extras.execute_batch(cursor, """
            INSERT INTO documents_raw(text) VALUES (%(text)s);
            """, iter_articles)
        connection.commit()
        connection.close()

    def getTextDF(self) -> pd.DataFrame:
        connection = psycopg2.connect(f"dbname={self.db_name} user={self.user}")
        cur = connection.cursor()
        cur.execute("SELECT * FROM documents_raw;")
        df = pd.DataFrame(cur.fetchall(), columns=['id','text'])
        df.set_index(['id'], inplace=True)
        cur.close()
        connection.close()
        return df
```

Let's test it:
```Python
c = Connector('postgres', 'aaronsapa')
df = c.getTextDF()
```

```Python
df.head()
```

| id | text |
|-----|------|
| 101 | What We Know About Allergic Reactions to the C... |
| 102 | All Adults Over 65 May Soon Be Eligible for th... |
| 103 | FDA tells US health providers not to modify CO... |
| 104 | Resolve to Watch These 8 Shows in 2021 |
| 105 | Track COVID Vaccinations With These Websites |


---

## NLTK 
*[Source](https://en.wikipedia.org/wiki/Natural_Language_Toolkit)*

NLTK is intended to support research and teaching in NLP or closely related areas, including empirical linguistics, cognitive science, artificial intelligence, information retrieval, and machine learning. NLTK has been used successfully as a teaching tool, as an individual study tool, and as a platform for prototyping and building research systems. There are 32 universities in the US and 25 countries using NLTK in their courses. NLTK supports classification, tokenization, stemming, tagging, parsing, and semantic reasoning functionalities.

```Python
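!pip3 install nltk
import nltk
# Download the corpora used below: the stop word lists and WordNet (for the lemmatizer).
nltk.download('stopwords')
nltk.download('wordnet')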
```


```Python
import re

from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

def cleanText():
    en_stop_words = set(stopwords.words('english'))  # a set makes membership tests fast
    wordnet_lemmatizer = WordNetLemmatizer()
    # Remove non-letters and the standalone tokens rt/RT/http(s)/co. The \b word
    # boundaries keep "co" or "rt" from being cut out of longer words such as "report".
    exclusion_pattern = re.compile(r'[^a-zA-Z]|\b(?:rt|RT|https?|co)\b')
    def clean(text: str) -> str:
        words = exclusion_pattern.sub(' ', text).lower().split()
        words = [word for word in words if word not in en_stop_words]
        words = [wordnet_lemmatizer.lemmatize(word) for word in words]
        # list comprehension performs better in most cases than built-in func
        # clean_words= filter(lambda word: word not in en_stop_words, words)
        # clean_words = map(lambda word: wordnet_lemmatizer.lemmatize(word), clean_words)
        return ' '.join(words)
    return clean
```
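The exclusion list mixes a character class with short tokens such as `rt` and `co`, so it is worth checking how the resulting alternation behaves on ordinary words. The sketch below compares an alternation without word boundaries against a variant that uses `\b` anchors; the sample sentence and the variable names are made up for illustration.

```Python
import re

# Plain alternation: "rt", "http" and "co" match anywhere, even inside words.
substring_pattern = re.compile("|".join(["[^a-zA-Z]", "rt", "http", "co", "RT"]))

# Variant with word boundaries: only standalone tokens are removed.
word_pattern = re.compile(r"[^a-zA-Z]|\b(?:rt|RT|https?|co)\b")

text = "RT Experts comment on the report https://t.co/abc"

substring_tokens = substring_pattern.sub(" ", text).lower().split()
word_tokens = word_pattern.sub(" ", text).lower().split()

print(substring_tokens)  # words such as "comment" lose their "co"
print(word_tokens)       # whole words survive; rt, https and co are dropped
```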
    

Cleaning text:
```Python
clean_text_func = cleanText()
# Apply the cleaner to each value of the 'text' column.
df['clean_text'] = df['text'].apply(clean_text_func)
```

```Python
df.head()
```

| id | text | clean_text |
|-----|------|------------|
| 101 | What We Know About Allergic Reactions to the C... | know allergic reaction covid vaccine |
| 102 | All Adults Over 65 May Soon Be Eligible for th... | adult may soon eligible covid vaccine |
| 103 | FDA tells US health providers not to modify CO... | fda tell u health provider modify covid vaccin... |
| 104 | Resolve to Watch These 8 Shows in 2021 | resolve watch show |
| 105 | Track COVID Vaccinations With These Websites | track covid vaccination website |


```Python
from textblob import TextBlob

def sentiment(text: str) -> int:
    # TextBlob polarity ranges from -1.0 (negative) to 1.0 (positive).
    analysis = TextBlob(text)
    polarity = analysis.sentiment.polarity
    if polarity > 0:
        return 1
    elif polarity == 0:
        return 0
    else:
        return -1
```
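The mapping from a polarity score to a label can be tested on its own, without TextBlob installed. The sketch below reproduces the same sign-based rule; `polarity_to_label` and its `neutral_band` parameter are hypothetical additions, included only to show how scores very close to zero could also be treated as neutral.

```Python
def polarity_to_label(polarity: float, neutral_band: float = 0.0) -> int:
    """Map a polarity in [-1.0, 1.0] to 1 (positive), 0 (neutral) or -1 (negative)."""
    if polarity > neutral_band:
        return 1
    if polarity < -neutral_band:
        return -1
    return 0


print(polarity_to_label(0.5))                      # 1
print(polarity_to_label(-0.2))                     # -1
print(polarity_to_label(0.05, neutral_band=0.1))   # 0
```

With the default `neutral_band=0.0` the function behaves like the sign rule used in `sentiment`.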

```Python
df['sentiment'] = df.apply(lambda row : sentiment(row['clean_text']), axis=1)
df.head()
```

| id | text | clean_text | sentiment |
|-----|------|------------|-----------|
| 101 | What We Know About Allergic Reactions to the C... | know allergic reaction covid vaccine | 0 |
| 102 | All Adults Over 65 May Soon Be Eligible for th... | adult may soon eligible covid vaccine | 1 |
| 103 | FDA tells US health providers not to modify CO... | fda tell u health provider modify covid vaccin... | 0 |
| 104 | Resolve to Watch These 8 Shows in 2021 | resolve watch show | 0 |
| 105 | Track COVID Vaccinations With These Websites | track covid vaccination website | 0 |


# TBD

Install 
`!pip install wordcloud`

```Python
import matplotlib.pyplot as plt
from wordcloud import WordCloud
def wordCloud(words: pd.Series) -> None:
    plt.subplots(figsize = (12,10))
    # generate() expects a single string, so join all the cleaned texts first.
    wordcloud = WordCloud(background_color = 'white', width = 1000, height = 800).generate(' '.join(words))
    plt.imshow(wordcloud)
    plt.axis('off')
    plt.show()

# The cleaned text is stored in the 'clean_text' column.
wordCloud(df[df['sentiment'] == 1]['clean_text'])
```
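A word cloud draws more frequent words larger, so the underlying counts can be inspected with `collections.Counter` before plotting. This is a minimal sketch: the two strings are cleaned titles taken from the table rows shown earlier, and the simple whitespace split is an assumption that ignores WordCloud's own tokenization and stop word handling.

```Python
from collections import Counter

# Two cleaned titles from the table rows shown earlier.
clean_texts = [
    "adult may soon eligible covid vaccine",
    "track covid vaccination website",
]

# Count how often each word appears across all texts.
counts = Counter(word for text in clean_texts for word in text.split())

print(counts.most_common(1))  # [('covid', 2)]
```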