This file countains all the relevant functions to take a body of tweets and transfer them all in their vectorized form into a Mongo Database

In [33]:
from sklearn.feature_extraction.text import CountVectorizer
from pymongo import MongoClient
import numpy as np
from operator import itemgetter

In [2]:
#Mongo/Insertion/Retrieval helper functions
def matrixToDict(vocab,asList):
    tempDict={}
    for i in vocab.keys():
        innerDict={}
        index1 = int(vocab.get(i))
        for j in vocab.keys():
            index2=int(vocab.get(j))
            innerDict[j]=asList[index1][index2]
        tempDict[i]=innerDict
    return tempDict

#retrieveWordsIDs gets current words in the DB and snags their IDs as well
def retrieveWordsIDs(db,collectionName):
    wordDict={}
    cursor = db[collectionName].find({},{'word':1})
    for i in cursor:
        wordDict[i['word']]=i['_id']
    return wordDict

def createMatrix(docs):
    count_model = CountVectorizer(ngram_range=(1,1),stop_words='english') # default unigram model
    X = count_model.fit_transform(docs)
    Xc = (X.T * X) # this is co-occurrence matrix in sparse csr format
    Xc.setdiag(0) #want to fill same word cooccurence to 0
    dense = Xc.todense()
    vocab = count_model.vocabulary_
    return dense,vocab

def insertUpdate(insertList,db,collectionName):
    wordidDict = retrieveWordsIDs(db,collectionName)
    for i in insertList:
        if i['word'] in wordidDict.keys():
            cursor = db[collectionName].find({'word':i['word']})
            listCursor = list(cursor)
            if len(listCursor)!=1:
                return None #may as well give up because this should never happen
            currentDict = listCursor[0]
            currentCounts=currentDict['counts']
            currentKeys = list(currentCounts.keys())
            #updateKeys = list(set(currentKeys)-set(i['counts'].keys()))
            #i think updateKeys not needed
            for key in list(set(currentKeys) & set(i['counts'].keys())):
                #looping over intersection of keys
                currentCounts[key]+=i['counts'][key]
            newKeys = list(set(i['counts'].keys())-set(currentKeys))
            #taking advantage of set subtraction above
            for j in newKeys:
                currentCounts[j]=i['counts'][j]
            db[collectionName].find_one_and_update({'_id':currentDict['_id']},{"$set": {'counts':currentCounts}})
        elif i['word'] not in wordidDict.keys():
            #just insert
            insertDict={}
            insertDict['word']=i['word']
            insertDict['counts']=i['counts']
            db[collectionName].insert(insertDict)
    return "Maybe it did it?"
    #TO-DO; make better return value lol
    
def findTopN(word,db,collectionName,n=3):
    cursor = db[collectionName].find({'word':word})
    listCursor = list(cursor)
    if len(listCursor)!=1:
        return None #may as well give up
    counts = listCursor[0]['counts']
    returnDict = dict(sorted(counts.items(), key = itemgetter(1), reverse = True)[:n])
    return list(returnDict.keys())

def fullProcess(doc,db,collName):
    dense,vocab = createMatrix(doc)
    matrixList = dense.tolist()
    wordDict = matrixToDict(vocab,matrixList)
    wordList = []
    for i in wordDict:
        wordList.append({'word':i,'counts':wordDict[i]})
    insertUpdate(wordList,db,collName)
    return "good job diego!"

In [3]:
#below shamelessly stolen from lecture
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.functions import udf
import string

#Create the function that performs the text conversion
sn = SnowballStemmer('english')

def clean_text(text, sn=sn):
    #special case of getting rid of RT info
    rt_re = re.compile('^RT @[a-zA-Z0-9]+:')
    text = rt_re.sub(' ',text)
    #removing other @ instances
    ats_re = re.compile('^@[a-zA-Z0-9]+')
    text = ats_re.sub(' ',text)
    #remove emojis
    text = text.encode('ascii', 'ignore').decode('ascii')
    #removing punctuation
    punc_re = re.compile('[%s]' % re.escape(string.punctuation + '£'))
    text = punc_re.sub(' ', ' '+text.lower()+' ') # Pad with spaces for easier stopword removal
    # Remove numbers
    num_re = re.compile('(\\d+)')
    text = num_re.sub(' ', text)
    # Remove alphanumerical words
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    text = alpha_num_re.sub(' ', text)
    # Stemming
    text = sn.stem(text)
    # Regex for multiple spaces
    spaces_re = re.compile('\s+')
    text = spaces_re.sub(' ', text.strip())

    return text

