In [2]:
import findspark
findspark.init("/h/224/cameron/spark-3.0.0-preview2-bin-hadoop2.7")
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import date_sub
from glob import glob
from datetime import datetime
import numpy as np
import pandas as pd
import tempfile

In [3]:
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.getConf().getAll()
TIME_FRAME = "monthly"
window = {
    "weekly": 7,
    "biweekly": 14,
    "monthly": None,
}

In [9]:
# Load the Parquet data
comments = spark.read.load("/comments_2019.parquet").fillna("")
subreddits = spark.read.load("dataframes/subreddits.parquet")
comments = comments.join(subreddits, ['subreddit'], 'leftsemi')
cols = ['author','subreddit','created_utc']
comments = comments.select(*cols)
comments.printSchema()

root
 |-- author: string (nullable = false)
 |-- subreddit: string (nullable = false)
 |-- created_utc: integer (nullable = true)



In [10]:
# Add date column
# Create a function that returns the desired UDF from a timestamp 
to_udf = udf(lambda ts: datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:00:00"))

comments = comments.withColumn("timestamp", to_udf(comments["created_utc"]))
if TIME_FRAME in ["weekly","biweekly"]:
    # Add column that aggregates by week 
    comments = comments.withColumn("week",date_sub(next_day(col("timestamp"),"sunday"),window[TIME_FRAME]))
    comments = comments.withColumn('subreddit', concat(lit('('),col('subreddit'),lit(','),col('week'),lit(')')))
    comments = comments.drop(*["created_utc","timestamp","week"])
else:
    comments = comments.withColumn('month', date_format(col('timestamp'), '1/M/yyyy'))
    comments = comments.withColumn('subreddit', concat(lit('('),col('subreddit'),lit(','),col('month'),lit(')')))
    comments = comments.drop(*["created_utc","timestamp","month"])

comments.show()

+----------------+--------------------+
|          author|           subreddit|
+----------------+--------------------+
|     jncummins86|(stilltrying,1/3/...|
|     andrewmyles|(NintendoSwitch,1...|
|           -QBM-|(LofiHipHop,1/3/2...|
|    BlazeGiraffe|  (cocaine,1/3/2019)|
|         SeeDeez|   (nyjets,1/3/2019)|
|   Serious_Sam_2|(RocketLeagueExch...|
|       [deleted]|(moist_memes,1/3/...|
|       [deleted]|(worldnews,1/3/2019)|
|IrishEyesRsmilin|(StevenAveryIsGui...|
|  RanietsSharvas|(AnthemTheGame,1/...|
|          aeolid|(Sourdough,1/3/2019)|
|        jobn2021|(WaltDisneyWorld,...|
|  KillUrselfAcne|(8BallPool,1/3/2019)|
|       Bankertov|(StreetFighter,1/...|
|       HiImDavid|(guineapigs,1/3/2...|
|        Nole2424|(apexlegends,1/3/...|
|      1000Dragon|(Maplestory,1/3/2...|
|     mrfuckhead1|(synthesizers,1/3...|
|        phcullen|(Charlotte,1/3/2019)|
|   thelucidvegan|   (movies,1/3/2019)|
+----------------+--------------------+
only showing top 20 rows



## Word2Vecf Files
[Word2vecf](https://github.com/BIU-NLP/word2vecf/blob/master/README.md) requires three inputs
* training_data: text file of word-context pairs (space delimited)
* word_vocabulary: file mapping subreddits (strings) to their counts
* count_vocabulary: file mapping users (contexts -> subreddit commenters) to their counts

### Training Data

We want to avoid having to load the raw data as there are 1 billion+ rows. Working with aggregates from the start makes things much easier. 

*Since this is the temporal embedding we've already added the week into the subreddit name. Each subreddit/week combo is considered a new word with possible different contexts*

In [11]:
training_data = comments.groupBy(["subreddit","author"]).count().cache()
training_data.show()

+--------------------+-------------------+-----+
|           subreddit|             author|count|
+--------------------+-------------------+-----+
|   (hockey,1/3/2019)|        Intylerable|   12|
|(CanadaPublicServ...|        AntonBanton|    5|
|(entitledparents,...|        viscool8332|  118|
|(todayilearned,1/...|             hilti2|    4|
|      (sex,1/3/2019)|         realistnic|    1|
| (politics,1/3/2019)|       smikelsmikel|   93|
|(elderscrollsonli...|   the_scarlet_ibis|   12|
|(h3h3productions,...|              2dros|    1|
|(AskReddit,1/3/2019)|     Levicorpyutani|    9|
|(solotravel,1/3/2...|            EmmalNz|   93|
|   (GoNets,1/3/2019)|         BlaackkOuT|  297|
| (startrek,1/3/2019)|          rebbsitor|   13|
|(legaladvice,1/3/...|          [deleted]|20628|
|(microgrowery,1/3...|Ihavenobusinesshere|   12|
|(StarWarsLeaks,1/...|      kingpenguinJG|   57|
|(traaaaaaannnnnnn...|             Rota_u|   30|
|(unpopularopinion...| YodasRedditAccount|  421|
|(marvelstudios,1/..

In [12]:
training_data.count()

319777858

### Word Vocabulary

In [13]:
from pyspark.sql.functions import sum as _sum
word_vocabulary = training_data.groupBy("subreddit").agg(_sum('count').alias('count')).cache()
word_vocabulary.show()

+--------------------+-------+
|           subreddit|  count|
+--------------------+-------+
|(Allergies,1/3/2019)|   2138|
|(FortNiteBR,1/3/2...| 652991|
|(BuyItForLife,1/3...|  11572|
|   (iphone,1/3/2019)|  53763|
|(dankmemes,1/3/2019)|1303112|
|   (Tgirls,1/3/2019)|   3693|
|(DissidiaFFOO,1/3...|  27247|
|     (cats,1/3/2019)|  94074|
|   (AFROTC,1/3/2019)|   1423|
|  (gaybros,1/3/2019)|  28645|
|    (Jokes,1/3/2019)| 114812|
|   (occult,1/3/2019)|  15956|
|   (mexico,1/3/2019)|  52126|
|     (bdsm,1/3/2019)|   5622|
|(DeepIntoYouTube,...|   7981|
|(NFL_Draft,1/3/2019)|  25266|
|(wildhearthstone,...|   5615|
|(Splatoon_2,1/3/2...|   3915|
|(TaylorSwift,1/3/...|  14943|
|     (army,1/3/2019)|  52576|
+--------------------+-------+
only showing top 20 rows



In [14]:
word_vocabulary.count()

120825

### Context Vocabulary

In [15]:
context_vocabulary = training_data.groupBy("author").agg(_sum('count').alias('count')).cache()
context_vocabulary.show()

+----------------+-----+
|          author|count|
+----------------+-----+
|the_scarlet_ibis|   82|
|       zeppeIans| 2118|
|      SirDeVinci|  417|
|      _Erindera_| 3913|
| thundershocker1|  269|
|          Sqiddd| 7489|
|    weedwhacking| 2026|
|    TeaTreeTeach|  613|
|       oFaceless|  326|
|  TheFirstUserID|  344|
|         jingz13|   53|
|     JustinBilyj|  400|
|       deucemc26| 1996|
|     tanngrisnit| 7821|
|         hdv2017|  171|
|    Mustache_Guy| 1841|
|      uglygaming|  426|
|        Embossis|   28|
|     kevin123245|   66|
|     SabrinaHiss|  110|
+----------------+-----+
only showing top 20 rows



In [16]:
context_vocabulary.count()

19338041

## Write Vocabularies and Training Data to File

In [17]:
# Create a temp context for the word and context vocabulary files (which get passed to the word2vecf script)
import subprocess
import sys
import os
temp_dir = "/h/224/cameron/Political-Subreddit-Embedding/temp/temporal/"
subprocess.run("mkdir -p {}".format(temp_dir), shell=True)

CompletedProcess(args='mkdir -p /h/224/cameron/Political-Subreddit-Embedding/temp/temporal/', returncode=0)

In [18]:
# Create temp files
file_data = os.path.join(temp_dir, '{}_data'.format(TIME_FRAME))
file_wv = os.path.join(temp_dir, '{}_wv'.format(TIME_FRAME))
file_cv = os.path.join(temp_dir, '{}_cv'.format(TIME_FRAME))

In [None]:
print("Writing training data to {}...".format(file_data))
training_data.write.csv(file_data,header=False,sep=' ')
# training_data.toPandas().to_csv(file_data, header=False, index=False, sep=' ')
# training_data.unpersist()

Writing training data to /h/224/cameron/Political-Subreddit-Embedding/temp/temporal/monthly_data...


In [None]:
print("Writing word vocab data to {}...".format(file_wv))
word_vocabulary.write.csv(file_wv,header=False,sep=' ')
# word_vocabulary.toPandas().to_csv(file_wv, header=False, index=False, sep=' ')
# word_vocabulary.unpersist()

In [None]:
print("Writing context vocab data to {}...".format(file_cv))
context_vocabulary.write.csv(file_cv,header=False,sep=' ')
# context_vocabulary.toPandas().to_csv(file_cv, header=False, index=False, sep=' ')
# context_vocabulary.unpersist()

In [None]:
from utils import coalese_csvs
file_data = coalese_csvs(file_data,"{}.txt".format(file_data))

In [None]:
file_wv = coalese_csvs(file_data,"{}.txt".format(file_wv))
file_cv = coalese_csvs(file_data,"{}.txt".format(file_cv))

## Train Embedding

In [None]:
# Word2vec parameters, using negative sampling
# -alpha 0.18 -negative 35 -sample 0.0043 -size 150
from utils import generate_embedding, load_embedding
embedding_args = {
                    "param1": "sample", 
                    "p1": 0.0043, 
                    "param2": "negative", 
                    "p2": 35, 
                    "file_data": file_data , 
                    "file_wv": file_wv, 
                    "file_cv": file_cv,
                    "size": 150,
                    "alpha": 0.18
                 }
embedding = generate_embedding(embedding_args)
embedding

In [None]:
subreddits, vectors = load_embedding(embedding)
subreddits

### Parse Out Subreddit from Week Again

Since we've already trained all of the seperate emebeddings there isn't a need for them to be in the same column anymore. This will make animating the emebedding over time easier.

In [None]:
from utils import parse_tup

sub_df = pd.DataFrame(subreddits.apply(parse_tup).tolist())
sub_df.columns = ["subreddit","week"]
sub_df

## Visualize/Animate
1. Reduce to 3/2 dimensions
2. Add subreddit/week columns to factored dataframe
3. Visualize

In [None]:
from sklearn.decomposition import PCA
import plotly.express as px
subprocess.run("mkdir -p visualizations/temporal", shell=True)
left_subreddits = ["JoeBiden","Pete_Buttigieg","Kamala",
                        "SandersForPresident","BetoORourke","ElizabethWarren",
                        "BaemyKlobaechar","YangForPresidentHQ","politics","progressive",
                        "demsocialist","SocialDemocracy","centerleftpolitics",
                        "ConservativeDemocrat","moderatepolitics","ChapoTrapHouse"]
right_subreddits = ["The_Donald","Conservative","ShitPoliticsSays","progun","Republican","Capitalism"]

In [None]:
# PCA Dim Reduction -> 2 dimensions
pca =  PCA(n_components = 2)
two_dim =  pd.DataFrame(pca.fit_transform(vectors))
two_dim[["subreddit","week"]] = sub_df
idx = pd.MultiIndex.from_product([two_dim['week'].unique(), two_dim['subreddit'].unique()],
                                 names=['week', 'subreddit'])

# In the case that there isn't a vector for a specific week/subreddit we bacfill the vector from the previous
two_dim = two_dim.set_index(['week', 'subreddit']).reindex(idx).reset_index().sort_values('week').bfill()
two_dim = two_dim[two_dim["subreddit"].isin(left_subreddits) | two_dim["subreddit"].isin(right_subreddits)]
two_dim["partisan"] = np.where(two_dim["subreddit"].isin(left_subreddits), 'left', 'right')
two_dim

In [1]:
(max_x, max_y), (min_x, min_y) = two_dim[[0,1]].max(axis=0), two_dim[[0,1]].min(axis=0)
args = {
    "x": 0,
    "y": 1,
    "hover_name": "subreddit",
    "text": "subreddit",
    "opacity": 0.7,
    "color": "partisan",
    "animation_frame": two_dim.week.astype(str),
    "animation_group": "subreddit",
    "range_x": [min_x-3,max_x+3],
    "range_y": [min_y-3,min_y+3],
}
fig = px.scatter(two_dim,**args)
fig.update_traces(marker=dict(size=12,
                              line=dict(width=2,
                                        color='DarkSlateGrey')),
                  selector=dict(mode='markers'))
fig.write_html("visualizations/temporal/{}_2d_scatter.html".format(TIME_FRAME))
fig.show()

NameError: name 'two_dim' is not defined

In [2]:
# PCA Dim Reduction -> 3 dimensions
pca =  PCA(n_components = 3)
three_dim =  pd.DataFrame(pca.fit_transform(vectors))
three_dim[["subreddit","week"]] = sub_df
idx = pd.MultiIndex.from_product([three_dim['week'].unique(), three_dim['subreddit'].unique()],
                                 names=['week', 'subreddit'])

# In the case that there isn't a vector for a specific week/subreddit we bacfill the vector from the previous
three_dim = three_dim.set_index(['week', 'subreddit']).reindex(idx).reset_index().sort_values('week').bfill()
three_dim = three_dim[three_dim["subreddit"].isin(left_subreddits) | three_dim["subreddit"].isin(right_subreddits)]
three_dim["partisan"] = np.where(three_dim["subreddit"].isin(left_subreddits), 'left', 'right')
three_dim

NameError: name 'PCA' is not defined

In [None]:
(max_x, max_y, max_z), (min_x, min_y, min_z) = three_dim[[0,1,2]].max(axis=0), three_dim[[0,1,2]].min(axis=0)
args = {
    "x": 0,
    "y": 1,
    "z": 2,
    "hover_name": "subreddit",
#     "text": "subreddit",
    "opacity": 0.7,
    "color": "partisan",
    "animation_frame": three_dim.week.astype(str),
    "animation_group": "subreddit",
    "range_x": [min_x-3,max_x+3],
    "range_y": [min_y-3,min_y+3],
    "range_z": [min_z-3,max_z+1]

}
fig = px.scatter_3d(three_dim,**args)
fig.update_traces(marker=dict(size=12,
                              line=dict(width=2,
                                        color='DarkSlateGrey')),
                  selector=dict(mode='markers'))
fig.write_html("visualizations/temporal/{}_3d_scatter.html".format(TIME_FRAME))
fig.show()