In [1]:
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and the Spark job at exit.

    The context is stopped first, then the job. Each step is isolated so a
    failure while stopping one resource never prevents the attempt to stop
    the other. Failures are reported instead of being silently discarded,
    which makes a cluster that was left running visible to the user.

    Args:
        sj: Job handle exposing ``stop()`` (the object returned by
            ``sparkjob.sparkjob`` in this notebook).
        sc: Context exposing ``stop()`` (the object returned by
            ``sj.start_spark()`` in this notebook).

    Returns:
        None. Exceptions derived from ``Exception`` are caught and printed.
    """
    for label, resource in (('Spark Context', sc), ('Spark Job', sj)):
        try:
            print('Trapped Exit cleaning up ' + label)
            resource.stop()
        except Exception as exc:
            # Still best-effort, but tell the user that cleanup did not succeed.
            print('Failed to stop {}: {!r}'.format(label, exc))

findspark.init()

# --- Spark cluster sizing ---------------------------------------------------
nodes = 3
tasks_per_node = 8
memory_per_task = 1024  # 1 gig per process, adjust accordingly
# Estimate the walltime carefully: an unused Spark cluster that sits idle
# keeps resources away from other users.
walltime = "1:00"  # 1 hour
# ARC partition used for the batch submission.
os.environ['SBATCH_PARTITION'] = 'breezy'

# Request nodes * tasks_per_node cores in total, with one executor per node's
# worth of tasks.
sj = sparkjob.sparkjob(
    walltime=walltime,
    memory_per_core=memory_per_task,
    cores_per_executor=tasks_per_node,
    ncores=nodes * tasks_per_node,
)

# Wait for the submitted job to start, then obtain the Spark context from it.
sj.wait_to_start()
sc = sj.start_spark()

# Register the exit handler so the context and the job are stopped on exit.
atexit.register(exitHandler, sj, sc)

# Only needed when SparkSQL is used.
sqlCtx = SQLContext(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 633575

INFO:sparkhpc.sparkjob:Submitted cluster 0


In [2]:
# Read the Yelp dump through the Spark context.
data = sc.textFile('yelp.json')
# Drop every line that contains a non-ASCII character.
new_data = data.filter(
    lambda line: line.isascii()
)

In [4]:
# Take the first line, then keep only the lines that differ from it.
first = new_data.first()
df = new_data.filter(
    lambda line: line != first
)

In [6]:
# Retain only the lines that begin with the '{"business_id"' prefix.
df = df.filter(
    lambda line: line.startswith('{"business_id"')
)

In [10]:
# Write the filtered records to a local text file, one record per line.
# toLocalIterator() hands the records to the driver partition by partition,
# so the full dataset does not have to be held in driver memory at once the
# way collect() requires.
with open("yelp_cleaned_new.txt", "w", encoding="utf-8") as f:
    for line in df.toLocalIterator():
        f.write(line + '\n')

In [3]:
import json
# new_data holds the raw text lines of the input file, so they are passed
# through unchanged. Running json.dumps over a str wraps it in quotes and
# escapes its inner quotes, which would double-encode every record.
jsonRDD = new_data

# One record per element. saveAsTextFile() terminates every element with a
# newline itself, so no "\n" is appended here (that would emit blank lines).
json_string = jsonRDD

In [None]:
import json
# new_data holds the raw text lines of the input file, so no json.dumps pass
# is applied (it would double-encode each line as a quoted JSON string).
jsonRDD = new_data

# Build one big string with one record per line. Joining the collected lines
# is linear, whereas a pairwise reduce with string concatenation recopies the
# growing string at every step. An empty RDD yields "" instead of raising.
# NOTE(review): this rebinds json_string to a plain str, so RDD methods such
# as coalesce()/saveAsTextFile() are not available on it afterwards.
json_string = "\n".join(jsonRDD.collect())

# # write your string to a file
# with open("yelp_cleaned.json", "w", encoding="utf-8") as f:
#     f.write(json_string)

In [5]:
# Reduce to a single partition, then save the records as text under 'YELP-CLEAN'.
(
    json_string
    .coalesce(1)
    .saveAsTextFile('YELP-CLEAN')
)

In [9]:
# Show three records from the filtered RDD as a quick sanity check.
df.take(3)

['{"business_id":"QXAEGFB4oINsVuTFxEYKFQ","name":"Emerald Chinese Restaurant","address":"30 Eglinton Avenue W","city":"Mississauga","state":"ON","postal_code":"L5R 3E7","latitude":43.6054989743,"longitude":-79.652288909,"stars":2.5,"review_count":128,"is_open":1,"attributes":{"RestaurantsReservations":"True","GoodForMeal":"{\'dessert\': False, \'latenight\': False, \'lunch\': True, \'dinner\': True, \'brunch\': False, \'breakfast\': False}","BusinessParking":"{\'garage\': False, \'street\': False, \'validated\': False, \'lot\': True, \'valet\': False}","Caters":"True","NoiseLevel":"u\'loud\'","RestaurantsTableService":"True","RestaurantsTakeOut":"True","RestaurantsPriceRange2":"2","OutdoorSeating":"False","BikeParking":"False","Ambience":"{\'romantic\': False, \'intimate\': False, \'classy\': False, \'hipster\': False, \'divey\': False, \'touristy\': False, \'trendy\': False, \'upscale\': False, \'casual\': True}","HasTV":"False","WiFi":"u\'no\'","GoodForKids":"True","Alcohol":"u\'full