In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import udf,desc,abs
from datetime import datetime

In [2]:
conf = SparkConf()
sc = SparkContext(conf = conf)
spark = pyspark.SQLContext(sc)

### Read json

In [3]:
reddit_df = spark.read.json('submissions_full.json')

In [4]:
reddit_df.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- author_fullname_lvl: string (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- full_link: string (nullable = true)
 |-- id: string (nullable = true)
 |-- num_comments: string (nullable = true)
 |-- num_crossposts: string (nullable = true)
 |-- score: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- subreddit_id_lvl: string (nullable = true)
 |-- subreddit_subscribers: string (nullable = true)
 |-- subreddit_type: string (nullable = true)
 |-- title: string (nullable = true)



In [5]:
reddit_df.take(2)

[Row(author='CautiousInvestor', author_fullname='2gcxulwq', author_fullname_lvl='t2', created_utc='2019-01-01 08:01:42', full_link='https://www.reddit.com/r/investing/comments/ab8cj7/how_can_the_us_stock_market_gain_7_on_average_per/', id='ab8cj7', num_comments='412', num_crossposts='0', score='1', selftext='', subreddit='investing', subreddit_id='2qhhq', subreddit_id_lvl='t5', subreddit_subscribers='643895', subreddit_type='public', title='How can the US stock market gain 7% on average per year, while the economy only grows 1-3% annually?'),
 Row(author='Sofla328', author_fullname='f30ge', author_fullname_lvl='t2', created_utc='2019-01-01 08:03:29', full_link='https://www.reddit.com/r/stocks/comments/ab8d5o/term_for_a_broker_buying_worthless_positions_from/', id='ab8d5o', num_comments='1', num_crossposts='0', score='1', selftext='There\'s a term like "closet trade" or something like that, that is when a broker will purchase worthless positions from a client for a nominal price so the 

### Change the utc form into local time(string)

In [6]:
# create week, month, year functions
# get week, month, year values
week =  udf(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%W'))
reddit_df = reddit_df.withColumn('week', week(reddit_df['created_utc']))

month = udf(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%m'))
reddit_df = reddit_df.withColumn('month', month(reddit_df['created_utc']))

year = udf(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%Y'))
reddit_df = reddit_df.withColumn('year', year(reddit_df['created_utc']))

day = udf(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%x'))
reddit_df = reddit_df.withColumn('date', day(reddit_df['created_utc']))


### RDD

In [7]:
reddit_rdd = reddit_df.rdd

In [8]:
# get unique subreddit id and name
subreddit = reddit_rdd.map(lambda x:(x['subreddit'],x['subreddit_id'])).distinct()

In [9]:
subreddit = subreddit.collect()
subreddit 

[('SecurityAnalysis', '2s7v0'),
 ('stocks', '2qjfk'),
 ('wallstreetbets', '2th52'),
 ('investing', '2qhhq'),
 ('RobinHood', '2uud8')]

## 1. Author count for each subreddit by day

In [22]:
# get author subreddit pairs
author_subreddit_pairs = reddit_rdd.map(lambda x:((x['author_fullname'],x['subreddit_id'],x['date']),1))\
.filter(lambda x: x[0][0] != 'nan')\
.reduceByKey(lambda x,y: x+y).mapValues(lambda x: 1)

# get author count
author_subreddit_cnt = author_subreddit_pairs.map(lambda x:((x[0][1],x[0][2]),1))\
.reduceByKey(lambda x,y:x+y)

In [23]:
# save the result to the file
deptColumns = ["temp","author_cnt"]
df1 = author_subreddit_cnt.toDF(deptColumns)

sid = udf(lambda x: x[0] )
df1 = df1.withColumn('SubredditID', sid(df1['temp']))

date = udf(lambda x: x[1] )
df1 = df1.withColumn('date', date(df1['temp']))

df1 = df1.drop('temp')

df1.coalesce(1).write.csv('author_cnt')

#### Stats

In [41]:
#author_cnt = pd.read_csv('author_cnt.csv',header=None)
author_cnt.columns =['count','subreddit','date']
author_cnt.groupby(['subreddit'])['count'].describe()

Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
subreddit,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
RobinHood,787.0,44.200762,29.825495,6.0,24.0,36.0,60.0,247.0
SecurityAnalysis,784.0,7.096939,3.344929,1.0,5.0,7.0,9.0,30.0
investing,787.0,73.771283,28.339518,20.0,53.0,72.0,89.0,223.0
stocks,787.0,70.988564,42.307593,10.0,36.0,62.0,101.0,255.0
wallstreetbets,787.0,134.560356,49.617466,36.0,94.0,129.0,169.0,296.0


## 2. Post count for each subreddit by day

In [24]:
# count the id in each subreddit in every day
ID_subreddit_pairs = reddit_rdd.map(lambda x:((x['id'],x['subreddit_id'],x['date']),1))\
.filter(lambda x: x[0][0] != 'nan')\
.reduceByKey(lambda x,y: x+y).mapValues(lambda x: 1)

ID_subreddit_cnt = ID_subreddit_pairs.map(lambda x:((x[0][1],x[0][2]),1))\
.reduceByKey(lambda x,y:x+y)

In [25]:
# save the result to the file
deptColumns = ["temp","id_cnt"]
df1 = ID_subreddit_cnt.toDF(deptColumns)

sid = udf(lambda x: x[0] )
df1 = df1.withColumn('SubredditID', sid(df1['temp']))

date = udf(lambda x: x[1] )
df1 = df1.withColumn('date', date(df1['temp']))

df1 = df1.drop('temp')

df1.coalesce(1).write.csv('id_cnt')

#### Stats

In [40]:
#id_cnt = pd.read_csv('id_cnt.csv',header=None)
id_cnt.columns =['count','subreddit','date']
id_cnt.groupby(['subreddit'])['count'].describe()

Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
subreddit,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
RobinHood,787.0,48.623888,34.080407,6.0,24.0,38.0,68.0,258.0
SecurityAnalysis,784.0,8.196429,4.019707,1.0,5.0,8.0,10.25,31.0
investing,787.0,82.658196,33.939121,21.0,58.0,80.0,100.0,268.0
stocks,787.0,81.621347,50.449571,11.0,39.0,70.0,115.0,300.0
wallstreetbets,787.0,155.496823,59.844994,39.0,104.0,148.0,200.0,300.0


## 3. Avg subscribers for each subreddit by day

In [33]:
# count avg Number Of subscribers in a week
def averagelist(lst):
    s=0
    for i in lst:
        s+=i
        return (s // len(lst))

In [27]:
subscr_subreddit_pairs = reddit_rdd.map(lambda x:((x['author_fullname'],x['subreddit_id'],str(x['subreddit_subscribers']),x['date']),1))\
.filter(lambda x: x[0][0] != 'nan')\
.reduceByKey(lambda x,y: x+y).mapValues(lambda x: 1)

subscr_subreddit_pairs_cnt = subscr_subreddit_pairs.map(lambda x:((x[0][1],x[0][3]),x[0][2] ))\
.reduceByKey(lambda x,y: x+' '+y )

subscr_subreddit_pairs_cnt1 = subscr_subreddit_pairs_cnt.map(  lambda d:  (d[0],  [ int(x) for x in  d[1].split(" ") ]  )  )
subscr_subreddit_pairs_cnt = subscr_subreddit_pairs_cnt1.map(lambda d:(d[0], averagelist(d[1])))

In [28]:
# save the result to the file
deptColumns = ["temp","AverageSubscribers"]
df1 = subscr_subreddit_pairs_cnt.toDF(deptColumns)

sid = udf(lambda x: x[0] )
df1 = df1.withColumn('SubredditID', sid(df1['temp']))

date = udf(lambda x: x[1] )
df1 = df1.withColumn('date', date(df1['temp']))

df1 = df1.drop('temp')

df1.coalesce(1).write.csv('subscribers_avg')

#### Stats

In [39]:
#subscribers_avg = pd.read_csv('subscribers_avg.csv',header=None)
subscribers_avg.columns =['count','subreddit','date']
subscribers_avg.groupby(['subreddit'])['count'].describe()

Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
subreddit,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
RobinHood,787.0,7204.022872,4670.172583,1574.0,4003.0,5691.0,8730.5,33703.0
SecurityAnalysis,784.0,12030.364796,11165.964576,2145.0,6371.0,9490.0,14099.25,153019.0
investing,787.0,13111.307497,4691.328889,4761.0,10143.0,12077.0,15000.0,56348.0
stocks,787.0,9465.649301,8392.757268,3228.0,6141.5,8308.0,11163.0,213112.0
wallstreetbets,787.0,7988.834816,7035.974612,2896.0,5250.0,6524.0,8126.0,92528.0


## 4. Retention rate for each subreddit pair by day

In [13]:
df = pd.read_csv('submissions_full.csv')
df = df[['author_fullname','subreddit_id','created_utc']]

df['date'] = df['created_utc'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%x'))
df['date'] = pd.to_datetime(df.date, format='%m/%d/%y')

count = df.groupby(['subreddit_id','date'])['author_fullname'].count()
count = pd.DataFrame(count)

In [14]:
numerator = count
denominator = count

denominator = denominator.reset_index()
denominator = denominator.sort_values(by = 'date')

reddits = denominator['subreddit_id'].unique().tolist()
subgroup = denominator.groupby(['subreddit_id'])
df1 = subgroup.get_group('2qhhq').sort_values(by = 'date')
df2 = subgroup.get_group('2th52').sort_values(by = 'date').reset_index(drop = True)
df3 = subgroup.get_group('2s7v0').sort_values(by = 'date').reset_index(drop = True)
df4 = subgroup.get_group('2qjfk').sort_values(by = 'date').reset_index(drop = True)
df5 = subgroup.get_group('2uud8').sort_values(by = 'date').reset_index(drop = True)

In [15]:
def get_sum(dataset):
    for j in range(len(dataset)):
        datanew = dataset
        a = j-30
        b = j
        c = datanew.iloc[a:b,2].sum()
        datanew.loc[j,'total'] = c
    return datanew

In [16]:
df11 = get_sum(df1)
df21 = get_sum(df2)
df31 = get_sum(df3)
df41 = get_sum(df4)
df51 = get_sum(df5)

rate = pd.DataFrame()

rate = rate.append(df11)
rate = rate.append(df21)
rate = rate.append(df31)
rate = rate.append(df41)
rate = rate.append(df51)

In [17]:
rate = rate.replace(0,np.nan)
rate['retention_rate'] = rate['author_fullname'] / rate['total']
rate = rate.drop(columns = ['author_fullname','total'])
rate = rate.reset_index(drop = True)

In [None]:
rate.to_csv('retention_rate.csv')

## 5. Avg score for each subreddit by day

In [14]:
subreddit_score_pairs = reddit_rdd.map(lambda x:((x['author_fullname'],x['subreddit_id'],str(x['score']),x['date']),1))\
.filter(lambda x: x[0][0] != 'nan')\
.reduceByKey(lambda x,y: x+y).mapValues(lambda x: 1)

In [16]:
subreddit_score_pairs_cnt = subreddit_score_pairs.map(lambda x:((x[0][1],x[0][3]),x[0][2] ))\
.reduceByKey(lambda x,y: x+' '+y )

In [18]:
subreddit_score_pairs_cnt_split = subreddit_score_pairs_cnt.map(  lambda d:  (d[0],  [ int(x) for x in  d[1].split(" ") ]  )  )

In [20]:
subreddit_score_pairs_cnt = subreddit_score_pairs_cnt_split.map(lambda d:(d[0], averagelist(d[1])))

In [26]:
# save the result to the file
deptColumns = ["temp","AverageScore"]
df1 = subreddit_score_pairs_cnt.toDF(deptColumns)

sid = udf(lambda x: x[0] )
df1 = df1.withColumn('SubredditID', sid(df1['temp']))

date = udf(lambda x: x[1] )
df1 = df1.withColumn('date', date(df1['temp']))

df1 = df1.drop('temp')

df1.coalesce(1).write.csv('score_avg')

#### Stats

In [12]:
#score_avg = pd.read_csv('score_avg.csv',header=None)
score_avg.columns =['count','subreddit','date']
score_avg.groupby(['subreddit'])['count'].describe()

Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
subreddit,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
RobinHood,787.0,0.250318,2.173659,0.0,0.0,0.0,0.0,48.0
SecurityAnalysis,784.0,1.125,4.398675,0.0,0.0,0.0,0.0,48.0
investing,787.0,0.270648,2.677622,0.0,0.0,0.0,0.0,44.0
stocks,787.0,0.175349,1.176423,0.0,0.0,0.0,0.0,13.0
wallstreetbets,787.0,0.668361,6.20683,0.0,0.0,0.0,0.0,121.0


## 6. Average comments per post for each subreddit by day

In [23]:
subreddit_cmt_pairs = reddit_rdd.map(lambda x:((x['author_fullname'],x['subreddit_id'],str(x['num_comments']),x['date']),1))\
.filter(lambda x: x[0][0] != 'nan')\
.reduceByKey(lambda x,y: x+y).mapValues(lambda x: 1)

In [26]:
subreddit_cmt_pairs_cnt = subreddit_cmt_pairs.map(lambda x:((x[0][1],x[0][3]),x[0][2] ))\
.reduceByKey(lambda x,y: x+' '+y )

In [28]:
subreddit_cmt_pairs_cnt_split = subreddit_cmt_pairs_cnt.map(  lambda d:  (d[0],  [ int(x) for x in  d[1].split(" ") ]  )  )

In [34]:
subreddit_cmt_pairs_cnt_final = subreddit_cmt_pairs_cnt_split.map(lambda d:(d[0], averagelist(d[1])))

In [36]:
# save the result to the file
deptColumns = ["temp","AverageComment"]
df1 = subreddit_cmt_pairs_cnt_final.toDF(deptColumns)

sid = udf(lambda x: x[0] )
df1 = df1.withColumn('SubredditID', sid(df1['temp']))

date = udf(lambda x: x[1] )
df1 = df1.withColumn('date', date(df1['temp']))

df1 = df1.drop('temp')

df1.coalesce(1).write.csv('comment_avg')

#### Stats

In [44]:
#comment_avg = pd.read_csv('comment_avg.csv',header=None)
comment_avg.columns =['count','subreddit','date']
comment_avg.groupby(['subreddit'])['count'].describe()

Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
subreddit,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
RobinHood,787.0,0.141042,0.9751,0.0,0.0,0.0,0.0,21.0
SecurityAnalysis,784.0,1.391582,3.897601,0.0,0.0,0.0,1.0,44.0
investing,787.0,0.182973,1.043367,0.0,0.0,0.0,0.0,17.0
stocks,787.0,0.259212,2.130931,0.0,0.0,0.0,0.0,54.0
wallstreetbets,787.0,1.459975,10.918125,0.0,0.0,0.0,0.0,173.0


## 7. Connectivity among subreddits by day

In [10]:
comment = pd.read_csv('comments_full.csv')

In [11]:
comment['date'] = comment['created_utc'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%x'))
comment['date'] = pd.to_datetime(comment['date'], format='%m/%d/%y')

reddit_cross = pd.DataFrame(comment.groupby(['date','author'])['subreddit'].count()).reset_index()
reddit_cross['subreddit'] = reddit_cross['subreddit']-1

df = pd.DataFrame(reddit_cross.groupby(['date'])['subreddit'].mean().reset_index())
df.rename(columns={'subreddit': 'connectivity'}, inplace=True)
#df.to_csv('connectivity.csv',index=False)