In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Request 250 cores on L41 cluster.
# Spark configuration values are strings, so the core count is passed as one.
conf = SparkConf().set("spark.cores.max", "250")

# getOrCreate() hands back the context already running in this kernel (if
# any), so re-executing this cell does not fail with "Cannot run multiple
# SparkContexts at once". Call it without `conf` to use the cluster defaults.
sc = SparkContext.getOrCreate(conf=conf)

sqlCtx = SQLContext(sc)

In [2]:
# Fully-qualified codec class name; handed to saveAsTextFile as
# compressionCodecClass when the result RDDs are written out.
GZIP_CODEC = "org.apache.hadoop.io.compress.GzipCodec"

In [3]:
# Input location; the trailing glob selects every entry under json/.
original_data_path = "hdfs://namenode/datasets/github/json/*"
# Alternative pointing at a single input file (presumably for quick trial runs):
#original_data_path = "hdfs://namenode/datasets/github/json/contents000000000000.json.gz"

In [4]:
# Load the JSON input through the SQL context, then turn the result back into
# JSON text records; convert_json decodes each record again in the map pipeline.
j = sqlCtx.read.json(original_data_path)
j_rdd = j.toJSON()

In [5]:
# Temporarily down-select to 100 elements for testing (uncomment the next line to enable)
#j_rdd = sc.parallelize(j_rdd.take(100))

In [6]:
def convert_json(line):
    """Decode one JSON-encoded text record and return the resulting object."""
    # Function-scope import, matching the other helpers in this notebook
    # (presumably so the function is self-contained on the Spark workers).
    import json
    return json.loads(line)

In [7]:
def convert_python3(j):
    """Run the lib2to3 fixers over a record's source code.

    Args:
        j: Mapping whose ``"content"`` entry holds the source text to convert.
            On success that entry is overwritten in place with the
            refactored text.

    Returns:
        ``(True, j)`` when the refactoring succeeded, otherwise ``(False, j)``
        with ``"content"`` left untouched.
    """
    try:
        # Look the source up first so a record without "content" is rejected
        # before any lib2to3 work is done.
        source = j["content"]
        from lib2to3.refactor import RefactoringTool, get_fixers_from_package
        fixers = get_fixers_from_package('lib2to3.fixes')
        refactoring_tool = RefactoringTool(fixer_names=fixers)
        node3 = refactoring_tool.refactor_string(source, 'script')
        j["content"] = str(node3)
        return (True, j)
    except Exception:
        # Best effort: any ordinary failure (missing key, parse error, ...)
        # marks the record as failed. Unlike a bare ``except:``, this lets
        # KeyboardInterrupt and SystemExit propagate so the job can be stopped.
        return (False, j)

In [8]:
# Decode each JSON record, then attempt the Python 3 conversion on it;
# every element of the result is a (success_flag, record) pair.
py3_rdd = (
    j_rdd
    .map(convert_json)
    .map(convert_python3)
)

In [9]:
def is_success(x):
    """Return the status flag stored at index 0 of a ``(flag, record)`` pair."""
    succeeded = x[0]  # truthy when the conversion worked
    return succeeded

def is_error(x):
    """Return True exactly when ``is_success(x)`` is falsy, else False."""
    if is_success(x):
        return False
    return True

def dump_json(x):
    """Serialize ``x`` to a JSON string using ``json.dumps`` defaults."""
    import json
    encoded = json.dumps(x)
    return encoded

In [10]:
# Split the (flag, record) pairs by outcome and keep only the record part;
# RDD.values() is the built-in spelling of map(lambda x: x[1]).
# NOTE(review): no cache()/persist() on py3_rdd is visible here, so each action
# on these RDDs presumably recomputes the conversion - confirm.
py3_rdd_success = py3_rdd.filter(is_success).values()
py3_rdd_error = py3_rdd.filter(is_error).values()

In [None]:
# Write the successfully converted records as gzip-compressed JSON lines.
py3_rdd_success.map(dump_json).saveAsTextFile(
    "hdfs://namenode/datasets/github/01_py3/success",
    compressionCodecClass=GZIP_CODEC
)

In [None]:
# Write the records that failed conversion as gzip-compressed JSON lines.
py3_rdd_error.map(dump_json).saveAsTextFile(
    "hdfs://namenode/datasets/github/01_py3/errors",
    compressionCodecClass=GZIP_CODEC
)

In [None]:
# Fetch a single successfully converted record for inspection.
first = py3_rdd_success.first()