* Generate data for the polarization levels over time of yearly cohorts.
* Can look at all political activity or just left, right.

In [11]:
partisan_dimen = "partisan"
political_activity_category = "all" # can be 'all_unfiltered' to include all activity, including apolitical

In [12]:
import sys
import os
sys.path.append("..")
from commembed.jupyter import *
import commembed.linalg as linalg
import commembed.dimens as dimens
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from tqdm.notebook import tqdm
tqdm.pandas()
%config InlineBackend.figure_format = 'retina'
%config InlineBackend.print_figure_kwargs={'facecolor' : "w"}
from pyspark.sql.functions import *

import commembed.data as data
import commembed.analysis as analysis


spark = data.spark_context()

%load_ext autoreload
%autoreload 2

Spark WebUI: http://ada.ais.sandbox:4046
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [13]:
user_counts = data.load("all_objects_monthly_user_counts")
user_counts.createOrReplaceTempView("user_counts")

scores_filtered, scores_df = load_abs_z_df(partisan_dimen, political_activity_category)

Spark WebUI: http://ada.ais.sandbox:4046
10006 political subreddits selected


In [14]:
filtered_counts = spark.sql("select * from user_counts inner join scores on subreddit = community")
filtered_counts.createOrReplaceTempView("filtered_counts")

In [15]:
author_first_months = filtered_counts.groupBy("author").agg(min("month").alias("first_month"))
author_first_months.cache()
author_first_months.createOrReplaceTempView("author_first_months")

In [16]:
min_month = "2012-01"

# Can first aggregate by subreddit since score is constant for a given subreddit
subreddit_aggregated_rows = spark.sql(f"""
    select substr(greatest(first_month, '{min_month}'), 0, 4) as first_year, month, subreddit,
        sum(num_comments) as num_comments,
        first(partisan_dimen) as partisan_dimen
        
    from filtered_counts
    
    left join author_first_months
    on author_first_months.author = filtered_counts.author
    
    where filtered_counts.author != "[deleted]"
    
    group by 1, 2, 3
""")
subreddit_aggregated_rows.cache()
subreddit_aggregated_rows.createOrReplaceTempView("subreddit_aggregated_rows")

means = spark.sql(f"""
    select first_year, month, sum(num_comments*partisan_dimen)/sum(num_comments) as avg_abs_z_score
    from subreddit_aggregated_rows
    group by 1, 2
""")
means.cache()
means.createOrReplaceTempView("means")

user_cohorts = spark.sql("""
    select subreddit_aggregated_rows.first_year,
            subreddit_aggregated_rows.month, 
            sum(num_comments) as num_comments,
            first(means.avg_abs_z_score) as avg_abs_z_score,
            sqrt(sum(pow(partisan_dimen-avg_abs_z_score, 2)*num_comments)/sum(num_comments)) as std_dev
        
    from subreddit_aggregated_rows
    
    inner join means
    on means.first_year = subreddit_aggregated_rows.first_year
        and means.month = subreddit_aggregated_rows.month
        
    group by 1, 2
""")

In [17]:
# We also calculate population level standard deviation
overall_means = spark.sql("""
    select month, sum(num_comments*partisan_dimen)/sum(num_comments) as avg_abs_z_score
    from subreddit_aggregated_rows
    group by 1
""")
overall_means.cache()
overall_means.createOrReplaceTempView("overall_means")

overall_stats = spark.sql("""
    select subreddit_aggregated_rows.month, 
            sum(num_comments) as num_comments,
            first(overall_means.avg_abs_z_score) as avg_abs_z_score,
            sqrt(sum(pow(partisan_dimen-avg_abs_z_score, 2)*num_comments)/sum(num_comments)) as std_dev
        
    from subreddit_aggregated_rows
    
    inner join overall_means
    on overall_means.month = subreddit_aggregated_rows.month
        
    group by 1
""")

In [18]:
# Save
user_cohorts.write.parquet(
    os.path.join(data.DATA_PATH, "cohort_data_%s_%s.parquet" % (partisan_dimen, political_activity_category)),
    mode='overwrite')

In [19]:
# Save
overall_stats.write.parquet(
    os.path.join(data.DATA_PATH, "cohort_data_overall_%s_%s.parquet" % (partisan_dimen, political_activity_category)),
    mode='overwrite')