In [1]:
# Standard library.
import re
import os
import sys
import datetime
import sqlite3
from collections import OrderedDict

# External library.
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from nltk.corpus import stopwords
from nltk import FreqDist
from wordcloud import WordCloud
import json


# bioinfobot imports
sys.path.append(".")
from bioinfobot.utils.tweet_utils import tweet_clean
from bioinfobot.utils.collector import get_latest_tweet_data
from bioinfobot.utils.paths import TweetAnalysisPaths

nltk.download('punkt')
nltk.download('stopwords')



[nltk_data] Downloading package punkt to /home/axiomcura/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /home/axiomcura/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [2]:
# Tweet data utils 
def filter_tokens(words):
    """Remove stop words from a sequence of tokens.

    The stop list is NLTK's English stop-word corpus extended with the
    additional words listed in ``more_stopwords`` below.  Matching is exact
    (case-sensitive), so a token is dropped only when it is spelled exactly
    like an entry of the stop list.

    Parameters
    ----------
    words : iterable of str
        tokens to filter

    Returns
    -------
    list
        the tokens that are not stop words, in their original order
        (duplicates are kept)
    """
    # TODO: Might this be moved into a separate class or struct?
    more_stopwords = ['also', 'bad', 'cant', 'could', 'dont', 'day', 'great', 'get', 'good', 'hear',
                      'here', 'ive', 'im', 'like', 'latest', 'new', 'news', 'oh', 'people', 'see',
                      'today', 'top', 'the', 'twitter', 'thats', 'thanks', 'us', 'using', 'work',
                      'would', 'x']

    # A set gives constant-time membership tests; the previous list
    # concatenation was scanned linearly for every single token.
    stop_words = set(stopwords.words("english"))
    stop_words.update(more_stopwords)

    return [word for word in words if word not in stop_words]


def extract_hash(tweets):
    """Extract all hashtags found in a list of tweets.

    Every tweet is lower-cased before matching, so the returned hashtags
    are all lower case.  The collected hashtags are passed through
    ``filter_hash`` before being returned.

    Parameters
    ----------
    tweets : list
        list of tweet texts (strings)

    Returns
    -------
    list
        hashtags (including the leading ``#``) in order of appearance,
        duplicates included, minus whatever ``filter_hash`` removes

    Raises
    ------
    TypeError
        Raised if tweets is not a list
    """
    # type checking
    if not isinstance(tweets, list):
        raise TypeError('tweets must be a list')

    total_hash = []
    for tweet in tweets:
        # Raw string: in a normal literal '\w' is an invalid escape sequence,
        # which newer Python versions warn about.
        # findall returns [] when nothing matches, so no special case needed.
        total_hash.extend(re.findall(r'#\w+', tweet.lower()))

    return filter_hash(total_hash)


def filter_hash(hashes):
    """Drop unwanted hashtags from a list of hashtags.

    Parameters
    ----------
    hashes : list
        hashtags to filter; each entry is compared verbatim against the
        list of unwanted hashtags

    Returns
    -------
    list
        new list holding the remaining hashtags in their original order

    Raises
    ------
    TypeError
        Raised if hashes is not a list
    """
    # type checking
    if not isinstance(hashes, list):
        raise TypeError('hashes must be a list')

    # TODO: Create a class that contains all stop words
    unwanted = ['#twitter', '#tweeted']

    kept = []
    for tag in hashes:
        if tag in unwanted:
            continue
        kept.append(tag)
    return kept


def create_wordcloud(name: str, words_freq: dict) -> None:
    """Render a word cloud from word frequencies and write it to a PNG file.

    The output location and the font are resolved through
    ``TweetAnalysisPaths``: the image is written to
    ``<images_path>/<name>.png`` and the font file "Actor-Regular.ttf" is
    looked up under ``font_path``.

    Parameters
    ----------
    name : str
        base name (without extension) of the generated image file

    words_freq : dict
        dictionary containing unique words and their associated frequency
        as key-value pairs

    Returns
    -------
        None
    """
    # resolve the font and the output file from the project paths
    project_paths = TweetAnalysisPaths()
    font_file = os.path.join(project_paths.font_path, "Actor-Regular.ttf")
    output_file = f"{project_paths.images_path}/{name}.png"

    # word cloud parameters
    cloud_settings = {
        "font_path": font_file,
        "width": 1500,
        "height": 500,
        "max_words": 500,
        "stopwords": None,
        "background_color": 'whitesmoke',
        "max_font_size": None,
        "font_step": 1,
        "mode": 'RGB',
        "collocations": True,
        "colormap": None,
        "normalize_plurals": True,
    }

    # build the cloud from the precomputed frequencies, then save it
    rendered = WordCloud(**cloud_settings).generate_from_frequencies(words_freq)
    rendered.to_file(output_file)


