In [1]:
import numpy as np
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import types, functions as F
from pyspark.ml.linalg import Vectors, VectorUDT

# Start (or reuse) the Spark session used by every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("Messenger TF-IDF Analysis")
    .getOrCreate()
)

In [3]:
import glob

# Input locations: IGNORES is a text file path, DATA_ROOT is the directory that
# is searched recursively for every file named message.json.
IGNORES = "./ignore.txt"
DATA_ROOT = "./data/messages"
filepaths = glob.glob(f'{DATA_ROOT}/**/message.json', recursive=True)

In [4]:
# Load all message files into one DataFrame; the multiLine option lets Spark
# parse JSON documents that span several lines.
source_df = spark.read.option('multiLine', True).json(filepaths)
source_df.printSchema()

root
 |-- is_still_participant: boolean (nullable = true)
 |-- messages: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- audio_files: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- creation_timestamp: long (nullable = true)
 |    |    |    |    |-- uri: string (nullable = true)
 |    |    |-- call_duration: long (nullable = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- files: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- creation_timestamp: long (nullable = true)
 |    |    |    |    |-- uri: string (nullable = true)
 |    |    |-- gifs: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- uri: string (nullable = true)
 |    |    |-- missed: boolean (nullable = true)
 |    |    |-- payment_info: struct (nullable = true)
 |    |    |    |-- amount: long

# Data Sanitization

In [5]:
# Flatten the `messages` arrays into one row per message, keep only rows that
# have both text content and a timestamp, flag the messages whose sender is
# 'Il Jae Lee', and derive the calendar year of each message.
sent_by_owner = F.col('sender_name') == 'Il Jae Lee'
message_rows = source_df.select(F.explode('messages')).select('col.*')
text_messages_df = (
    message_rows
    .filter(F.col('content').isNotNull())
    .filter(F.col('timestamp_ms').isNotNull())
    # when/otherwise maps a null comparison result to False as well
    .withColumn('is_outbound', F.when(sent_by_owner, True).otherwise(False))
    # milliseconds -> seconds, the unit from_unixtime expects
    .withColumn('timestamp', F.col('timestamp_ms') / 1000)
    .withColumn('year', F.year(F.from_unixtime(F.col("timestamp"))))
    .filter(F.col('year') > 2010)
    .select('content', 'year', 'is_outbound')
)
text_messages_df.printSchema()

root
 |-- content: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- is_outbound: boolean (nullable = false)



In [6]:
# text_messages_df.toPandas()[:10]

# Message Counts

In [7]:
# Count messages per (year, direction) and collect the rows to the driver.
message_counts = (
    text_messages_df
    .groupBy('year', 'is_outbound')
    .count()
    .collect()
)

In [8]:
from itertools import groupby

# Pivot the (year, is_outbound, count) rows into one [year, inbound, outbound]
# record per year, ordered by year.
#
# Each year gets fixed slots (index 0 = inbound, index 1 = outbound) that
# default to 0, so a year with messages in only one direction still yields a
# complete three-element record with the count in the correct column.
counts_by_year = {}
for row in message_counts:
    direction_counts = counts_by_year.setdefault(row.year, [0, 0])
    # 'count' is read by key: attribute access on a Row would resolve to the
    # tuple method of the same name rather than the column.
    direction_counts[int(row.is_outbound)] = row['count']

message_counts_data = [
    [year, inbound, outbound]
    for year, (inbound, outbound) in sorted(counts_by_year.items())
]

In [9]:
def format_percent(x):
    """Render the fraction ``x`` as a percentage with three decimals, e.g. 0.5 -> '50.000%'."""
    return f'{x * 100:.3f}%'

# Per-year message volume table (displayed as the cell result).
#   Total      inbound + outbound
#   Out_scale  outbound share of the year's total
#   Tot_scale  the year's total relative to the smallest yearly total
#   *_area     square roots of the two scales (presumably lengths for
#              area-proportional visuals -- confirm with the author)
#   *_per      the same four values formatted as percentage strings
# The chain is wrapped in parentheses instead of backslash continuations, so
# no dangling continuation is left after the last step.
(
    pd.DataFrame(message_counts_data, columns=['Year', 'Inbound', 'Outbound'])
    .set_index('Year')
    .assign(Total=lambda t: t['Inbound'] + t['Outbound'])
    .assign(Out_scale=lambda t: t['Outbound'] / t['Total'],
            Tot_scale=lambda t: t['Total'] / t['Total'].min())
    .assign(Out_area=lambda t: np.sqrt(t['Out_scale']),
            Tot_area=lambda t: np.sqrt(t['Tot_scale']))
    .assign(Out_scale_per=lambda t: t['Out_scale'].apply(format_percent),
            Tot_scale_per=lambda t: t['Tot_scale'].apply(format_percent))
    .assign(Out_area_per=lambda t: t['Out_area'].apply(format_percent),
            Tot_area_per=lambda t: t['Tot_area'].apply(format_percent))
)

Unnamed: 0_level_0,Inbound,Outbound,Total,Out_scale,Tot_scale,Out_area,Tot_area,Out_scale_per,Tot_scale_per,Out_area_per,Tot_area_per
Year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2011,9423,5894,15317,0.384801,1.580539,0.620323,1.257195,38.480%,158.054%,62.032%,125.719%
2012,6046,3645,9691,0.376122,1.0,0.613288,1.0,37.612%,100.000%,61.329%,100.000%
2013,21696,19442,41138,0.472604,4.24497,0.687462,2.060332,47.260%,424.497%,68.746%,206.033%
2014,20973,17103,38076,0.449181,3.929006,0.670209,1.982172,44.918%,392.901%,67.021%,198.217%
2015,15452,11643,27095,0.42971,2.795893,0.655523,1.672092,42.971%,279.589%,65.552%,167.209%
2016,16534,11808,28342,0.416626,2.924569,0.645465,1.710137,41.663%,292.457%,64.547%,171.014%
2017,43817,31724,75541,0.419957,7.794964,0.648041,2.791946,41.996%,779.496%,64.804%,279.195%
2018,57408,35752,93160,0.38377,9.613043,0.619492,3.100491,38.377%,961.304%,61.949%,310.049%


# TF-IDF

In [10]:
# Split the messages by direction; the flag column is dropped from both halves.
outbound_flag = F.col('is_outbound')
text_outbound_df = text_messages_df.filter(outbound_flag).drop('is_outbound')
text_inbound_df = text_messages_df.filter(~outbound_flag).drop('is_outbound')

In [11]:
# Build one "document" per year for each message set: all message texts of the
# year joined with single spaces into a `content` column.
yearly_content = F.concat_ws(' ', F.collect_list(F.col('content'))).alias('content')

text_all_year_df = text_messages_df.groupby("year").agg(yearly_content)
text_outbound_year_df = text_outbound_df.groupby("year").agg(yearly_content)
text_inbound_year_df = text_inbound_df.groupby("year").agg(yearly_content)

In [12]:
import nltk
from nltk.corpus import brown, stopwords
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.stem import WordNetLemmatizer

# Fetch the NLTK resources used below (a no-op when they are already present).
for resource in ('brown', 'stopwords', 'punkt', 'wordnet'):
    nltk.download(resource)

# Word sets consulted by the tokenizer.
words = set(brown.words())
# NOTE: this rebinds `stopwords` from the imported corpus reader to a plain set
# of English stop words; the import at the top of this cell restores the reader
# whenever the cell is re-run.
stopwords = set(stopwords.words('english'))
lemma = WordNetLemmatizer()

# One ignore word per line, surrounding whitespace stripped. The file is opened
# in a `with` block so the handle is closed as soon as the set is built.
with open(IGNORES, 'r') as ignore_file:
    ignorewords = set(line.strip() for line in ignore_file)

[nltk_data] Downloading package brown to /home/iljae/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package stopwords to /home/iljae/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /home/iljae/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /home/iljae/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [13]:
from pyspark.ml.feature import CountVectorizer, IDF

def tokenize(cont):
    """Split text into the lower-cased word tokens that are kept for analysis.

    ``cont`` is sentence-split and then word-tokenized with NLTK. A token is
    kept only if, after lower-casing and stripping, it has at least three
    characters, is purely alphabetic, and appears in neither ``stopwords`` nor
    ``ignorewords``. Lemmatization and filtering against the Brown vocabulary
    are not applied.

    Returns the kept tokens as a list, in order of appearance.
    """
    kept = []
    for sentence in sent_tokenize(cont):
        for raw in word_tokenize(sentence):
            token = raw.lower().strip()
            if not token or len(token) < 3:
                continue
            if token in stopwords:
                continue
            if not token.isalpha():
                continue
            if token in ignorewords:
                continue
            # numeric check kept from the original filter (one '.' tolerated)
            if token.replace('.', '', 1).isdigit():
                continue
            kept.append(token)
    return kept

def transform_to_words(df):
    """Return ``df`` with a ``words`` column: ``tokenize`` applied to ``content``."""
    tokenize_udf = F.udf(tokenize, types.ArrayType(types.StringType()))
    return df.withColumn('words', tokenize_udf(F.col('content')))

# Tokenize the yearly documents of each message set (all / outbound / inbound).
all_words_df, outbound_words_df, inbound_words_df = (
    transform_to_words(yearly_df)
    for yearly_df in (text_all_year_df, text_outbound_year_df, text_inbound_year_df)
)

In [14]:
def transform_to_embeddings(df):
    """Fit a CountVectorizer on the ``words`` column and add its output as ``vectors``.

    Returns a ``(transformed_df, fitted_model)`` pair, so the caller keeps the
    fitted model alongside the transformed frame.
    """
    vectorizer_model = CountVectorizer(inputCol="words", outputCol="vectors").fit(df)
    return vectorizer_model.transform(df), vectorizer_model

# Count-vectorize each message set separately; every set gets its own model.
embedded_sets = [
    transform_to_embeddings(words_df)
    for words_df in (all_words_df, outbound_words_df, inbound_words_df)
]
(
    (all_embeddings_df, all_words_model),
    (outbound_embeddings_df, outbound_words_model),
    (inbound_embeddings_df, inbound_words_model),
) = embedded_sets

In [15]:
def transform_to_features(df):
    """Fit an IDF model on the ``vectors`` column and return ``df`` with a ``features`` column."""
    return IDF(inputCol="vectors", outputCol="features").fit(df).transform(df)
    
# Apply IDF weighting to each message set's count vectors.
all_features_df, outbound_features_df, inbound_features_df = (
    transform_to_features(embeddings_df)
    for embeddings_df in (all_embeddings_df, outbound_embeddings_df, inbound_embeddings_df)
)

In [16]:
NUM_WORDS = 15  # number of top-scoring vector positions kept per row


def _top_token_indices(vector):
    """Return the positions of the ``NUM_WORDS`` largest entries of ``vector``, largest first."""
    # argsort is ascending, so the scores are negated to get a descending order
    return (vector.toArray() * -1).argsort()[:NUM_WORDS].tolist()


def transform_to_tokens(df):
    """Return ``df`` with a ``tokens`` column: top ``NUM_WORDS`` positions of ``features``."""
    top_tokens_udf = F.udf(_top_token_indices, types.ArrayType(types.IntegerType()))
    return df.withColumn('tokens', top_tokens_udf(F.col('features')))

# Pick the top-scoring vector positions per year for each message set.
all_tokens_df, outbound_tokens_df, inbound_tokens_df = (
    transform_to_tokens(features_df)
    for features_df in (all_features_df, outbound_features_df, inbound_features_df)
)

In [17]:
def fetch_words(data, model):
    """Translate per-year token indices into words.

    ``data`` is an iterable of rows exposing ``year`` and ``tokens`` (a
    sequence of indices into ``model.vocabulary``). Returns a list with one
    ``[year, word, word, ...]`` entry per row, ordered by year.
    """
    table = []
    for row in sorted(data, key=lambda d: d.year):
        table.append([row.year, *(model.vocabulary[idx] for idx in row.tokens)])
    return table

# Collect the per-year token indices of each message set and translate them
# with the vocabulary of the model fitted on that same set.
all_words, outbound_words, inbound_words = (
    fetch_words(tokens_df.select('year', 'tokens').collect(), words_model)
    for tokens_df, words_model in (
        (all_tokens_df, all_words_model),
        (outbound_tokens_df, outbound_words_model),
        (inbound_tokens_df, inbound_words_model),
    )
)

In [18]:
# Top words per year over all messages, one ranked column per word.
word_columns = [f'Word {rank}' for rank in range(1, NUM_WORDS + 1)]
pd.DataFrame(all_words, columns=['Year', *word_columns]).set_index('Year')

Unnamed: 0_level_0,Word 1,Word 2,Word 3,Word 4,Word 5,Word 6,Word 7,Word 8,Word 9,Word 10,Word 11,Word 12,Word 13,Word 14,Word 15
Year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2011,putnam,caleb,dnt,judy,donna,yuna,thts,guyz,bryan,campused,dre,tht,kkkkk,paranormal,jaee
2012,burial,georgy,funeral,smallnum,norton,boarders,bowl,conner,nayan,kohl,bryan,zaahid,parp,caleb,genki
2013,moai,mcs,digipen,genki,kinect,mhacks,nawh,getz,donna,damian,snowboarding,esenthel,barrons,stanford,fragments
2014,craneshout,lemoneda,unity,domo,rust,nawh,donna,wot,sticker,readly,dcd,launch,kthdigit,cranetalk,snowday
2015,nick,eric,tony,drone,vaporwave,thipok,torchlight,sticker,jonah,scs,sin,cit,kelly,equipotential,machine
2016,robin,terrance,nate,beatrix,sophus,noop,gord,tendi,lisa,tender,argo,onboardiq,dolo,housing,dan
2017,wya,lmao,accuracy,jarrid,perry,jeehee,lmk,salesforce,caltrain,sia,entropy,cristina,theta,whatchu,uber
2018,wya,jarrid,lmao,hug,anton,lmk,corso,barzan,daston,thach,pooja,maliev,nyc,whatchu,baby


In [19]:
# Top words per year over outbound messages, one ranked column per word.
word_columns = [f'Word {rank}' for rank in range(1, NUM_WORDS + 1)]
pd.DataFrame(outbound_words, columns=['Year', *word_columns]).set_index('Year')

Unnamed: 0_level_0,Word 1,Word 2,Word 3,Word 4,Word 5,Word 6,Word 7,Word 8,Word 9,Word 10,Word 11,Word 12,Word 13,Word 14,Word 15
Year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2011,caleb,costume,donna,fob,judy,rachel,hao,dre,ziwen,bryan,mudkip,zzzz,xie,goin,bio
2012,conner,hydrogen,chia,kohl,norton,bon,genki,clay,spectrum,yearbook,holden,religions,carl,allright,smashthewindow
2013,mcs,moai,joe,genki,mhacks,bham,barrons,fragments,lool,kinect,liz,esenthel,getz,digipen,leap
2014,craneshout,lemoneda,rust,john,allen,snowday,digitcount,heroku,expansion,launch,probs,kthdigit,cranetalk,domo,theres
2015,photo,drone,eric,torchlight,kelly,umich,chun,jonah,tony,dope,probs,data,machina,vegan,vaporwave
2016,sophus,photo,noop,onboardiq,argo,skydiving,housing,san,francisco,yup,montreal,econ,beq,fetch,cpt
2017,lmao,okie,idk,wya,photo,caltrain,salesforce,uber,accuracy,palo,alto,gpu,hmmm,ugh,branden
2018,lmao,idk,wya,photo,corso,nyc,tripping,lmk,ehhh,anton,hug,vikas,hmmm,gif,phd


In [20]:
# Top words per year over inbound messages, one ranked column per word.
word_columns = [f'Word {rank}' for rank in range(1, NUM_WORDS + 1)]
pd.DataFrame(inbound_words, columns=['Year', *word_columns]).set_index('Year')

Unnamed: 0_level_0,Word 1,Word 2,Word 3,Word 4,Word 5,Word 6,Word 7,Word 8,Word 9,Word 10,Word 11,Word 12,Word 13,Word 14,Word 15
Year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2011,putnam,tht,dnt,yuna,thts,caleb,kkk,guyz,kkkkk,jaee,doetze,judy,kay,ding,campused
2012,burial,georgy,funeral,norton,boarders,smallnum,olympiad,lizzy,nayan,zaahid,bowl,crannet,caleb,scio,gump
2013,moai,nawh,donna,digipen,warrant,meixia,mcs,snowboarding,damian,subramanian,peace,julicher,deng,kinect,php
2014,lemoneda,craneshout,unity,donna,nawh,wot,lsd,domo,arviso,crannet,sticker,dcd,chayce,dvc,rust
2015,nick,eric,vaporwave,tony,thipok,sticker,equipotential,drone,ryuni,seizures,recital,kyoto,canon,isoceles,epilepsy
2016,nate,robin,terrance,beatrix,dan,tendi,tender,gord,octopus,lisa,tahoe,dolo,mission,bootie,perry
2017,accuracy,perry,wya,jarrid,lmao,jeehee,entropy,sia,lmk,whatchu,cristina,branden,giuseppe,autoencoder,theta
2018,jarrid,wya,hug,lmao,anton,barzan,lmk,thach,daston,angela,baby,mao,tru,icml,whatchu
