### Distributed Text Preprocessing

This project aims to conduct distributed data cleaning and wrangling on a huge dataset. The code can be reused and scaled for any project that involves memory-intensive tasks. 

I crawled this dataset from scratch using the new Twitter API v2 via the twarc package. 

In [2]:
import dask.dataframe as dd
import dask.bag as db
import json
import regex as re

In [3]:
# Read every file matching *.jsonl (relative to the working directory) into a dask bag,
# one element per line, then parse each line with json.loads so that every element
# is one decoded JSON record. Note that `df` is a dask bag here, not a dataframe.
df = db.read_text('*.jsonl').map(json.loads)

In [10]:
#define a function to extract columns we need, some are nested 

def flatten(record):
    """Project one decoded tweet record onto a flat dict of the fields we keep.

    Args:
        record: Mapping for a single tweet. It must contain the top-level keys
            read below, a nested ``public_metrics`` mapping, and a nested
            ``author`` mapping that carries ``username`` and its own
            ``public_metrics``.

    Returns:
        A new dict with one scalar per output column, in the order listed below.

    Raises:
        KeyError: If any of the expected keys is missing from ``record``.
    """
    # Tweet-level engagement counters all live under the same nested mapping.
    metrics = record['public_metrics']
    return {
        'retweet_count': metrics['retweet_count'],
        'reply_count': metrics['reply_count'],
        'like_count': metrics['like_count'],
        'quote_count': metrics['quote_count'],
        'text': record['text'],
        'conversation_id': record['conversation_id'],
        'author_id': record['author_id'],
        'created_at': record['created_at'],
        'username': record['author']['username'],
        'followers_count': record['author']['public_metrics']['followers_count'],
        'following_count': record['author']['public_metrics']['following_count'],
        'lang': record['lang'],
        'id': record['id']
    }

In [11]:
# Sanity check: flatten the records and pull only the first 2 results,
# so the extracted fields can be inspected.

df.map(flatten).take(2)

({'retweet_count': 11793,
  'reply_count': 0,
  'like_count': 0,
  'quote_count': 0,
  'text': 'RT @freya_cole: A 26yro man who was shot in the knee while protesting in Mandalay on Feb 20 has died. I have spoken directly to the volunte…',
  'conversation_id': '1364726793727401984',
  'author_id': '1357203775216377856',
  'created_at': '2021-02-24T23:59:57.000Z',
  'username': 'Wynnms83',
  'followers_count': 23,
  'following_count': 50,
  'lang': 'en',
  'id': '1364726793727401984'},
 {'retweet_count': 4,
  'reply_count': 0,
  'like_count': 0,
  'quote_count': 0,
  'text': "RT @glo87516996: Dear ASEAN, please stand with Myanmar people. Please don’t give any help on junta who violent and kill their citizens.Don'…",
  'conversation_id': '1364726788602142722',
  'author_id': '1357360747718856711',
  'created_at': '2021-02-24T23:59:56.000Z',
  'username': 'Sumyat01337195',
  'followers_count': 38,
  'following_count': 73,
  'lang': 'en',
  'id': '1364726788602142722'})

In [28]:
# Convert the flattened dask bag into a dask dataframe
# (each key returned by flatten becomes one column).
df2 = df.map(flatten).to_dataframe()

In [29]:
# Inspect the column dtypes of the new dataframe.
df2.dtypes

retweet_count       int64
reply_count         int64
like_count          int64
quote_count         int64
text               object
conversation_id    object
author_id          object
created_at         object
username           object
followers_count     int64
following_count     int64
lang               object
id                 object
dtype: object