# -----
# common datatype utils
# -----
def flatten_list(nd_list):
    """Collapse an arbitrarily nested list into a single flat list.

    Only ``list`` instances are descended into; every other element
    (tuples and strings included) is copied to the result unchanged.

    Parameters
    ----------
    nd_list : list
        List that may contain nested lists

    Returns
    -------
    list
        1D list of the non-list elements, in depth-first order

    Raises
    ------
    TypeError
        Raised if the input parameter is not a list
    """
    if not isinstance(nd_list, list):
        raise TypeError('nd_list must be a list')

    flattened = []
    for item in nd_list:
        if not isinstance(item, list):
            flattened.append(item)
            continue
        # nested list: flatten it recursively and splice its elements in
        flattened.extend(flatten_list(item))

    return flattened


def dict_value_sort_return_top(frequency_dict, maxreturn):
    """Return the ``maxreturn`` highest-valued items of a dictionary.

    Parameters
    ----------
    frequency_dict : dict
        mapping whose values are mutually comparable (e.g. word -> count)

    maxreturn : int
        maximum number of items to return

    Returns
    -------
    list
        ``(key, value)`` tuples sorted by value in descending order; items
        with equal values keep the dictionary's iteration order.  A list of
        tuples is used rather than a dictionary to make the ordering
        explicit.  Empty when ``maxreturn`` is zero or negative.
    """
    # A non-positive limit means "no elements".  The previous loop appended
    # before checking the limit and therefore always returned at least one.
    if maxreturn <= 0:
        return []

    # sorted() is stable, also with reverse=True, so ties keep input order.
    ranked = sorted(frequency_dict.items(), key=lambda item: item[1], reverse=True)
    return ranked[:maxreturn]



In [3]:
# Main script: load the latest tweet data set and build the frequency tables.
# The file is read as tab-separated values.
data_path = get_latest_tweet_data()
tweet_df = pd.read_csv(data_path, sep="\t")

# Tweet data analysis

# -- getting hash data
#    Runs on the original "Text" values, before that column is overwritten
#    by the cleaning step below.
#    NOTE(review): presumably tweet_clean would strip the hashtags - confirm.
total_hash = extract_hash(tweet_df["Text"].tolist())

# -- cleaning tweets (overwrites the "Text" column)
tweet_df["Text"] = tweet_df["Text"].apply(tweet_clean)

# -- converting text into tokens (creates a new "Tokens" column),
#    then removing stop words from every token list
tweet_df["Tokens"] = tweet_df["Text"].apply(word_tokenize)
tweet_df["Tokens"] = tweet_df["Tokens"].apply(filter_tokens)

# -- measuring word frequency: merge the per-tweet token lists into one list
filtered_list = flatten_list(tweet_df["Tokens"].tolist())


# -- frequency data: words, hashtags and tweets per screen name
#    (n_unique_words is not used further in the cells shown here)
freq = FreqDist(filtered_list)
n_unique_words = len(freq)
hash_freq = FreqDist(total_hash)
user_freq = FreqDist(tweet_df["ScreenName"].tolist())


# generate word cloud
# The image is named "<year>-<month>", taken from the "Date" value at index
# label 0.
# NOTE(review): assumes "Date" is a "-"-separated string that starts with the
# year followed by the month - confirm against the collector's output.
date_info = tweet_df["Date"][0].split("-")
month, year = date_info[1], date_info[0]
year_month = f"{year}-{month}"

create_wordcloud(year_month, freq)

# generate json file
# TODO: not implemented yet - work stopped here




In [24]:
year

'2022'