In [1]:
import sys
import os
import re
import json
import gzip
from io import BytesIO
from random import sample

sys.path.append('/opt/cloudera/parcels/SPARK2/lib/spark2/python')
sys.path.append('/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip')

from itertools import chain, zip_longest

from functools import partial

from subprocess import check_output, Popen, PIPE, check_call
from pyspark import SparkConf, SparkContext
from pyspark.accumulators import AccumulatorParam
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext

from spark_utils.spark_env import (read_spark_env_json, read_spark_conf_json, 
                       set_spark_env, create_spark_conf)
from spark_utils.clean_dataworld import process_file, process_record

# Load the environment and configuration JSON through the project helpers and
# start from the SparkConf that create_spark_conf() returns.
spark_env = read_spark_env_json()
spark_conf = read_spark_conf_json()
conf = create_spark_conf()

# Executor sizing. The memory figure is (7 * 1024 - 881) with an 'm' suffix and
# the YARN overhead is 20% of that figure, truncated to an integer.
executor_memory_mb = 7 * 1024 - 881
executor_overhead_mb = int(executor_memory_mb * 0.2)
executor_count = '18'

for option, setting in [
    ('spark.executor.memory', str(executor_memory_mb) + 'm'),
    ('spark.dynamicAllocation.minExecutors', executor_count),
    ('spark.dynamicAllocation.initialExecutors', executor_count),
    ('spark.executor.instances', executor_count),
    ('spark.yarn.executor.memoryOverhead', str(executor_overhead_mb) + 'm'),
    ('spark.executor.cores', '2'),
]:
    conf.set(option, setting)

# Zip the .py files found directly under spark_utils/ so the archive can be
# handed to the SparkContext through pyFiles.
module_paths = ['spark_utils/' + name for name in os.listdir('spark_utils') if name.endswith('.py')]
cmd = ['zip', '-r', 'spark_utils.zip', 'spark_utils', '-i'] + module_paths
check_call(cmd)

# The environment is applied before the context is created.
set_spark_env(spark_env)

sc = SparkContext(conf=conf, pyFiles=['spark_utils.zip'])
sql = SQLContext(sc)

# Keep a JSON record of the configuration that was actually used.
with open('spark-conf.json', 'w') as conf_file:
    conf_file.write(json.dumps(dict(conf.getAll()), indent=4))
# keys = list(dict(conf.getAll()).keys())

In [2]:
# Display the YARN executor memory overhead that the setup cell stored in `conf`.
conf.get('spark.yarn.executor.memoryOverhead')

'1257m'

In [None]:
# Read the ontology Turtle file through Spark, collect its lines to the driver
# and re-join them into one newline-separated string.
ontology_text = '\n'.join(sc.textFile('/user/hdfs/dataworld-linked-acs/Ontology/acs_schema.ttl').collect())
# Split wherever a line consists of a single "." (i.e. "\n.\n"). The pattern is a
# raw string so that "\." is a regex escape rather than an invalid string escape.
# `ontology` ends up as the list of resulting chunks.
ontology = re.split(r'\n\.\n', ontology_text)
# fnames = check_output(['hdfs', 'dfs', '-ls', '/user/hdfs/dataworld/EnhancedData/ss*']).decode().split('\n')

In [9]:
# List the source directory; the output of `hdfs dfs -ls` is split into lines.
fnames = check_output(['hdfs', 'dfs', '-ls', '/user/hdfs/dataworld-linked-acs']).decode().split('\n')
# Keep listing lines that match ss<2 digits>p...ttl.gz and take the last
# whitespace-separated field of each as the file path.
# All patterns below are raw strings so "\." is not an invalid string escape.
reg = re.compile(r'.*ss[0-9]{2}p.*\.ttl\.gz$')
person_files = [f2.split()[-1] for f2 in [f.strip() for f in fnames] if reg.search(f2)]
# List what is already under the Hive warehouse directory for the person table.
hive_fnames = check_output(['hdfs', 'dfs', '-ls', '/user/hive/warehouse/dataworld/person/**/**']).decode().split('\n')
# Extracts the state/year partition values from parquet paths of the form
# .../state=xx/year=NN/...parquet
hive_reg = re.compile(r'.*/state=(?P<state>[a-z]{2})/year=(?P<year>[0-9]{2})/.*parquet$')
# Extracts year and state from a source file name; not used in this cell
# (the filtering loop further down relies on it).
state_reg = re.compile(r'.*ss(?P<year>[0-9]{2})(h|p)(?P<state>[a-z]{2})(\.[0-9])?\.ttl\.gz$')
# (state, year) pairs for every parquet file found; pairs repeat when a
# partition holds more than one file.
state_year_hive = [(h['state'], h['year']) for h in [hive_reg.search(f) for f in hive_fnames] if h]


In [11]:
# Pre-bind kind='person' so the resulting callable takes a single argument and
# can be passed straight to flatMap.
process_closure = partial(process_file, kind='person')
# Extracts year and state from a source file name. Raw string so that "\." is a
# regex escape rather than an invalid string escape.
state_reg = re.compile(r'.*ss(?P<year>[0-9]{2})(h|p)(?P<state>[a-z]{2})(\.[0-9])?\.ttl\.gz$')
# f = [p for p in person_files if p.endswith('15pfl.1.ttl.gz')]
# NOTE(review): `i` is not referenced by any visible cell.
i = 0
# Group size / partition count for the load loop, taken from the Spark
# configuration set in the setup cell.
n_executors = int(sc.getConf().get('spark.dynamicAllocation.initialExecutors'))

