In [17]:
import dask.dataframe as dd
import dask.array as da
import dask
from dask.distributed import Client, TimeoutError, LocalCluster, progress
from dask.diagnostics import ProgressBar
import pandas as pd
import hvplot.dask
import hvplot.pandas
import re
from textblob import TextBlob
from langdetect import detect
from scipy.stats import f_oneway, ttest_ind
from bokeh.models.formatters import DatetimeTickFormatter

#code found on stack exchange
#makes a new client if one is not running, runs 6 workers
try:
    client = Client('tcp://localhost:8787', timeout='5s')
except OSError or TimeoutError:
    cluster = LocalCluster(scheduler_port=8787, n_workers=6, ip='localhost')
    client = Client(cluster)


client


0,1
Connection method: Direct,
Dashboard: http://localhost:8787/status,

0,1
Comm: tcp://127.0.0.1:8787,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: 2 minutes ago,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:51778,Total threads: 2
Dashboard: http://127.0.0.1:51779/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:51760,
Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-dwyekzb3,Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-dwyekzb3
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.4%,Last seen: Just now
Memory usage: 117.66 MiB,Spilled bytes: 0 B
Read bytes: 14.00 kiB,Write bytes: 14.00 kiB

0,1
Comm: tcp://127.0.0.1:51770,Total threads: 2
Dashboard: http://127.0.0.1:51772/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:51758,
Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-ct94m9he,Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-ct94m9he
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.6%,Last seen: Just now
Memory usage: 115.75 MiB,Spilled bytes: 0 B
Read bytes: 12.00 kiB,Write bytes: 8.00 kiB

0,1
Comm: tcp://127.0.0.1:51771,Total threads: 2
Dashboard: http://127.0.0.1:51776/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:51759,
Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-c7w9i7fk,Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-c7w9i7fk
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.5%,Last seen: Just now
Memory usage: 115.64 MiB,Spilled bytes: 0 B
Read bytes: 10.06 kiB,Write bytes: 6.03 kiB

0,1
Comm: tcp://127.0.0.1:51769,Total threads: 2
Dashboard: http://127.0.0.1:51773/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:51757,
Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-_7nnmekv,Local directory: /var/folders/3t/s31r6vgj44n_g5t4csrvklwh0000gn/T/dask-worker-space/worker-_7nnmekv
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.7%,Last seen: Just now
Memory usage: 232.00 MiB,Spilled bytes: 0 B
Read bytes: 10.00 kiB,Write bytes: 6.00 kiB


In [18]:
# reference variables
keyWords = [
    'mental health', 
    'depression',
    'depressed', 
    'anxiety',
    'anxious', 
    'ADHD', 
    'attention deficit',
    'OCD',
    'obsessive-compulsive disorder',
    'PTSD',
    'posttraumatic stress disorder',
    'trauma',
    'bipolar',
    'autism',
    'schizophrenia',
    'eating disorder',
    'anorexia',
    'bulimia',
    'psychosis',
    'psychologist',
    'psychotherapist',
    'psychiatrist',
    'psychotherapy',
    'depressant'
]

keyCats = {
    'gen' : ['mental health', 'trauma', 'psychosis', 'psychologist', 'psychotherapist', 'psychiatrist', 'psychotherapy'],
    'dep' : ['depression', 'depressed', 'depressant'],
    'anx' : ['anxiety', 'anxious'],
    'ocd' : ['OCD', 'obsessive-compulsive disorder'],
    'adhd' : ['ADHD', 'attention deficit'],
    'ptsd' : ['PTSD', 'posttraumatic stress disorder'],
    'bip' : ['bipolar'],
    'asd' : ['autism'],
    'schiz' : ['schizophrenia'],
    'ed' : ['eating disorder', 'anorexia', 'bulimia']
}

abbrCats = {
    'gen' : 'General',
    'dep' : 'Depression',
    'anx' : 'Anxiety',
    'ocd' : 'OCD',
    'adhd' : 'ADHD',
    'ptsd' : 'PTSD',
    'bip' : 'Bipolar',
    'asd' : 'Autism',
    'schiz' : 'Schizophrenia',
    'ed' : 'Eating Disorder'
}

In [19]:
#contains counts of tweets for every hour
countdf = dd.read_csv(urlpath='mental_health_tweet_count.csv', sep='\t', lineterminator='\n')