In [30]:
def remove_noise(text):
    """Normalise one tweet's text by applying an ordered list of regex rewrites.

    The rules are written for lower-cased input (e.g. ``what's``, ``i'm``).
    Order matters: later rules rely on what earlier ones already removed.

    Args:
        text: The string to clean.

    Returns:
        The cleaned string. Runs of whitespace are collapsed to one space, but
        a single leading or trailing space is not stripped.

    NOTE(review): some rules still join or drop text in ways that may not be
    intended and are left as they were: the digit placeholder is inserted
    without a trailing space ("26yro" -> " numbersyro"), hyphenated words are
    deleted entirely, and the " -" and "j k" rules can glue neighbouring
    words together. Confirm the desired behaviour before tightening them.
    """
    rules = (
        # --- punctuation and numbers ---
        (r",", ""),
        # A run of word characters ending in digits becomes the placeholder ...
        (r"\w+\d+", " numbers"),
        # ... and so does any digit run left over (e.g. a lone single digit).
        (r"\d+", "numbers"),
        (r"\$", "dollar "),
        (r"dollars", "dollar"),
        # Blank every character outside the allowed set. The hyphen is escaped
        # so that "+-=" is three literal symbols rather than a range from "+"
        # to "=", which would also let ";" and "<" through. ":" is listed
        # explicitly because a later rule removes it without inserting a space.
        (r"[^A-Za-z0-9^,!.\/'+\-=:]", " "),
        (r"!", " "),
        # --- contractions ---
        (r"what's", "what is "),
        (r"\'s", " "),
        (r"\'ve", " have "),
        (r"n't", " not "),
        (r"i'm", "i am "),
        # --- colons, hyphens, stray letters ---
        (r":", ""),
        (r"\w+\-\w+", ""),
        (r" -", ""),
        # Drop a stray standalone "s" but keep one space, so the words on
        # either side stay separate tokens.
        (r" s ", " "),
        (r"\'re", " are "),
        (r"\'d", " would "),
        (r"\'ll", " will "),
        # --- pad the remaining operators with spaces ---
        (r"\/", " "),
        (r"\^", " ^ "),
        (r"\+", " + "),
        (r"\-", " - "),
        (r"\=", " = "),
        (r"'", " "),
        # --- re-join abbreviations split by the steps above ---
        (r" e g ", " eg "),
        (r" b g ", " bg "),
        (r" u s ", " american "),
        (r"e - mail", "email"),
        (r"j k", "jk"),
        # --- final whitespace collapse ---
        (r"\s{2,}", " "),
    )
    for pattern, replacement in rules:
        text = re.sub(pattern, replacement, text)
    return text

In [43]:
#remove stopwords
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
stop_words = stopwords.words("english")
# Set copy used for fast membership tests inside remove_stopwords3;
# `stop_words` itself stays a list in case other cells rely on it.
_stop_word_set = frozenset(stop_words)


def remove_stopwords3(text):
    """Tokenise ``text`` and drop short tokens and English stop words.

    Args:
        text: The string to filter.

    Returns:
        The surviving tokens joined by single spaces. A token survives when it
        is longer than 2 characters and is not in the English stop-word list.
    """
    tokens = word_tokenize(text)
    filtered_words = [
        w for w in tokens
        if len(w) > 2 and w not in _stop_word_set
    ]
    return " ".join(filtered_words)

In [44]:
# Combine the cleaning steps above into one function that works on a whole partition.
def clean_text(df2):
    """Return a copy of a dataframe partition with its ``text`` column cleaned.

    Each value is lower-cased, passed through ``remove_noise``, then through
    ``remove_stopwords3``.

    Args:
        df2: A dataframe with a ``text`` column of strings.

    Returns:
        A new dataframe with the same columns in the same order, where
        ``text`` holds the cleaned strings. The input frame is not modified.
    """
    cleaned = (
        df2["text"]
        .map(str.lower)
        .map(remove_noise)
        .map(remove_stopwords3)
    )
    # assign() builds a new frame, so the partition passed in is not written to.
    return df2.assign(text=cleaned)

In [45]:
# Apply clean_text to every partition; meta=df2 declares that the output has the
# same columns and dtypes as the input.
# NOTE: df2 is rebound to its own cleaned version, so re-running this cell
# applies the cleaning a second time to already-cleaned text.
df2 = df2.map_partitions(clean_text, meta=df2)

In [46]:
# Preview the first rows to check the cleaned text column.
df2.head()

Unnamed: 0,retweet_count,reply_count,like_count,quote_count,text,conversation_id,author_id,created_at,username,followers_count,following_count,lang,id
0,11793,0,0,0,freya cole numbersyro man shot knee protesting...,1364726793727401984,1357203775216377856,2021-02-24T23:59:57.000Z,Wynnms83,23,50,en,1364726793727401984
1,4,0,0,0,numbers dear asean please stand myanmar people...,1364726788602142722,1357360747718856711,2021-02-24T23:59:56.000Z,Sumyat01337195,38,73,en,1364726788602142722
2,438,0,0,0,numbers accept army dictator appointee officia...,1364726784877490180,2546163824,2021-02-24T23:59:55.000Z,Nandarcheah,141,413,en,1364726784877490180
3,0,0,0,0,attempting prove strong expression authority g...,1364726777805869058,822335698069909504,2021-02-24T23:59:53.000Z,CasyThantSu,17,80,en,1364726777805869058
4,1968,0,0,0,dandaniemyanmar hear voice numberscoup whatsha...,1364726777294151682,1348546840917012480,2021-02-24T23:59:53.000Z,ThuThunw,206,328,en,1364726777294151682


In [None]:
# Save the final dataset as one CSV file named df2.csv
# (single_file=True requests a single output file rather than one per partition).
df2.to_csv('df2.csv', single_file = True)