In [1]:
import numpy as np
import os
import sys
import time
import datetime

In [2]:
import pandas as pd

In [3]:
# Point the JVM launched by PySpark at the Java 8 (OpenJDK 1.8.0) installation.
# Any JAVA_HOME already present in the environment is overridden.
os.environ.update({
    'JAVA_HOME': '/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.312.b07-1.el7_9.x86_64',
})

In [4]:
import pyspark
from pyspark.sql import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [5]:
print("Creating Spark session.")

# Spark settings for this notebook's session, as (key, value) pairs.
# NOTE(review): the two timeout values carry no unit suffix; confirm in the
# Spark configuration docs which default unit applies to each key.
configuation_properties = [
    ("spark.master", "local[95]"),               # local mode with 95 worker threads
    ("spark.ui.port", "4050"),
    ("spark.executor.memory", "750g"),
    ("spark.driver.memory", "2000g"),
    ("spark.driver.maxResultSize", "0"),         # unlimited
    ("spark.network.timeout", "10000001"),
    ("spark.executor.heartbeatInterval", "10000000"),
    # ("spark.dynamicAllocation.enabled", "true"),
    # ("spark.shuffle.service.enabled", "true"),
]

conf = SparkConf().setAll(configuation_properties)

# Build the session through the builder instead of constructing a SparkContext
# directly: `pyspark.SparkContext(conf=conf)` raises ValueError when a context
# is already running, so re-running this cell in a live kernel used to fail.
# `getOrCreate()` reuses the running session if there is one, which makes the
# cell safe to re-run. When a session is reused, settings that are fixed at JVM
# start-up (e.g. driver memory) keep the values the session was started with;
# restart the kernel to change them.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Expose the underlying context under the same name the notebook used before.
sc = spark.sparkContext

# for logging temporarily
# sc.setLogLevel('DEBUG')
print("Spark session created.")

Creating Spark session.
Spark session created.


# README

Goal: extract the most recent six months' worth of posts and comments for /r/photocritique and save them as two separate datasets (`posts_jan-jun_2021` and `comments_jan-jun_2021`) for ease of use.

In [6]:
# Subreddit whose posts and comments are extracted; it is compared for exact
# equality with the `subreddit` field of every record.
SUBREDDIT = 'photocritique'
# SUBREDDIT = 'AskReddit'

# Destination directory for the exported datasets. It is derived from SUBREDDIT
# so that switching the subreddit above cannot overwrite another subreddit's
# export (the write step uses mode='overwrite'). For 'photocritique' this is
# exactly the previously hard-coded path.
OUTPUT_DIR = '/projects/bdata/bdatasets/{}/'.format(SUBREDDIT)

In [7]:
# Directory holding the Pushshift dump files: RS_* are posts (submissions),
# RC_* are comments.
PUSHSHIFT_DIR = '/projects/bdata/moderation/pushshift'

# Glob patterns selecting every 2021 file that is present in the directory.
posts_path = '{}/RS_2021-*'.format(PUSHSHIFT_DIR)
comments_path = '{}/RC_2021-*'.format(PUSHSHIFT_DIR)

# Alternative inputs used during development (swap the file-name part above):
#   everything available : RS_*                 / RC_*
#   a single month       : RS_2021-06           / RC_2021-06
#   small samples        : 1000_posts_2019-06   / 1000_comments_2019-06

In [8]:
# Explicit DDL schemas passed to the JSON reader: only these columns are kept.
# Posts and comments share most columns; posts add title/url, comments add
# link_id/body.
post_schema = ', '.join([
    'id STRING',
    'subreddit STRING',
    'author STRING',
    'created_utc INTEGER',
    'score INTEGER',
    'title STRING',
    'url STRING',
])
comment_schema = ', '.join([
    'id STRING',
    'subreddit STRING',
    'link_id STRING',
    'author STRING',
    'created_utc INTEGER',
    'score INTEGER',
    'body STRING',
])

In [9]:
# Define the posts DataFrame over the Pushshift submission files, using the
# explicit post schema and the reader's DROPMALFORMED mode.
# NOTE(review): with an explicit schema this read is presumably lazy, so the
# timing below would cover only setup, not a full scan of the data - confirm.
print(f'Starting loading posts from {posts_path}.')
start_time = time.monotonic()
posts = (
    spark.read
    .schema(post_schema)
    .option("mode", "DROPMALFORMED")
    .json(posts_path)
)
elapsed_minutes = (time.monotonic() - start_time) / 60
print(f"Finished loading posts in {elapsed_minutes:5.3f} minutes.")

Starting loading posts from /projects/bdata/moderation/pushshift/RS_2021-06.
Finished loading posts in 0.028 minutes.


In [10]:
# Define the comments DataFrame over the Pushshift comment files, using the
# explicit comment schema and the reader's DROPMALFORMED mode.
# NOTE(review): with an explicit schema this read is presumably lazy, so the
# timing below would cover only setup, not a full scan of the data - confirm.
print(f'Starting loading comments from {comments_path}.')
start_time = time.monotonic()
comments = (
    spark.read
    .schema(comment_schema)
    .option("mode", "DROPMALFORMED")
    .json(comments_path)
)
elapsed_minutes = (time.monotonic() - start_time) / 60
print(f"Finished loading comments in {elapsed_minutes:5.3f} minutes.")

Starting loading comments from /projects/bdata/moderation/pushshift/RC_2021-06.
Finished loading comments in 0.001 minutes.


In [11]:
# Keep only the rows whose `subreddit` column equals the configured SUBREDDIT.
# The same column predicate is applied to both DataFrames.
in_target_subreddit = F.col('subreddit') == SUBREDDIT
posts = posts.filter(in_target_subreddit)
comments = comments.filter(in_target_subreddit)

In [12]:
# Write the filtered posts and comments as JSON into separate folders under
# OUTPUT_DIR (posts first, then comments). Existing output in either folder is
# replaced because the writer runs in 'overwrite' mode.
print(f'Starting writing to {OUTPUT_DIR}.')
start_time = time.monotonic()
for frame, folder in (
    (posts, 'posts_jan-jun_2021'),
    (comments, 'comments_jan-jun_2021'),
):
    frame.write.mode('overwrite').json(OUTPUT_DIR + folder)
elapsed_minutes = (time.monotonic() - start_time) / 60
print(f"Finished writing in {elapsed_minutes:5.3f} minutes.")

Starting writing to /projects/bdata/bdatasets/photocritique/.
Finished writing in 111.304 minutes.