#try reading in the preprocessed tweets
# if the csv does not exist, create it
try:
    # read the preprocessed csv
    tweetdf = dd.read_parquet(path='mental_health_tweets_preprocessed_en.parquet')
except:
    #contains tweets (and time of the tweet) that contain mental health key word
    #polled every 2 minutes
    tweetdf = dd.read_csv(urlpath='mental_health_tweets.csv', sep='\t', lineterminator='\n')
    
    #pre process
    # preprocess instructions found at https://youtu.be/ujId4ipkBio
    def preprocess(text):
        # remove mentions
        text = re.sub('@[a-zA-Z0-9]+', '', text)
        
        #  remove hashtag symbol
        text = re.sub('#', '', text)
        
        # remove retweet identifier
        text = re.sub('RT[\s]+', '', text)
        
        # remove links
        text = re.sub('https?:\/\/\S+', '', text)


        text = re.sub('[^a-zA-Z\\s]', '', text)

        text = text.strip()

        return text

    tweetdf['text'] = tweetdf['tweet'].map(preprocess)

    # remove any tweets that are either empty, or not in english
    tweetdf = tweetdf.loc[tweetdf['text'] != '']
    tweetdf['language'] = tweetdf['text'].map(lambda x: detect(x))
    tweetdf = tweetdf.loc[tweetdf['language'] == 'en']

    # drop original tweet and language columns
    tweetdf = tweetdf.drop(columns=['tweet', 'language'])

    #write pre processed text to parquet (dask recommends parquet)
    dd.to_parquet(tweetdf, path='mental_health_tweets_preprocessed_en.parquet')
    
    

In [20]:

#specify column types
tweetdf['time'] = dd.to_datetime(tweetdf['time'])
countdf['timestamp'] = dd.to_datetime(countdf['timestamp'])

#separate the components of the time for grouping
tweetdf['year'] = tweetdf['time'].dt.year
tweetdf['month'] = tweetdf['time'].dt.month
tweetdf['day'] = tweetdf['time'].dt.day
tweetdf['hour'] = tweetdf['time'].dt.hour
tweetdf['minute'] = tweetdf['time'].dt.minute

# only keep tweets from november
tweetdf = tweetdf.loc[tweetdf['month'] == 11]

countdf['year'] = countdf['timestamp'].dt.year
countdf['month'] = countdf['timestamp'].dt.month
countdf['day'] = countdf['timestamp'].dt.day
countdf['hour'] = countdf['timestamp'].dt.hour

In [21]:
#sort the dataframes based on ascening tme
countdf = countdf.sort_values(ascending=True, by=['month', 'day', 'hour'])
tweetdf = tweetdf.sort_values(ascending=True, by=['month', 'day', 'hour', 'minute'])

#make a display string for the x axis of the graph
countdf['ts_display'] = countdf['month'].astype('str') + '/' + countdf['day'].astype('str') + '\t' + countdf['hour'].astype('str') + ':00'


In [22]:


hvplot.save(countdf.hvplot(x='timestamp', y='count', title='Tweet Count', xlabel='datetime', ylabel='tweets').opts(xrotation=90, fontscale=0.75),\
    'figures/tweets-over-time.png')

In [23]:
#determine key words in tweet
def containsKey(text, key):
    return text.str.contains(key)

# create a new column for each key, with values of whether or not
# the tweet in that row contains that key
for i in keyWords:
    tweetdf[i] = tweetdf['text'].map_partitions(containsKey, i)


In [24]:
#count tweets each keyword appears in
#if a tweet contains multiple keywords, then they are added 
#to the result df as |word1|word2| etc.

def keysInTweet(df):
    keys = ''
    for k in keyWords:
        if df[k]:
            keys = keys + '|' + k
    keys = keys + '|'
    return keys

tweetdf['keys'] = tweetdf.apply(keysInTweet, axis=1, meta=('keys', 'object'))

# drop tweets with no keys
tweetdf = tweetdf.loc[tweetdf['keys'] != '|']

# convert the value counts to its own df
multiKeyCount = tweetdf['keys'].value_counts().to_frame()
# copy the key column (which actually holds counts), rename it and then drop the original
multiKeyCount['count'] = multiKeyCount['keys']
multiKeyCount = multiKeyCount.drop(labels='keys', axis=1)

