# <font color='magenta'>Load data

In [3]:
reddit = spark.read.parquet("/var/reddit-parquet") # these records are posts/Submissions (in PRAW) can tell by existence of 'num_comments', 'name', 'locked' & other attributes that are common to posts/Submissions 

In [4]:
type(reddit)

pyspark.sql.dataframe.DataFrame

In [5]:
len(reddit.columns)

70

In [6]:
reddit.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- approved_by: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- banned_by: string (nullable = true)
 |-- body: string (nullable = true)
 |-- body_html: string (nullable = true)
 |-- clicked: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created: long (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- disable_comments: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- from: string (nullable = true)
 |-- from_id: string (nullable = true)
 |-- from_kind: string (nullable = true)
 |-- gilded: long 

In [7]:
# %%time
# record_count = reddit.count()
record_count = 2859977347 #1% of this is +28 million

> ### The DataFrame we created has a fairly large number of columns (70), is deeply nested in several instances (up to 8 layers deep), and contains a significant number of records (+2.8 billion).

In [8]:
# select a subset of columns for EDA

these_cols = [
    "id",
    "parent_id",
    "subreddit",
    "author",
    "created_utc",
    "body",
    "num_comments",
    "score"
] # "created" is not as populated as created_utc, better to use created_utc

> ### For our initial EDA we primarily care about the columns indicated above so we'll subset the data appropriately.

In [9]:
cols_df = reddit.select(these_cols)
cols_df.dtypes

[('id', 'string'),
 ('parent_id', 'string'),
 ('subreddit', 'string'),
 ('author', 'string'),
 ('created_utc', 'string'),
 ('body', 'string'),
 ('num_comments', 'bigint'),
 ('score', 'bigint')]

> ### It's worthwhile to check the data types of the columns. You may notice that "created_utc" is a string. It actually contains the created date in Unix time/Epoch time (the number of seconds since January 1, 1970). We'll want to convert that to a more human readable format.

# <font color='magenta'>Created date

In [10]:
from pyspark.sql.functions import from_unixtime

In [11]:
# add new col showing just created year
cols_df = cols_df.withColumn(
    "created_utc_year", 
    from_unixtime(
        cols_df["created_utc"], 
        "yyyy" # full timestamp: yyyy-MM-dd HH:mm:ss.SS
    )
)

In [12]:
%%time
years_df = cols_df.groupby("created_utc_year").count()
years_df.orderBy("created_utc_year").show()

+----------------+---------+
|created_utc_year|    count|
+----------------+---------+
|            null|        1|
|            2005|     1086|
|            2006|   419341|
|            2007|  2745064|
|            2008|  9773673|
|            2009| 23726352|
|            2010| 55571522|
|            2011|138398080|
|            2012|289689090|
|            2013|441953216|
|            2014|584765776|
|            2015|738997386|
|            2016|573936760|
+----------------+---------+

CPU times: user 27 ms, sys: 69 µs, total: 27.1 ms
Wall time: 52.9 s


> ### The data covers 11 years from 2005 to 2016. Reddit was founded in 2005 and we can see a hint of the platform's growth just in terms of the number of posts in the dataset.

In [13]:
null_count = cols_df[cols_df["body"].isNull()].count()

In [14]:
null_count

279383793

In [15]:
round((float(null_count)/record_count)*100,2)

9.77

> ### While there are a large number of records with null values in "body" (+279 million), this is only ~10% of the overall data.

In [16]:
clean_df = cols_df[
    (cols_df["created_utc_year"].isNotNull())&
    (cols_df["body"].isNotNull())
]

In [20]:
%%time
clean_df.count() # 2,580,593,554

CPU times: user 19.5 ms, sys: 103 µs, total: 19.6 ms
Wall time: 56.6 s


2580593554

> ### We are still left with +2.5 billion records!
> ### However, using all the records would be excessive and time-intensive.
> ### A stratified, one percent sample of the total should be sufficient. 

In [23]:
these_years = clean_df.select("created_utc_year").distinct() # find all unique years
yearly_portion = these_years.count()/100.0 # calculate each year's portion

In [24]:
fraction_dict = {str(i["created_utc_year"]):yearly_portion for i in these_years.collect()}

In [31]:
# simple dict to facilitate stratification by year
fraction_dict

{'2005': 0.12,
 '2006': 0.12,
 '2007': 0.12,
 '2008': 0.12,
 '2009': 0.12,
 '2010': 0.12,
 '2011': 0.12,
 '2012': 0.12,
 '2013': 0.12,
 '2014': 0.12,
 '2015': 0.12,
 '2016': 0.12}

In [25]:
# stratified sample
one_perct_df = clean_df.sampleBy(
    col = "created_utc_year",
    fractions = fraction_dict
)

# one_perct_df.write.parquet("one_perct_sample.parquet")

In [35]:
# %%time
# # ~12 mins

# one_perct_df.count() # 309,358,005; sample is ~an order of magnitude larger than it should be. The result of accumulating rounding errors or something else? # sample is ~an order of magnitude larger than it should be. The result of accumulating rounding errors or something else? 2580593554*0.01 = 25,805,935

# END