def grouper(iterable, n, fillvalue=None):
    """Return an iterator of n-tuples drawn from iterable, padding the last one.

    Example: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    """
    shared = iter(iterable)
    # Passing the same iterator n times makes every output tuple consume n
    # consecutive items; zip_longest pads the final, short tuple with fillvalue.
    return zip_longest(*(shared for _ in range(n)), fillvalue=fillvalue)



In [14]:
def get_keys(x):
    '''Return the distinct keys found across an iterable of dict-like records.

    The result is a list in no particular order; an empty iterable gives [].
    '''
    seen = set()
    for record in x:
        seen.update(record.keys())
    return list(seen)

# (state, year) pairs already present in Hive; a set makes the per-file
# membership test cheap.
loaded_partitions = set(state_year_hive)

for group in grouper(person_files, n_executors):
    # Decide file by file so the keep/skip decision always belongs to the path
    # it was computed from. (Building a separate list of flags and zipping it
    # with `group` goes out of step as soon as one name fails state_reg.)
    group_filtered = []
    for path in group:
        if not path:
            # None padding added by grouper() to the last group.
            continue
        match = state_reg.search(path)
        if match and (match['state'], match['year']) not in loaded_partitions:
            group_filtered.append(path)
    if not group_filtered:
        print('skipping group starting with file {}'.format(group[0]))
        continue
    files = sc.parallelize(group_filtered, n_executors)
    # Keep the parsed records under their own name: they are scanned several
    # times below and must be the RDD that is unpersisted at the end.
    records = files.flatMap(process_closure)
    records.persist(StorageLevel.MEMORY_AND_DISK)
    try:
        # Column names are the distinct record keys seen in a 1% sample.
        header = records.sample(False, 0.01).mapPartitions(get_keys)\
            .filter(lambda x: x is not None).distinct().collect()
        header = sc.broadcast(header)
        # Give every record the same set of fields, filling absent ones with None.
        data = records.map(lambda x: Row(**{h: x.get(h, None) for h in header.value}))
        # Infer the schema, and register the DataFrame as a table.
        person = sql.createDataFrame(data, samplingRatio=0.4)
        person.createOrReplaceTempView("person")
        person.write.parquet('/user/hive/warehouse/dataworld/person/',
                             mode='append', compression='snappy', partitionBy=['state', 'year'])
    finally:
        # Release the cached records even if the write fails, so cached blocks
        # do not accumulate from one group to the next.
        records.unpersist()
# afew = data.take(20)

skipping group starting with file /user/hdfs/dataworld-linked-acs/ss14pak.ttl.gz
skipping group starting with file /user/hdfs/dataworld-linked-acs/ss14pil.1.ttl.gz
skipping group starting with file /user/hdfs/dataworld-linked-acs/ss14pnj.ttl.gz
skipping group starting with file /user/hdfs/dataworld-linked-acs/ss14ptx.3.ttl.gz
skipping group starting with file /user/hdfs/dataworld-linked-acs/ss15pdc.ttl.gz
skipping group starting with file /user/hdfs/dataworld-linked-acs/ss15pmn.ttl.gz


In [13]:
# Display `group_filtered` as currently held by the kernel; it is assigned inside
# the load loop, so this cell only works after that loop has run at least once.
group_filtered

[]

In [None]:
def get_keys(x):
    '''Return the distinct keys found across an iterable of dict-like records.

    NOTE(review): this repeats the get_keys definition from the load-loop cell
    and rebinds the name when the cell is run.
    '''
    distinct = set().union(*(record.keys() for record in x))
    return list(distinct)

# NOTE(review): scratch copy of the tail of the load loop. It depends on `header`
# and `data` already existing in the kernel; after the loop has run, `header` is
# itself a broadcast variable and `data` is already the mapped RDD, so confirm
# the intended inputs before running this cell.
header = sc.broadcast(header)
# Give every record the same set of fields, filling absent ones with None.
data = data.map(lambda x: Row(**{h: x.get(h, None) for h in header.value}))
# Infer the schema, and register the DataFrame as a table.
person = sql.createDataFrame(data, samplingRatio=0.4)
person.createOrReplaceTempView("person")
# Append to the person table directory, partitioned by state and year.
person.write.parquet('/user/hive/warehouse/dataworld/person/', 
                         mode='append', compression='snappy', partitionBy=['state', 'year'])


In [None]:
# Emit one ALTER TABLE statement per (state, year) pair found under the Hive
# warehouse directory, one statement per line.
alter_template = "ALTER TABLE person ADD IF NOT EXISTS PARTITION (state='{}', year='{}');"
statements = [alter_template.format(state, year) for state, year in state_year_hive]
print('\n'.join(statements))

In [None]:
# Read back everything under the person table directory and display the row count.
df = sql.read.parquet('/user/hive/warehouse/dataworld/person')
df.count()

In [None]:
# Display the schema Spark derived for the parquet data read above.
df.schema

In [None]:
# NOTE(review): process_file/process_record are already imported in the first
# cell; only getsource is new here.
from spark_utils.clean_dataworld import process_file, process_record
from inspect import getsource

# Print the source code of process_file for reference.
print(getsource(process_file))