In [1]:
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client, LocalCluster
from dask import delayed

import pandas as pd
import numpy as np

from tqdm.notebook import tqdm 

import gensim
from gensim.models.ldamulticore import LdaMulticore as LdaModel
from gensim.corpora import Dictionary, MmCorpus
from gensim.test.utils import datapath

import nltk
from nltk.corpus import stopwords 
from nltk.stem.wordnet import WordNetLemmatizer
nltk.download('stopwords')
nltk.download('wordnet')

import string
import re
import multiprocessing as mp

[nltk_data] Downloading package stopwords to
[nltk_data]     /usr4/cs505/bbadnani/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /usr4/cs505/bbadnani/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [2]:
client = Client(n_workers=5, threads_per_worker=4)
client

0,1
Client  Scheduler: tcp://127.0.0.1:44345  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 5  Cores: 20  Memory: 187.37 GiB


In [3]:
# stop loss words 
stop = set(stopwords.words('english'))

# punctuation 
exclude = set(string.punctuation) 

# lemmatization
lemma = WordNetLemmatizer() 

In [4]:
import multiprocessing as mp

mp.cpu_count()

32

In [5]:
df = dd.read_csv('../ds-naacp-media-bias/TBG_unique_raw.csv', 
                 usecols = ['position_section','hl1', 'hl2', 'lede', 'body'], 
                 dtype = {'position_section': str,'hl1': object, 
                          'hl2': object, 'lede': str, 'body': str})
df = df.map_partitions(lambda x: x.fillna(''))
df['content'] = df.map_partitions(lambda x: x[['hl1', 'hl2', 'lede', 'body']].agg(' '.join, axis=1))
df = df.drop(columns = ['hl1', 'hl2', 'lede', 'body'])

In [6]:
#remove urls
re1 = r'''(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))'''
#twitter handles
re2 = '\S*@\S*\s?'
# remove digits
re3 = "\S*\d\S*"
# remove non-ascii characters
re4 = "[^a-zA-Z' ]+"

parser = re.compile("(%s|%s|%s|%s)" % (re1, re2, re3, re4))

def preprocess_text(string_passed):
    return " ".join(y for word in re.sub(parser, ' ', string_passed).split() 
                    if (y := lemma.lemmatize(word.lower())) not in stop and len(y)> 2) # lemmatize and threshold for words > length of 2

def preprocess_text_helper(series):
    return series.map(lambda x: preprocess_text(x))


  parser = re.compile("(%s|%s|%s|%s)" % (re1, re2, re3, re4))


In [7]:
test = df.head()

In [10]:
preprocess_text(test.content[0])

"team air canada's pot issue calgary alberta bruin rely plant based diet nutritional need bruce cassidy want pain i'm bruin coach said using marijuana treat ache ailment come nhl season fact dictate help wednesday canada became largest country world legalize recreational cannabis biggest story nation plastered newspaper social medium body legal nhl city seven canada three california plus boston denver washington pot still federally illegal united state associated press reported nhl nhl player association plan change joint drug testing policy player punished positive marijuana result nfl nba suspend player multiple pot infraction mlb issue fine marijuana banned world anti doping agency among olympic athlete departing canadian trip run calgary wednesday edmonton thursday vancouver saturday ottawa tuesday bruin player given memo nhl educate cannabis legalization safety flame player meeting tuesday teaching point fit duty regardless substance know rule transporting cannabis across country 

In [100]:
lemma.lemmatize('words') # Use it once first, to "unlazify" wordnet.
preprocess = df.content.map_partitions(preprocess_text_helper, 
                                         meta=pd.Series([], dtype=str, name='content'))

#run this cell to unlazify wordnet

In [86]:
from dask.distributed import progress

result = preprocess.persist()
progress(result)

VBox()

In [87]:
result = preprocess.to_frame().to_parquet(
    "pre-processed-persist", engine="pyarrow"
)

#write it back to the disk in preprcoessed format

In [90]:
df = dd.read_parquet(
    "pre-processed-persist",
    engine="pyarrow")

#read the preprocessed data back in

In [97]:
df.content.partitions[0].compute()

#visualize the data

0        Team air Canada's pot issue calgary alberta br...
1        Treasury worker charged leak washington senior...
2        Funny Women luncheon raise record Rosie's Plac...
3        Turkish official say audio reveals torture ist...
4        Journal pull stem cell researcher's paper Conc...
                               ...                        
13335    CORRECTION correction editor's error photo che...
13336    Police defend probe teacher police standing be...
13337    Forbes sidestep tradition unorthodox campaign ...
13338    CORRECTION correction reporting error story we...
13339    Managers must decide today manager corp offere...
Name: content, Length: 13340, dtype: object

In [18]:
df.shape[0].compute()

#get the amount of articles

1621013

In [19]:
"""dictionary = Dictionary(line[1].content.split() for line in tqdm(df.iterrows(), total = 1621013))"""

#uncomment these the first time you run it

  0%|          | 0/1621013 [00:00<?, ?it/s]

In [20]:
"""dictionary.save('dictionary')"""
#uncomment these the first time you run it

In [7]:
dictionary = Dictionary.load('dictionary')

In [8]:
len(dictionary)

2000715

