# Code to read raw Reddit JSON files and convert them to Parquet
- Working with Parquet files is much faster than working with JSON files

## First imports
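- We need to get a `SQLContext` from Spark to do our work in
- The `sc` (SparkContext) variable is already set for us, so we can just use it
- On Spark 2.0+, a `SparkSession` (often pre-defined as `spark`) is the preferred entry point; `SQLContext` still works but is deprecated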

In [None]:
from pyspark.sql import SQLContext
# note: this star import shadows Python built-ins such as sum, min, max, abs and round
from pyspark.sql.functions import *

sqlC = SQLContext(sc)

## Read in JSON data
- All the Reddit data is in `/var/reddit`
    - the comments data is in monthly files whose names begin with `RC`
    - the submissions data (the posts that comments reply to, i.e. the first post in each thread) is in monthly files whose names begin with `RS`
- Reading all comments takes about half an hour on UM's Flux Hadoop cluster, partly because Spark scans the JSON to infer its schema.
    - You can make it faster by reading only the files for a specific year or month.
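The commented-out `RC_*` path below already uses a glob pattern, and narrowing the pattern is how you pick a year or month, e.g. `sqlC.read.json("/var/reddit/RC_2010-*")` for all of 2010 (assuming the monthly files follow the `RC_YYYY-MM` naming seen in `RC_2010-02`). As a rough illustration, this sketch uses Python's `fnmatchcase`, which treats a simple `*` the way these patterns do, on a hypothetical list of file names:

```python
from fnmatch import fnmatchcase

# Hypothetical file names following the RC_/RS_ + year-month naming described above
files = ["RC_2009-12", "RC_2010-01", "RC_2010-02", "RS_2010-01", "RC_2011-01"]

def matching(pattern, names):
    """Return the names that a simple '*' glob pattern selects (case-sensitive)."""
    return [name for name in names if fnmatchcase(name, pattern)]

print(matching("RC_2010-*", files))   # ['RC_2010-01', 'RC_2010-02']: one year of comments
print(matching("RC_2010-02", files))  # ['RC_2010-02']: a single month
print(matching("RS_*", files))        # ['RS_2010-01']: all submissions
```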

In [None]:
# all of reddit:
# reddit_raw = sqlC.read.json("/var/reddit/RC_*")

# one month:
reddit_raw = sqlC.read.json("/var/reddit/RC_2010-02")
print('Done!')

In [None]:
reddit_raw.columns

In [None]:
reddit_raw.printSchema()

In [None]:
reddit_raw.count()

## Save data to a faster format
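- Spark works much faster on Parquet files. Parquet is a compressed, column-oriented format that stores its own schema, so Spark skips JSON parsing and schema inference and reads only the columns a query needs. Operations that take minutes or hours on JSON can take seconds on Parquet.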
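Parquet stores data column by column instead of record by record. The toy sketch below (plain Python on made-up records; real Parquet files are binary, compressed and far more sophisticated) illustrates the idea: with one JSON object per line, reading a single field means parsing every whole record, whereas after a one-time conversion to a column layout, a query that needs only `subreddit` touches only that column.

```python
import json

# Made-up comments, one JSON object per line (the layout of the RC_ files)
json_lines = [
    '{"id": "a1", "author": "alice", "subreddit": "funny", "body": "lol"}',
    '{"id": "a2", "author": "bob", "subreddit": "science", "body": "Source?"}',
    '{"id": "a3", "author": "alice", "subreddit": "science", "body": "Neat"}',
]

# Row layout: to read one field, every record must be parsed in full
subreddits_from_rows = [json.loads(line)["subreddit"] for line in json_lines]

# One-time conversion to a column layout (the idea behind Parquet):
# each field becomes its own list
rows = [json.loads(line) for line in json_lines]
columns = {field: [row[field] for row in rows] for field in rows[0]}

# A query that needs only `subreddit` now reads just that one list
subreddits_from_columns = columns["subreddit"]
print(subreddits_from_columns)  # ['funny', 'science', 'science']
```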

In [None]:
reddit_raw.write.parquet('2010_02_comments.parquet', mode='overwrite')
print('done')

In [None]:
reddit = sqlC.read.parquet('2010_02_comments.parquet')

In [None]:
reddit.count()

### Speed difference
- The gap grows with the size of the data and the complexity of the operation, so it shows up most on larger datasets and heavier queries

In [None]:
# %time runs the statement once; %timeit would repeat the slow JSON count several times
%time reddit_raw.count()

In [None]:
%time reddit.count()

## Basic operations

In [None]:
# pandas-like indexing style
reddit[['id', 'author', 'subreddit', 'body']].show()

In [None]:
# SQL-like method style: select() plays the role of SELECT
reddit.select('id', 'author', 'subreddit', 'body').show()

In [None]:
# pandas-like indexing style
reddit[reddit.subreddit == 'politics'].count()

In [None]:
# SQL-like method style: filter() plays the role of WHERE (.where() is an alias)
reddit.filter(reddit.subreddit == 'politics').count()

In [None]:
# contains() is case-sensitive, so check both spellings
reddit[reddit.body.contains('kitten') | reddit.body.contains('Kitten')].count()

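### built-in PySpark functions
- `pyspark.sql.functions` provides column functions such as `lower()`; lower-casing the body before `contains()` makes the match case-insensitive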

In [None]:
reddit[lower(reddit.body).contains('kitten')].count()
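A plain-Python analogue of the two kitten counts, on made-up comment bodies, shows the difference: `contains()` is a case-sensitive substring test, so checking two spellings still misses `KITTEN`, while lower-casing first catches every capitalization.

```python
# Made-up comment bodies
bodies = ["My kitten sleeps all day", "Kitten pics please", "KITTEN!!!", "dogs > cats"]

# Like body.contains('kitten') | body.contains('Kitten'): case-sensitive substring checks
two_spellings = [b for b in bodies if "kitten" in b or "Kitten" in b]

# Like lower(body).contains('kitten'): lower-case first, then a single check
lowercased = [b for b in bodies if "kitten" in b.lower()]

print(len(two_spellings), len(lowercased))  # 2 3
```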

## Changing data
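- PySpark DataFrames are immutable: we can't assign to them in place the way we would in pandas (e.g. `df['x'] = ...`)
- Instead, methods such as `withColumnRenamed()` and `withColumn()` return a new DataFrame, which we assign back to a variable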

In [None]:
reddit = reddit.withColumnRenamed('id', 'comment_id')
reddit.columns

In [None]:
reddit = reddit.withColumn('date', from_unixtime('created_utc'))
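`from_unixtime()` turns seconds since the Unix epoch into a `yyyy-MM-dd HH:mm:ss` string. Spark formats it in the session time zone (`spark.sql.session.timeZone`, which defaults to the JVM's local time zone), so the `date` column is not necessarily in UTC. This plain-Python sketch performs the same conversion pinned to UTC for a sample timestamp; it illustrates the arithmetic and is not Spark's implementation.

```python
from datetime import datetime, timezone

def unix_to_utc_string(seconds):
    """Format epoch seconds like from_unixtime's default pattern, but always in UTC."""
    # int() accepts both numbers and numeric strings
    return datetime.fromtimestamp(int(seconds), tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

print(unix_to_utc_string(1264982400))  # 2010-02-01 00:00:00
```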
reddit[['comment_id', 'subreddit', 'created_utc', 'date']].show(5)

In [None]:
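# replace tabs in the body with spaces so they don't clash with the tab-separated file written at the end
reddit = reddit.withColumn('body', regexp_replace('body', '\t', ' '))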
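Replacing tabs matters because the data is written out as a tab-separated file at the end of this notebook, and a tab inside a comment body looks like a column boundary to a naive TSV reader. A plain-Python sketch on a made-up row, using `re.sub` in place of Spark's `regexp_replace`:

```python
import re

# A made-up row: comment_id, subreddit, body (the body contains a tab)
row = ["c1", "science", "first part\tsecond part"]

# Joining with tabs and splitting again yields 4 fields instead of 3
broken = "\t".join(row).split("\t")

# Replace tabs in the body first, as regexp_replace('body', '\t', ' ') does
row[2] = re.sub("\t", " ", row[2])
fixed = "\t".join(row).split("\t")

print(len(broken), len(fixed))  # 4 3
```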

## More fun examples

In [None]:
reddit.groupBy('author').count().sort(desc('count')).show()

In [None]:
# drop comments with no visible author (shown as '[deleted]')
reddit = reddit[reddit.author != '[deleted]']
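`groupBy(...).count().sort(desc('count'))` amounts to counting how often each value occurs and listing the most frequent first. A plain-Python sketch of the two cells above on a made-up author column, with `collections.Counter` standing in for Spark's aggregation (Spark does not guarantee any particular order among tied counts):

```python
from collections import Counter

# Made-up author column, one entry per comment
authors = ["alice", "[deleted]", "bob", "alice", "[deleted]", "[deleted]", "carol", "alice"]

# Like groupBy('author').count().sort(desc('count'))
print(Counter(authors).most_common())

# Like reddit[reddit.author != '[deleted]'], then the same count
kept = [a for a in authors if a != "[deleted]"]
top = Counter(kept).most_common()
print(top)  # [('alice', 3), ('bob', 1), ('carol', 1)]
```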

In [None]:
reddit.groupBy('subreddit').count().sort(desc('count')).show()

In [None]:
reddit.groupBy('subreddit', 'author').count().sort(desc('count')).show()

In [None]:
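# number of comments each author made in each subreddit
tmp = reddit.groupBy('subreddit', 'author').count()
# average of those counts per subreddit: mean comments per commenting user
tmp = tmp.groupby('subreddit').mean()
tmp.sort(desc('avg(count)')).show()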
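The cell above is a two-step aggregation: first count comments per (subreddit, author) pair, then average those counts within each subreddit. The same computation in plain Python on made-up rows:

```python
import math
from collections import Counter, defaultdict

# Made-up (subreddit, author) pairs, one per comment
comments = [
    ("science", "alice"), ("science", "alice"), ("science", "bob"),
    ("funny", "carol"), ("funny", "carol"), ("funny", "carol"), ("funny", "dave"),
]

# Step 1, like groupBy('subreddit', 'author').count(): comments per (subreddit, author)
per_user = Counter(comments)

# Step 2, like groupby('subreddit').mean(): average those counts within each subreddit
counts_by_sub = defaultdict(list)
for (subreddit, _author), n in per_user.items():
    counts_by_sub[subreddit].append(n)
avg_count = {sub: math.fsum(ns) / len(ns) for sub, ns in counts_by_sub.items()}

# Like sort(desc('avg(count)'))
for sub, mean_n in sorted(avg_count.items(), key=lambda kv: kv[1], reverse=True):
    print(sub, mean_n)  # funny 2.0, then science 1.5
```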

In [None]:
# withColumnRenamed returns a renamed copy; tmp itself keeps the 'avg(count)' name
tmp.withColumnRenamed('avg(count)', 'comments_per_user').show()

In [None]:
# alternatively, rename while selecting, using alias()
tmp.select('subreddit', col('avg(count)').alias('comments_per_user2')).show()

## Joins

### for networks
- making a network of authors who participate in the same subreddits by using a self-join

In [None]:
# keep one row per (subreddit, author) so the self-join doesn't multiply duplicate comments
tmp = reddit[['subreddit', 'author']].distinct()
author_network = tmp.join(tmp[['subreddit', col('author').alias('author2')]], on='subreddit')
author_network.show()
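A self-join on `subreddit` pairs every author with every author who commented in the same subreddit, including themselves, and each pair appears in both orders. A plain-Python sketch of that result on made-up rows; in Spark, self-pairs could be dropped with a filter such as `author_network[author_network.author != author_network.author2]`.

```python
# Made-up distinct (subreddit, author) rows
rows = [("science", "alice"), ("science", "bob"), ("funny", "alice")]

# Self-join on subreddit: pair every row with every row from the same subreddit
network = {
    (sub_left, author, author2)
    for sub_left, author in rows
    for sub_right, author2 in rows
    if sub_left == sub_right
}

# Optional: drop self-pairs such as ('science', 'alice', 'alice')
network_no_self = {t for t in network if t[1] != t[2]}

for triple in sorted(network_no_self):
    print(triple)  # ('science', 'alice', 'bob'), then ('science', 'bob', 'alice')
```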

### for sets
- Sometimes you want everyone who *isn't* in another group; a left anti-join (`how='left_anti'`) keeps only the rows of the left DataFrame that have no match in the right one

In [None]:
funny_users = reddit[reddit.subreddit == 'funny'][['author']].distinct()
funny_users.count()

In [None]:
science_users = reddit[reddit.subreddit == 'science'][['author']].distinct()
science_users.count()

In [None]:
#users in both subreddits
science_users.join(funny_users, on='author', how='inner').count()

In [None]:
# users who posted in r/science but not in r/funny
science_users.join(funny_users, on='author', how='left_anti').count()
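On single-column DataFrames of distinct authors like these, an inner join behaves like a set intersection and a left anti-join like a set difference. A plain-Python sketch with made-up names:

```python
# Made-up distinct authors of each subreddit
science_authors = {"alice", "bob", "carol"}
funny_authors = {"bob", "dave"}

# how='inner': authors present on both sides
both = science_authors & funny_authors

# how='left_anti': left-side authors with no match on the right
only_science = science_authors - funny_authors

print(sorted(both), sorted(only_science))  # ['bob'] ['alice', 'carol']
```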

In [None]:
# r/science comments by authors who also comment in r/funny
funny_scientists = reddit[reddit['subreddit'] == 'science']
funny_scientists = funny_scientists.join(funny_users, on='author', how='inner')
funny_scientists[['body']].show(10, truncate=False)

In [None]:
# r/science comments by authors who never comment in r/funny
boring_scientists = reddit[reddit['subreddit'] == 'science']
boring_scientists = boring_scientists.join(funny_users, on='author', how='left_anti')
boring_scientists[['body']].show(10, truncate=False)

In [None]:
funny_scientists.count()

In [None]:
# toPandas() pulls every row into the driver's memory, so use it only on small results
fs_df = funny_scientists.toPandas()
fs_df.head()

In [None]:
fs_df.to_csv('funny_scientists.tsv', sep='\t', index=False, encoding='UTF-8')