multiKetCount = multiKeyCount.sort_values(by='count', ascending=False)
multiKeyCount = multiKeyCount.head(10)

In [25]:
hvplot.save(multiKeyCount.head(10).hvplot.bar(y='count', use_index=True, title='Top 10 Topic Combinations', xlabel='topic combinations', ylabel='tweets').opts(xrotation=90),\
    'figures/key-groups-count-bar.png')

In [26]:
# # create an empty dask dataframe, with 1 parition (since max rows is 24)
# keyCount = pd.DataFrame(columns=['key', 'count'])
# keyCount = dd.from_pandas(keyCount, npartitions=1)

# # given key k, find the number of times k appears in a tweet
# def countKeys(k):
#     return tweetdf[k].compute()

# # create a new row of the key count
# keyCount['count'] = keyCount['key'].map(countKeys, meta=('count', 'int64'))

# # sort rows by descending count
# keyCount = keyCount.sort_values(by='count', ascending=False)

# keyCount.persist()

In [27]:
# hvplot.save(keyCount.hvplot.bar(y='count', x='key').opts(fontscale=0.75, invert_axes=True),\
#     'figures/individual-key-count-bar.png')

In [28]:
# determine whether each tweet has keywords related to each category
def getCategories(row, keyList):
    for k in keyList:
        if row[k]:
            return True
        
    return False


# create a new column for each category
for c in keyCats.keys():
    tweetdf[c] = tweetdf.apply(getCategories, args=([keyCats[c]]), axis=1, meta=(c, 'boolean'))

# drop the original keyword columns
tweetdf = tweetdf.drop(columns=keyWords)


In [33]:
# TODO category count graph
catCount = pd.DataFrame(columns=['category', 'count'], data=catCounts)

newRows = []
for c in keyCats.keys():
    newRows.append(dask.delayed(pd.from_dict)({'category' : c, 'count' : dask.delayed(len)(tweetdf.loc[tweetdf['gen']])}))


catCount = dd.from_pandas(catCount, npartitions=1)


newRows = client.compute(*newRows)

TypeError: Truth of Delayed objects is not supported

In [None]:
# get subjectivity of each tweet [0, 1]
def compSubjectivity(text):
    return TextBlob(text).sentiment.subjectivity

# get polarity of tweet [-1, 1]
def compPolarity(text):
    return TextBlob(text).sentiment.polarity

# add subjectivity and polarity as new columns
tweetdf['subjectivity'] = tweetdf['text'].map(compSubjectivity)
tweetdf['polarity'] = tweetdf['text'].map(compPolarity)

# speeds up computation of mean and std of sub and pol for each cat
tweetdf.compute()
tweetdf = tweetdf.persist()

tweetdf

Unnamed: 0_level_0,time,text,year,month,day,hour,minute,keys,gen,dep,anx,ocd,adhd,ptsd,bip,asd,schiz,ed,subjectivity,polarity
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
,datetime64[ns],object,int64,int64,int64,int64,int64,object,boolean,boolean,boolean,boolean,boolean,boolean,boolean,boolean,boolean,boolean,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [None]:
# can store results in a pandas dataframe since we know it will fit in memory
mlres = pd.DataFrame()

# for each category, make a series of sub and pol mean and std
# then add it to the output pandas df
for c in keyCats.keys():
    tmp = tweetdf.loc[tweetdf[c]]
    data = (tmp['subjectivity'].mean().compute(),\
            tmp['polarity'].mean().compute(),\
            tmp['subjectivity'].std().compute(),\
            tmp['polarity'].std().compute()
            )
    ser = pd.Series(data=data, index=['subjectivity mean', 'polarity mean', 'subjectivity std', 'polarity std'], name=c)
    mlres = pd.concat([mlres, ser], axis=1)

mlres = mlres.rename(abbrCats)


mlres

Unnamed: 0,gen,dep,anx,ocd,adhd,ptsd,bip,asd,schiz,ed
subjectivity mean,0.39752,0.419006,0.4454,0.399785,0.390705,0.394681,0.411633,0.3823,0.375183,0.403433
polarity mean,0.024636,0.058078,0.000861,0.041443,0.065732,0.046187,0.020745,0.052398,0.010546,0.025317
subjectivity std,0.222554,0.293004,0.342618,0.302956,0.282955,0.286201,0.303714,0.298593,0.295602,0.303563
polarity std,0.221259,0.283656,0.244954,0.259299,0.247543,0.250127,0.276415,0.251685,0.26133,0.277414