clean_text = udf(clean_text)

In [35]:
import pyspark
import string
import re
#no real reason to do this in spark in particular

In [36]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [29]:
! head -n 4 data.csv

handle,is_rt,date,tweet,qt,rt
fredjcollins,True,2021-05-27 22:32:23,RT @Ordinary1World: A commission to investigate the attack on the capital should already be in place  but thanks to republican senators it…,,A commission to investigate the attack on the capital should already be in place  but thanks to republican senators it will never happen.  All of those fuckers need to be voted out. Every single one of them.
Cat0524,True,2021-05-27 22:32:23,RT @scotshelagh: @FreeThinker2030 @Tam__Jardine @BBCScotlandNews @STVNews @BBCNews @itvnews Interesting that they weren’t concerned about U…,,@FreeThinker2030 @Tam__Jardine @BBCScotlandNews @STVNews @BBCNews @itvnews Interesting that they weren’t concerned about U.K. leaving the EU. They didn’t poke their nose into that referendum.  Anyway us Scots are no daft.  Gordon Brown is yesterday’s man and we can see right through him.   Prince William is being used as a puppet of the state.
VindicatedHobo,False,2021-05-27 22:32:23,@DylanTop5 @Sereng

In [37]:
#reading from file just for simplicities sake
data = spark.read.csv('data.csv',
                     sep=',',
                     header=True,
                     inferSchema=True)
data.show(3)

+--------------+-----+-------------------+--------------------+----+--------------------+
|        handle|is_rt|               date|               tweet|  qt|                  rt|
+--------------+-----+-------------------+--------------------+----+--------------------+
|  fredjcollins| true|2021-05-27 22:32:23|RT @Ordinary1Worl...|null|A commission to i...|
|       Cat0524| true|2021-05-27 22:32:23|RT @scotshelagh: ...|null|@FreeThinker2030 ...|
|VindicatedHobo|false|2021-05-27 22:32:23|@DylanTop5 @Seren...|null|                null|
+--------------+-----+-------------------+--------------------+----+--------------------+
only showing top 3 rows



In [38]:
data.registerTempTable('data')
nonRTs = spark.sql(r"""SELECT tweet
                            FROM data
                            WHERE is_rt='false'""")

In [39]:
nonRTs.show(3)

+--------------------+
|               tweet|
+--------------------+
|@DylanTop5 @Seren...|
|@CNN IN THE BEGIN...|
|This stuff with B...|
+--------------------+
only showing top 3 rows



In [40]:
#text pre-processing
nonRTs = nonRTs.withColumn('cleanTweet',clean_text(nonRTs['tweet']))

In [41]:
nonRTs.show(5)

+--------------------+--------------------+
|               tweet|          cleanTweet|
+--------------------+--------------------+
|@DylanTop5 @Seren...|serengetisover it...|
|@CNN IN THE BEGIN...|in the beginning ...|
|This stuff with B...|this stuff with b...|
|@janrobinjackson ...|cherylej aoc righ...|
|These words you s...|these words you s...|
+--------------------+--------------------+
only showing top 5 rows



In [42]:
fullList = [i.cleanTweet for i in nonRTs.select('cleanTweet').collect()]

In [43]:
len(fullList)

1812

In [None]:
client = MongoClient()
db= client.testing
collName = 'processTest'

In [44]:
fullProcess(fullList,db,collName)
#fullProcess does the CountVectorizing as well as insertion and updating in Mongo
#return value is just for encouragement

  db[collectionName].insert(insertDict)


'good job diego!'

Run time on that was a little less than 2 minutes- not ideal, frankly!
Generally a better idea to throw in chunks of maybe 500 or so, which runs quite quick in comparison