In [28]:
dictionary.filter_extremes(no_below=10)
#dont keep any words that occur less than 10 times
#look up gensim's filter_extremes documentation - default parameter keeps 100000 in the dictionary

In [29]:
len(dictionary)

100000

In [18]:
words = set(k for k, _ in dictionary.token2id.items())
#words is a set of all the words in the dictionary

In [19]:
words

{'outlasvegasing',
 'blodry',
 'wwwseifaorgmonthlyshowshtml',
 'shigenobu',
 'cycleoriented',
 'icemasters',
 'atmeac',
 'wwwbostoncommcas',
 'gospelfriendly',
 'plazacom',
 'liwill',
 'demartine',
 'chapelgate',
 'torrancenorth',
 'duparc',
 'skepticisim',
 'mamat',
 'englishteacher',
 'loungefrequent',
 'wwwfloridacanoecom',
 'kral',
 'kjernisted',
 'brownpapered',
 'greenechandos',
 'mocean',
 'contemporaryskrtskrt',
 'madrugada',
 'mchoice',
 'terra',
 'headrillaz',
 'incredibletheyll',
 'zelinda',
 'dsheriff',
 'gormley',
 'insurancecompanies',
 'freebiesquasibribes',
 'democratwas',
 'shieldwhat',
 'bluescolored',
 'airchamber',
 'immigrationthe',
 'baabridgeport',
 'gudiwada',
 'winmaking',
 'pubrecordscom',
 'kavrakos',
 'thinnestskinned',
 'wwwallasiafoodexpocom',
 'doharriet',
 'squarcia',
 'brooksson',
 'bonacca',
 'oftenbut',
 'middelesex',
 'luijkenstraat',
 'railwalk',
 'wwwperiodcraftsmencom',
 'nwide',
 'hnic',
 'mcishirleys',
 'impending',
 'povertyplagued',
 'toldyous

In [11]:
dictionary.doc2bow('eject')
#look into this later

TypeError: doc2bow expects an array of unicode tokens on input, not a single string

In [88]:
class Corpus:
    def __init__(self, n_gram = 'Unigram'):
        self.df = df = dd.read_parquet("pre-processed-persist",engine="pyarrow")
        self.words = set(k for k, _ in dictionary.token2id.items())
        
    def __iter__(self):
        for line in self.df.iterrows(): 
            # assume there's one document per line, tokens separated by whitespace
            yield dictionary.doc2bow(word for word in line[1].content.split() if word in self.words) #like return but only one at a time

In [89]:
corpus = Corpus()

In [90]:
MmCorpus.serialize('corpus.mm', tqdm(corpus, total = 1621013) )
#serialize the corpus to disk in specified format so that LDA can read it one at a time
#can take a minute or two to run

  0%|          | 0/1621013 [00:00<?, ?it/s]

In [None]:
corpus = MmCorpus(datapath('corpus.mm'))

In [39]:
lda_model = LdaModel(corpus = tqdm(corpus, total = 1621013), id2word=dictionary, 
                     num_topics=30, eval_every=5, per_word_topics=True, chunksize = 100, passes=10, 
                     workers=4)

  0%|          | 0/1621013 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [10]:
lda_model

<gensim.models.ldamodel.LdaModel at 0x2b92b75e9df0>

In [14]:
for idx, topic in lda_model.print_topics(-1):
    print("Topic: {} \nWords: {}".format(idx, topic))
    print("\n")

Topic: 0 
Words: 0.000*"report" + 0.000*"treasury" + 0.000*"edward" + 0.000*"suspicious" + 0.000*"story" + 0.000*"leak" + 0.000*"agent" + 0.000*"news" + 0.000*"charged" + 0.000*"activity"


Topic: 1 
Words: 0.000*"report" + 0.000*"treasury" + 0.000*"financial" + 0.000*"activity" + 0.000*"court" + 0.000*"suspicious" + 0.000*"charged" + 0.000*"edward" + 0.000*"story" + 0.000*"senior"


Topic: 2 
Words: 0.000*"treasury" + 0.000*"senior" + 0.000*"report" + 0.000*"court" + 0.000*"activity" + 0.000*"suspicious" + 0.000*"financial" + 0.000*"story" + 0.000*"edward" + 0.000*"charged"


Topic: 3 
Words: 0.000*"report" + 0.000*"treasury" + 0.000*"court" + 0.000*"edward" + 0.000*"senior" + 0.000*"financial" + 0.000*"charged" + 0.000*"trump" + 0.000*"suspicious" + 0.000*"agent"


Topic: 4 
Words: 0.000*"treasury" + 0.000*"report" + 0.000*"edward" + 0.000*"story" + 0.000*"financial" + 0.000*"charged" + 0.000*"suspicious" + 0.000*"senior" + 0.000*"filed" + 0.000*"court"


Topic: 5 
Words: 0.000*"said

In [None]:
# these cells are just badman attempting shit

id2word = Dictionary()
id2word.filter_extremes(no_below=15, no_above=0.4, keep_n=80000)

In [None]:
lda_model = LdaModel(id2word=id2word, 
                     num_topics=7, alpha='auto', eval_every=5, per_word_topics=True, chunksize = 100, passes=10)

In [38]:
test = dask.compute(*result.partitions[1])