In [None]:
hvplot.save(mlres.drop(index=['subjectivity std', 'polarity std', 'polarity mean']).hvplot.bar(use_index=True, title='Subjectivity Means', xlabel='Topic', ylabel='Subjectivity', ylim=(0, 1)),\
    'figures/subjectivity-mean.png')

In [None]:
hvplot.save(mlres.drop(index=['subjectivity std', 'polarity std', 'subjectivity mean']).hvplot.bar(use_index=True, title='Polarity Means', xlabel='Topic', ylabel='Polarity', ylim=(-1, 1)),\
    'figures/polarity-mean.png')

In [None]:
# One way ANOVA on column
def owANOVA(col):
    catArrs = []
    for c in keyCats.keys():
        tmp = tweetdf.loc[tweetdf[c]]
        tmp = tmp[col]
        tmp = tmp.to_dask_array()
        catArrs.append(tmp)

    return f_oneway(*catArrs)

print('One Way ANOVA Results:')
print(f"Subjectivity: {owANOVA('subjectivity')}")
print(f"Polarity: {owANOVA('polarity')}")

One Way ANOVA Results:
Subjectivity: F_onewayResult(statistic=84.64774594100781, pvalue=1.0084386752365306e-157)
Polarity: F_onewayResult(statistic=122.01905191680397, pvalue=1.056804917650946e-229)


In [None]:
# test on each pair of categories
def tTest(col, a, b):
    a = tweetdf.loc[tweetdf[a]]
    a = a[col]
    a = a.to_dask_array()
    b = tweetdf.loc[tweetdf[b]]
    b = b[col]
    b = b.to_dask_array()
    return ttest_ind(a=a, b=b, equal_var=False)
    
def allTTests(col):
    out = []
    keys = list(keyCats.keys())
    for x in range(len(keys)):
        for y in range(x+1, len(keys)):
            out.append((keys[x], keys[y], tTest(col, keys[x], keys[y])))
    return out

subjT = allTTests('subjectivity')
polT = allTTests('polarity')

def statSigT(res, alpha):
    out = []
    for i in res:
        sig = i[2][1] < alpha
        isSig = (abbrCats[i[0]], abbrCats[i[1]], sig)
        out.append(isSig)
    return out

subjSig = statSigT(subjT, 0.05)
polSig = statSigT(polT, 0.05)

for i in range(len(subjSig)):
    print(f"Subjectivity: {subjSig[i]}")
    print(f"Polarity: {polSig[i]}")

Task exception was never retrieved
future: <Task finished name='Task-44197' coro=<Client._gather.<locals>.wait() done, defined at /Users/derek/opt/anaconda3/lib/python3.9/site-packages/distributed/client.py:2002> exception=AllExit()>
Traceback (most recent call last):
  File "/Users/derek/opt/anaconda3/lib/python3.9/site-packages/distributed/client.py", line 2011, in wait
    raise AllExit()
distributed.client.AllExit


Subjectivity: ('General', 'Depression', True)
Polarity: ('General', 'Depression', True)
Subjectivity: ('General', 'Anxiety', True)
Polarity: ('General', 'Anxiety', True)
Subjectivity: ('General', 'OCD', False)
Polarity: ('General', 'OCD', True)
Subjectivity: ('General', 'ADHD', False)
Polarity: ('General', 'ADHD', True)
Subjectivity: ('General', 'PTSD', False)
Polarity: ('General', 'PTSD', True)
Subjectivity: ('General', 'Bipolar', True)
Polarity: ('General', 'Bipolar', False)
Subjectivity: ('General', 'Autism', True)
Polarity: ('General', 'Autism', True)
Subjectivity: ('General', 'Schizophrenia', True)
Polarity: ('General', 'Schizophrenia', False)
Subjectivity: ('General', 'Eating Disorder', False)
Polarity: ('General', 'Eating Disorder', False)
Subjectivity: ('Depression', 'Anxiety', True)
Polarity: ('Depression', 'Anxiety', True)
Subjectivity: ('Depression', 'OCD', True)
Polarity: ('Depression', 'OCD', True)
Subjectivity: ('Depression', 'ADHD', True)
Polarity: ('Depression', 'ADHD',