In [2]:
from collections import defaultdict
from itertools import *
import json
from operator import *
import pickle
import re

import matplotlib.pylab as pl
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from scipy.sparse import csr_matrix

In [3]:
input_dir = '/data/yu_gai/cfq'
output_dir = '/work/yu_gai/cfq/data/cfq'

df = sqlCtx.read.parquet(f'{input_dir}/dataset.parquet').sort('index').persist()
df.columns

['complexityMeasures',
 'expectedResponse',
 'expectedResponseWithMids',
 'index',
 'question',
 'questionPatternModEntities',
 'questionTemplate',
 'questionWithBrackets',
 'questionWithMids',
 'ruleIds',
 'ruleTree',
 'sparql',
 'sparqlPattern',
 'sparqlPatternModEntities']

In [4]:
splits = {}
split_ids = !ls {input_dir}/splits | grep json
for split_id in [s.replace('.json', '') for s in split_ids]:
    split = splits[split_id] = json.load(open(f'{input_dir}/splits/{split_id}.json'))
    np.savez(f'{output_dir}/splits/{split_id}', **{k : np.array(v) for k, v in split.items()})
    print(split_id, len(split['trainIdxs']), len(split['devIdxs']), len(split['testIdxs']), df.count())

mcd1 95743 11968 11968 239357
mcd2 95743 11968 11968 239357
mcd3 95743 11968 11968 239357
query_complexity_split 100654 9512 9512 239357
query_pattern_split 94600 12489 12589 239357
question_complexity_split 98999 10339 10340 239357
question_pattern_split 95654 12115 11909 239357
random_split 95744 11967 11967 239357


In [5]:
w = r"(:?[a-zA-Z]+|M[0-9]|'s|,)"
p = re.compile(fr'(:?{w} )+{w}')
df.rdd.map(lambda r: re.match(p, r['questionPatternModEntities']).string).zip(df.rdd.map(lambda r: r['questionPatternModEntities'])).map(lambda r: r[0] == r[1]).reduce(and_)

True

In [6]:
def replace(q):
    for s in [
        'art director',
        'country of nationality',
        'costume designer',
        'executive producer',
        'executive produce',
        'executive produced',
        'film director',
        'film distributor',
        'film editor',
        'film producer',
        'production company',
    ]:
        q = q.replace(s, s.replace(' ', ''))
    return q

df = df.withColumn('questionPatternModEntities', udf(replace, StringType())('questionPatternModEntities')).persist()
df.rdd.map(lambda r: len(r['questionPatternModEntities'].split(' ')) == len(r['questionTemplate'].split(' '))).reduce(and_)

True

In [7]:
at = lambda i: (lambda x: x[i])
k1 = lambda r: [r, 1]
unique = lambda rdd: sorted(rdd.distinct().collect())
count = lambda rdd: dict(rdd.map(k1).reduceByKey(add).collect())

In [8]:
def find_rel(line):
    if 'FILTER' in line:
        [[src, dst, *_]] = re.findall(r'^FILTER \( ([^ ]+) != ([^ ]+) \)( .)?$', line)
        return src, '!=', dst
    else:
        [[src, typ, dst, *_]] = re.findall(r'^([^ ]+) ([^ ]+) ([^ ]+)( .)?$', line)
        return src, typ, dst

In [9]:
rels = df.rdd.flatMap(lambda r: r['sparqlPatternModEntities'].split('\n')[1 : -1]).map(find_rel).cache()
srcs, typs, dsts = map(unique, [rels.map(at(0)), rels.map(at(1)), rels.map(at(2))])
srcs, typs, dsts

(['?x0',
  '?x1',
  '?x2',
  '?x3',
  '?x4',
  '?x5',
  'M0',
  'M1',
  'M2',
  'M3',
  'M4',
  'M5',
  'M6',
  'M7',
  'M8',
  'M9'],
 ['!=',
  '^ns:people.person.gender',
  '^ns:people.person.nationality',
  'a',
  'ns:business.employer.employees/ns:business.employment_tenure.person',
  'ns:film.actor.film/ns:film.performance.character',
  'ns:film.actor.film/ns:film.performance.film',
  'ns:film.cinematographer.film',
  'ns:film.director.film',
  'ns:film.editor.film',
  'ns:film.film.cinematography',
  'ns:film.film.costume_design_by',
  'ns:film.film.directed_by',
  'ns:film.film.distributors/ns:film.film_film_distributor_relationship.distributor',
  'ns:film.film.edited_by',
  'ns:film.film.executive_produced_by',
  'ns:film.film.film_art_direction_by',
  'ns:film.film.prequel',
  'ns:film.film.produced_by|ns:film.film.production_companies',
  'ns:film.film.sequel',
  'ns:film.film.starring/ns:film.performance.actor',
  'ns:film.film.written_by',
  'ns:film.film_art_director.film

In [10]:
idx2typ = sorted(typ for typ in typs if typ not in ['a', '!=', 'ns:people.person.gender', 'ns:people.person.nationality'])
typ2idx = {typ : idx for idx, typ in enumerate(idx2typ)}
idx2typ

['^ns:people.person.gender',
 '^ns:people.person.nationality',
 'ns:business.employer.employees/ns:business.employment_tenure.person',
 'ns:film.actor.film/ns:film.performance.character',
 'ns:film.actor.film/ns:film.performance.film',
 'ns:film.cinematographer.film',
 'ns:film.director.film',
 'ns:film.editor.film',
 'ns:film.film.cinematography',
 'ns:film.film.costume_design_by',
 'ns:film.film.directed_by',
 'ns:film.film.distributors/ns:film.film_film_distributor_relationship.distributor',
 'ns:film.film.edited_by',
 'ns:film.film.executive_produced_by',
 'ns:film.film.film_art_direction_by',
 'ns:film.film.prequel',
 'ns:film.film.produced_by|ns:film.film.production_companies',
 'ns:film.film.sequel',
 'ns:film.film.starring/ns:film.performance.actor',
 'ns:film.film.written_by',
 'ns:film.film_art_director.films_art_directed',
 'ns:film.film_costumer_designer.costume_design_for_film',
 'ns:film.film_distributor.films_distributed/ns:film.film_film_distributor_relationship.film',


In [11]:
# categories
a = rels.filter(lambda r: r[1] == 'a').cache()
cats = unique(a.map(at(2)))
unique(a.map(at(0))), cats

(['?x0',
  '?x1',
  '?x2',
  '?x3',
  '?x4',
  '?x5',
  'M0',
  'M1',
  'M2',
  'M3',
  'M4',
  'M5',
  'M6'],
 ['ns:business.employer',
  'ns:fictional_universe.fictional_character',
  'ns:film.actor',
  'ns:film.cinematographer',
  'ns:film.director',
  'ns:film.editor',
  'ns:film.film',
  'ns:film.film_art_director',
  'ns:film.film_costumer_designer',
  'ns:film.film_distributor',
  'ns:film.producer',
  'ns:film.production_company',
  'ns:film.writer',
  'ns:people.person'])

In [12]:
# gender and nationality
for dst in dsts:
    if dst.startswith('ns:') and dst not in cats:
        print(dst, unique(rels.filter(lambda r: r[2] == dst).map(at(1))))

ns:m.02zsn ['ns:people.person.gender']
ns:m.0345h ['ns:people.person.nationality']
ns:m.03_3d ['ns:people.person.nationality']
ns:m.03rjj ['ns:people.person.nationality']
ns:m.059j2 ['ns:people.person.nationality']
ns:m.05zppz ['ns:people.person.gender']
ns:m.06mkj ['ns:people.person.nationality']
ns:m.07ssc ['ns:people.person.nationality']
ns:m.09c7w0 ['ns:people.person.nationality']
ns:m.0b90_r ['ns:people.person.nationality']
ns:m.0d05w3 ['ns:people.person.nationality']
ns:m.0d060g ['ns:people.person.nationality']
ns:m.0d0vqn ['ns:people.person.nationality']
ns:m.0f8l9c ['ns:people.person.nationality']


In [37]:
idx2attr = sorted(dst for dst in dsts if dst.startswith('ns:'))
attr2idx = {attr : idx for idx, attr in enumerate(idx2attr)}
pickle.dump([idx2attr, attr2idx], open(f'{output_dir}/attr-vocab.pickle', 'wb'))
idx2attr

['ns:business.employer',
 'ns:fictional_universe.fictional_character',
 'ns:film.actor',
 'ns:film.cinematographer',
 'ns:film.director',
 'ns:film.editor',
 'ns:film.film',
 'ns:film.film_art_director',
 'ns:film.film_costumer_designer',
 'ns:film.film_distributor',
 'ns:film.producer',
 'ns:film.production_company',
 'ns:film.writer',
 'ns:m.02zsn',
 'ns:m.0345h',
 'ns:m.03_3d',
 'ns:m.03rjj',
 'ns:m.059j2',
 'ns:m.05zppz',
 'ns:m.06mkj',
 'ns:m.07ssc',
 'ns:m.09c7w0',
 'ns:m.0b90_r',
 'ns:m.0d05w3',
 'ns:m.0d060g',
 'ns:m.0d0vqn',
 'ns:m.0f8l9c',
 'ns:people.person']

In [14]:
ne = rels.filter(lambda r: r[1] == '!=').cache()
unique(ne.map(at(0))), unique(ne.map(at(2)))

(['?x0',
  '?x1',
  '?x2',
  '?x3',
  '?x4',
  'M0',
  'M1',
  'M2',
  'M3',
  'M4',
  'M5',
  'M6',
  'M7'],
 ['?x0',
  '?x1',
  '?x2',
  '?x3',
  '?x4',
  '?x5',
  'M0',
  'M1',
  'M2',
  'M3',
  'M4',
  'M5',
  'M6',
  'M7',
  'M8'])

In [15]:
tok_vocab = unique(df.rdd.flatMap(lambda r: r['questionPatternModEntities'].split(' ')))
tok_vocab

["'s",
 ',',
 'American',
 'British',
 'Canadian',
 'Chinese',
 'Did',
 'Dutch',
 'French',
 'German',
 'Italian',
 'Japanese',
 'M0',
 'M1',
 'M2',
 'M3',
 'M4',
 'M5',
 'M6',
 'M7',
 'M8',
 'M9',
 'Mexican',
 'Spanish',
 'Swedish',
 'Was',
 'Were',
 'What',
 'Which',
 'Who',
 'a',
 'acquire',
 'acquired',
 'actor',
 'and',
 'artdirector',
 'by',
 'character',
 'child',
 'cinematographer',
 'company',
 'costumedesigner',
 'countryofnationality',
 'did',
 'direct',
 'directed',
 'director',
 'distribute',
 'distributed',
 'distributor',
 'edit',
 'edited',
 'editor',
 'employ',
 'employed',
 'employee',
 'employer',
 'executiveproduce',
 'executiveproduced',
 'executiveproducer',
 'female',
 'film',
 'filmdirector',
 'filmdistributor',
 'filmeditor',
 'filmproducer',
 'found',
 'founded',
 'founder',
 'gender',
 'influence',
 'influenced',
 'male',
 'married',
 'marry',
 'of',
 'parent',
 'person',
 'play',
 'played',
 'prequel',
 'produce',
 'produced',
 'producer',
 'productioncompan

In [16]:
# repeated occurence of entities
def _mapper(r):
    c = defaultdict(lambda: 0)
    for tok in r['questionPatternModEntities'].split(' '):
        if re.match('M\d', tok) is not None:
            c[tok] += 1
    return c.items()

def _fn(rdd):
    for [tok, c], n in sorted(count(rdd.flatMap(_mapper)).items()):
        if c > 1:
            print(tok, c, n)

_fn(df.rdd)

for k, v in splits.items():
    print(k)
    indices = set(chain(*splits['mcd1'].values()))
    _fn(df.rdd.filter(lambda r: r['index'] in indices))

M0 2 20
M1 2 17
M2 2 28
M3 2 13
M4 2 5
M5 2 1
mcd1
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1
mcd2
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1
mcd3
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1
query_complexity_split
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1
query_pattern_split
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1
question_complexity_split
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1
question_pattern_split
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1
random_split
M0 2 6
M1 2 12
M2 2 11
M3 2 5
M4 2 1


In [17]:
def _mapper(r):
    rels = list(map(find_rel, r['sparqlPatternModEntities'].split('\n')[1 : -1]))
    ends = set((src, dst) for src, typ, dst in rels if typ != '!=')
    return all((src, dst) in ends for src, typ, dst in rels if typ == '!=')

df.rdd.map(_mapper).reduce(and_)

True

In [18]:
sorted(rels.map(at(1)).map(k1).reduceByKey(add).collect(), key=at(1))

[('ns:film.film.costume_design_by', 56),
 ('ns:film.film.cinematography', 99),
 ('ns:film.film.film_art_direction_by', 101),
 ('^ns:people.person.gender', 1088),
 ('^ns:people.person.nationality', 2171),
 ('ns:film.film_distributor.films_distributed/ns:film.film_film_distributor_relationship.film',
  5210),
 ('ns:organization.organization.companies_acquired/ns:business.acquisition.company_acquired',
  5814),
 ('ns:film.film.prequel', 6337),
 ('ns:film.film.sequel', 6906),
 ('ns:organization.organization.acquired_by/ns:business.acquisition.acquiring_company',
  7584),
 ('ns:film.film.distributors/ns:film.film_film_distributor_relationship.distributor',
  9712),
 ('ns:film.film.starring/ns:film.performance.actor', 11668),
 ('ns:people.person.parents|ns:fictional_universe.fictional_character.parents|ns:organization.organization.parent/ns:organization.organization_relationship.parent',
  15451),
 ('ns:people.person.children|ns:fictional_universe.fictional_character.children|ns:organization

In [19]:
dsts

['?x0',
 '?x1',
 '?x2',
 '?x3',
 '?x4',
 '?x5',
 'M0',
 'M1',
 'M2',
 'M3',
 'M4',
 'M5',
 'M6',
 'M7',
 'M8',
 'M9',
 'ns:business.employer',
 'ns:fictional_universe.fictional_character',
 'ns:film.actor',
 'ns:film.cinematographer',
 'ns:film.director',
 'ns:film.editor',
 'ns:film.film',
 'ns:film.film_art_director',
 'ns:film.film_costumer_designer',
 'ns:film.film_distributor',
 'ns:film.producer',
 'ns:film.production_company',
 'ns:film.writer',
 'ns:m.02zsn',
 'ns:m.0345h',
 'ns:m.03_3d',
 'ns:m.03rjj',
 'ns:m.059j2',
 'ns:m.05zppz',
 'ns:m.06mkj',
 'ns:m.07ssc',
 'ns:m.09c7w0',
 'ns:m.0b90_r',
 'ns:m.0d05w3',
 'ns:m.0d060g',
 'ns:m.0d0vqn',
 'ns:m.0f8l9c',
 'ns:people.person']

In [36]:
idx2role = roles = df.rdd.flatMap(lambda r: re.findall(r'\[[^\]]+\]', r['questionTemplate'])).distinct().collect()
role2idx = {role : idx for idx, role in enumerate(idx2role)}
pickle.dump([idx2role, role2idx], open(f'{output_dir}/role-vocab.pickle', 'wb'))
roles

['[NP_SIMPLE]',
 '[entity]',
 '[ADJECTIVE_SIMPLE]',
 '[VP_SIMPLE]',
 '[ROLE_SIMPLE]']

In [21]:
def _mapper(r):
    d = defaultdict(set)
    for x, y in zip(r['questionPatternModEntities'].split(' '), r['questionTemplate'].split(' ')):
        if y in roles:
            d[y].add(x)
    return [[role, d[role]] for role in roles]

role2toks = dict(df.rdd.flatMap(_mapper).reduceByKey(set.union).collect())
role2toks

{'[NP_SIMPLE]': {'actor',
  'artdirector',
  'character',
  'cinematographer',
  'company',
  'costumedesigner',
  'film',
  'filmdirector',
  'filmdistributor',
  'filmeditor',
  'filmproducer',
  'person',
  'productioncompany',
  'screenwriter'},
 '[entity]': {'M0', 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'M9'},
 '[ADJECTIVE_SIMPLE]': {'American',
  'British',
  'Canadian',
  'Chinese',
  'Dutch',
  'French',
  'German',
  'Italian',
  'Japanese',
  'Mexican',
  'Spanish',
  'Swedish',
  'female',
  'male'},
 '[VP_SIMPLE]': {'acquire',
  'acquired',
  'direct',
  'directed',
  'distribute',
  'distributed',
  'edit',
  'edited',
  'employ',
  'employed',
  'executiveproduce',
  'executiveproduced',
  'found',
  'founded',
  'influence',
  'influenced',
  'married',
  'marry',
  'play',
  'played',
  'produce',
  'produced',
  'star',
  'starred',
  'write',
  'written',
  'wrote'},
 '[ROLE_SIMPLE]': {'actor',
  'artdirector',
  'child',
  'cinematographer',
  'costumedesigne

In [22]:
special_roles = ['[VP_SIMPLE]', '[ROLE_SIMPLE]']

def _mapper(r):
    uniq_toks = set()
    for role in special_roles:
        for tok in role2toks[role]:
            if tok in r['questionPatternModEntities'].split(' '):
                uniq_toks.add(tok)
    _, typs, _ = list(zip(*map(find_rel, r['sparqlPatternModEntities'].split('\n')[1 : -1])))
    uniq_typs = set(typs)
    return list(product(uniq_toks, uniq_typs)), uniq_toks, uniq_typs

occs = df.rdd.map(_mapper).persist()
both, c_tok, c_rel = map(count, (occs.flatMap(at(0)), occs.flatMap(at(1)), occs.flatMap(at(2))))

In [23]:
def _fn(by):
    for key, grp in groupby(sorted(([tok, [typ, c]] if by == 'tok' else [typ, [tok, c]]
                                    for [tok, typ], c in both.items()), key=at(0)), at(0)):
        _, c = zip(*grp)
        print(key, (c_tok if by == 'tok' else c_rel)[key])
        print(*islice(sorted(c, key=at(1), reverse=True), 3), sep='\n')
        
_fn('tok')
print()
_fn('typ')

acquire 1830
['ns:organization.organization.companies_acquired/ns:business.acquisition.company_acquired', 1307]
['a', 1086]
['ns:organization.organization.acquired_by/ns:business.acquisition.acquiring_company', 540]
acquired 2625
['ns:organization.organization.acquired_by/ns:business.acquisition.acquiring_company', 2246]
['a', 1826]
['ns:organization.organization.companies_acquired/ns:business.acquisition.company_acquired', 809]
actor 20287
['a', 15062]
['ns:film.actor.film/ns:film.performance.character', 9219]
['ns:people.person.nationality', 7425]
artdirector 31822
['ns:film.film_art_director.films_art_directed', 22744]
['a', 17117]
['ns:film.editor.film', 9740]
child 14955
['ns:people.person.parents|ns:fictional_universe.fictional_character.parents|ns:organization.organization.parent/ns:organization.organization_relationship.parent', 14747]
['a', 9731]
['ns:people.person.nationality', 4293]
cinematographer 37256
['ns:film.cinematographer.film', 26135]
['a', 20010]
['ns:film.editor.f

In [24]:
SEP, NIL = '{SEP}', '{NIL}'

In [25]:
isvar = lambda tok: tok.startswith('?x')
idx2var = sorted(set(chain(filter(isvar, srcs), filter(isvar, dsts))))
idx2var

['?x0', '?x1', '?x2', '?x3', '?x4', '?x5']

In [26]:
idx2tok = [SEP, NIL] + idx2var + unique(df.rdd.flatMap(lambda r: r['questionPatternModEntities'].split(' ')))
tok2idx = {tok : idx for idx, tok in enumerate(idx2tok)}
idx2tok

['{SEP}',
 '{NIL}',
 '?x0',
 '?x1',
 '?x2',
 '?x3',
 '?x4',
 '?x5',
 "'s",
 ',',
 'American',
 'British',
 'Canadian',
 'Chinese',
 'Did',
 'Dutch',
 'French',
 'German',
 'Italian',
 'Japanese',
 'M0',
 'M1',
 'M2',
 'M3',
 'M4',
 'M5',
 'M6',
 'M7',
 'M8',
 'M9',
 'Mexican',
 'Spanish',
 'Swedish',
 'Was',
 'Were',
 'What',
 'Which',
 'Who',
 'a',
 'acquire',
 'acquired',
 'actor',
 'and',
 'artdirector',
 'by',
 'character',
 'child',
 'cinematographer',
 'company',
 'costumedesigner',
 'countryofnationality',
 'did',
 'direct',
 'directed',
 'director',
 'distribute',
 'distributed',
 'distributor',
 'edit',
 'edited',
 'editor',
 'employ',
 'employed',
 'employee',
 'employer',
 'executiveproduce',
 'executiveproduced',
 'executiveproducer',
 'female',
 'film',
 'filmdirector',
 'filmdistributor',
 'filmeditor',
 'filmproducer',
 'found',
 'founded',
 'founder',
 'gender',
 'influence',
 'influenced',
 'male',
 'married',
 'marry',
 'of',
 'parent',
 'person',
 'play',
 'played',


In [27]:
idx2tag = [SEP, NIL] + idx2var + unique(df.rdd.flatMap(lambda r: r['questionTemplate'].split(' ')))
tag2idx = {tag : idx for idx, tag in enumerate(idx2tag)}
idx2tag

['{SEP}',
 '{NIL}',
 '?x0',
 '?x1',
 '?x2',
 '?x3',
 '?x4',
 '?x5',
 "'s",
 ',',
 'Did',
 'Was',
 'Were',
 'What',
 'Which',
 'Who',
 '[ADJECTIVE_SIMPLE]',
 '[NP_SIMPLE]',
 '[ROLE_SIMPLE]',
 '[VP_SIMPLE]',
 '[entity]',
 'a',
 'and',
 'by',
 'did',
 'of',
 'that',
 'was',
 'were',
 'whose']

In [28]:
p = re.compile('|'.join(fr'{r} and {r}|(?:{r} , )+and {r}' for r in [fr'\[{role[1 : -1]}\]' for role in roles]))

In [29]:
p

re.compile(r'\[NP_SIMPLE\] and \[NP_SIMPLE\]|(?:\[NP_SIMPLE\] , )+and \[NP_SIMPLE\]|\[entity\] and \[entity\]|(?:\[entity\] , )+and \[entity\]|\[ADJECTIVE_SIMPLE\] and \[ADJECTIVE_SIMPLE\]|(?:\[ADJECTIVE_SIMPLE\] , )+and \[ADJECTIVE_SIMPLE\]|\[VP_SIMPLE\] and \[VP_SIMPLE\]|(?:\[VP_SIMPLE\] , )+and \[VP_SIMPLE\]|\[ROLE_SIMPLE\] and \[ROLE_SIMPLE\]|(?:\[ROLE_SIMPLE\] , )+and \[ROLE_SIMPLE\]',
re.UNICODE)

In [30]:
re.findall(p, '[entity] and [entity] [NP_SIMPLE] , [NP_SIMPLE] , and [NP_SIMPLE]')

['[entity] and [entity]', '[NP_SIMPLE] , [NP_SIMPLE] , and [NP_SIMPLE]']

In [31]:
def _mapper(r):
    t = r['questionTemplate']
    r = '(?:%s)' % '|'.join(fr'\[{role[1 : -1]}\]' for role in roles)  # TODO
    p0 = re.compile(fr'{r} and {r}|(?:{r} , )+and {r}')
    p1 = re.compile('|'.join(fr'{r} and {r}|(?:{r} , )+and {r}' for r in [fr'\[{role[1 : -1]}\]' for role in roles]))
    homo = lambda m: sum(role in m for role in roles) == 1
    return {m.group() for m in re.finditer(p0, t) if homo(m.group())} != {m for m in re.findall(p1, t)}

# df.rdd.filter(_mapper).count()
df.rdd.filter(_mapper).map(lambda r: r['questionTemplate']).take(10)

['Did [entity] [VP_SIMPLE] a [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] , [entity] , and [entity] [VP_SIMPLE] and [entity] and [entity] [VP_SIMPLE]',
 'Did [entity] [VP_SIMPLE] a [NP_SIMPLE] that [entity] , [entity] , and [entity] [VP_SIMPLE] and [entity] and [entity] were [VP_SIMPLE] by',
 'Did a [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] and [entity] [VP_SIMPLE] and [entity] , [entity] , and [entity] [VP_SIMPLE] [VP_SIMPLE] [entity]',
 'Did a [NP_SIMPLE] that [entity] and [entity] [VP_SIMPLE] and [entity] , [entity] , and [entity] were [VP_SIMPLE] by [VP_SIMPLE] [entity]',
 'Did a [NP_SIMPLE] that [entity] and [entity] [VP_SIMPLE] and [entity] , [entity] , [entity] , and [entity] [VP_SIMPLE] [VP_SIMPLE] [entity]',
 'Did a [NP_SIMPLE] that [entity] , [entity] , and [entity] [VP_SIMPLE] and [entity] , [entity] , and [entity] were [VP_SIMPLE] by [VP_SIMPLE] [entity]',
 'Did a [NP_SIMPLE] that [entity] and [entity] [VP_SIMPLE] and [entity] , [entity] , and [entity] were [VP_SIMPLE] b

In [32]:
def _mapper(t):
    r = '(?:%s)' % '|'.join(fr'\[{role[1 : -1]}\]' for role in roles)  # TODO
    p0 = re.compile(fr'{r} and {r}|(?:{r} , )+and {r}')
    p1 = re.compile('|'.join(fr'{r} and {r}|(?:{r} , )+and {r}' for r in [fr'\[{role[1 : -1]}\]' for role in roles]))
    homo = lambda m: sum(role in m for role in roles) == 1
    return {m for m in re.findall(p0, t) if homo(m)}, {m for m in re.findall(p1, t)}

_mapper('Did [entity] [VP_SIMPLE] a [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] , [entity] , and [entity] [VP_SIMPLE] and [entity] and [entity] [VP_SIMPLE]')

({'[entity] , [entity] , and [entity]'},
 {'[entity] , [entity] , and [entity]', '[entity] and [entity]'})

In [72]:
r = '(?:%s)' % '|'.join(fr'\[{role[1 : -1]}\]' for role in roles)  # TODO
# p = re.compile(fr'{r} and {r}|(?:{r} , )+and {r}')
p = re.compile('|'.join(fr'{r} and {r}|(?:{r} , )+and {r}' for r in [fr'\[{role[1 : -1]}\]' for role in roles]))
def grp_by_tag(tags):
    lens = np.array(list(map(len, tags)))
    ends = np.cumsum(lens) + np.arange(len(tags))
    starts = ends - lens

    t = ' '.join(tags)
    homo = lambda s: sum(role in s for role in roles) == 1
    matches = [m for m in re.finditer(p, t) if homo(m.group())]
    if not matches:
        grps = [[i] for i in range(len(tags))]
        return grps
    
    m_start, m_end = zip(*([m.start(), m.end()] for m in matches))
    hit = False
    grps = []
    for idx, [start, end] in enumerate(zip(starts, ends)):
        if start in m_start:
            hit = True
            grps.append([])
        if hit:
            grps[-1].append(idx)
        else:
            grps.append([idx])
        if end in m_end:
            hit = False
    
    for start, end, grp in zip(m_start, m_end, (grp for grp in grps if len(grp) > 1)):
        assert t[start : end] == ' '.join(tags[idx] for idx in grp)

    return grps

def _mapper(r):
    rels = list(map(find_rel, r['sparqlPatternModEntities'].split('\n')[1 : -1]))
    srcs, typs, dsts = zip(*rels)
    ents = sorted({x for x in chain(srcs, dsts) if re.match('M\d', x) or re.match('\?x\d', x)})

    tail = [SEP] + sorted(ent for ent in ents if ent.startswith('?x')) + [NIL]
    toks = r['questionPatternModEntities'].split(' ') + tail
    tags = r['questionTemplate'].split(' ') + tail
    grps = grp_by_tag(tags)

    seq = [tag2idx[tags[idx]] for idx, *_ in grps]
    mem = [[tok2idx[toks[grp[0]]]] if len(grp) == 1 else
           [tok2idx[toks[idx]] for idx in grp if tags[idx] in roles] for grp in grps]
#     print([[toks[idx] for idx in grp] for grp in grps])
#     print([[tags[idx] for idx in grp] for grp in grps])
#     print(mem)
    
    ent2grp = {}
    for idx, tok in zip(chain(*(len(grp) * [idx] for idx, grp in enumerate(grps))), toks):
        if tok in ents:
            ent2grp[tok] = idx
    idx2grp = sorted(set(ent2grp.values()))
    ent2idx = {ent: idx2grp.index(ent2grp[ent]) for ent in ents}
    _, idx2ent = zip(*sorted(ent2idx.items(), key=at(1)))
    
    # filters
    filters = [[ent2idx[src], ent2idx[dst]] for src, typ, dst in rels if typ == '!=']
    
    # attributes
    ent2attr = np.zeros([len(idx2grp), len(idx2attr)])
    for src, _, dst in rels:
        if dst in idx2attr:
            ent2attr[ent2idx[src], idx2attr.index(dst)] = 1

    # groundable relations
    gr_rels = [[ent2idx[src], ent2idx[dst], idx2typ.index(typ)] for src, typ, dst in rels if typ in idx2typ]

    return filters, ent2attr, gr_rels, seq, mem, idx2grp

dat = {}
collect = lambda rdd: np.array(rdd.collect())
flat_collect = lambda rdd: np.array(rdd.flatMap(lambda r: r).collect())

# print(_mapper(df.rdd.take(10)[-1]))
# _mapper(df.where(df.index == 13).rdd.take(1)[0])
rdd = df.rdd.map(_mapper).cache()
dat['n_filter'] = collect(rdd.map(at(0)).map(len))
dat['filter'] = collect(rdd.flatMap(at(0)))
dat['attr'] = np.vstack(rdd.map(at(1)).collect())
dat['n_rel'] = collect(rdd.map(at(2)).map(len))
dat['src'] = collect(rdd.flatMap(at(2)).map(at(0)))
dat['dst'] = collect(rdd.flatMap(at(2)).map(at(1)))
dat['typ'] = collect(rdd.flatMap(at(2)).map(at(2)))
dat['seq'] = collect(rdd.flatMap(at(3)))
dat['n_grp'] = collect(rdd.map(at(4)).map(len))
dat['n_mem'] = collect(rdd.flatMap(at(4)).map(len))
dat['mem'] = collect(rdd.flatMap(at(4)).flatMap(lambda r: r))
dat['n'] = collect(rdd.map(at(5)).map(len))
dat['idx2grp'] = collect(rdd.flatMap(at(5)))

In [34]:
pickle.dump([idx2tok, tok2idx], open(f'{output_dir}/tok-vocab.pickle', 'wb'))
pickle.dump([idx2tag, tag2idx], open(f'{output_dir}/tag-vocab.pickle', 'wb'))
pickle.dump([idx2typ, typ2idx], open(f'{output_dir}/typ-vocab.pickle', 'wb'))
np.savez(f'{output_dir}/data', **dat)

In [162]:
def _mapper(r):
    r, [filters, ent2attr, gr_rels, seq, mem, idx2grp] = r
    rels = set()
    for src, dst, typ in gr_rels:
        for x, y in product(mem[idx2grp[src]], mem[idx2grp[dst]]):
            rels.add(f'{idx2tok[x]} {idx2typ[typ]} {idx2tok[y]}')
#     print(sorted(rels))
    isrel = lambda r: all(s not in r for s in ['!=', ' a ', ' ns:people.person.gender', ' ns:people.person.nationality'])
    xs = ''.join(r['sparqlPatternModEntities'].split('\n')[1 : -1]).split(' .')
#     print(sorted(filter(isrel, xs)))
    return sorted(rels), sorted(filter(isrel, xs)), r['questionPatternModEntities'], r['questionTemplate'], r['index']

In [157]:
_mapper(df.rdd.zip(rdd).take(1)[0])

(['?x0 ns:film.actor.film/ns:film.performance.character M1',
  '?x0 ns:film.editor.film M0',
  '?x0 ns:film.producer.film|ns:film.production_company.films M0'],
 ['?x0 ns:film.actor.film/ns:film.performance.character M1',
  '?x0 ns:film.editor.film M0',
  '?x0 ns:film.producer.film|ns:film.production_company.films M0'],
 "Did M1 's female actor edit and produce M0",
 "Did [entity] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] [VP_SIMPLE] and [VP_SIMPLE] [entity]",
 0)

In [120]:
df.where(df.index == 13).rdd.map(_mapper).take(1)[0]

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 314.0 failed 1 times, most recent failure: Lost task 0.0 in stage 314.0 (TID 24432, havoc.millennium.berkeley.edu, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-114-361e59757ddd>", line 2, in _mapper
ValueError: too many values to unpack (expected 2)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:154)
	at org.apache.spark.api.python.PythonRDD$$$Lambda$7680/1142513594.apply(Unknown Source)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.SparkContext$$Lambda$1313/612188285.apply(Unknown Source)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1275/458075417.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$7600/724451276.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$7597/1084380903.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:154)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor1147.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-114-361e59757ddd>", line 2, in _mapper
ValueError: too many values to unpack (expected 2)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:154)
	at org.apache.spark.api.python.PythonRDD$$$Lambda$7680/1142513594.apply(Unknown Source)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.SparkContext$$Lambda$1313/612188285.apply(Unknown Source)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1275/458075417.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [163]:
df.rdd.zip(rdd).map(_mapper).filter(lambda r: r[0] != r[1]).count()

51

In [164]:
df.rdd.zip(rdd).map(_mapper).filter(lambda r: r[0] != r[1]).take(10)

[(['?x0 ns:people.person.sibling_s/ns:people.sibling_relationship.sibling|ns:fictional_universe.fictional_character.siblings/ns:fictional_universe.sibling_relationship_of_fictional_characters.siblings ?x1',
   '?x1 ns:people.person.spouse_s/ns:people.marriage.spouse|ns:fictional_universe.fictional_character.married_to/ns:fictional_universe.marriage_of_fictional_characters.spouses ?x2',
   '?x2 ns:film.actor.film/ns:film.performance.film M1',
   'M2 ns:business.employer.employees/ns:business.employment_tenure.person ?x0',
   'M2 ns:business.employer.employees/ns:business.employment_tenure.person M3',
   'M4 ns:business.employer.employees/ns:business.employment_tenure.person ?x0',
   'M4 ns:business.employer.employees/ns:business.employment_tenure.person M3'],
  ['?x0 ns:people.person.sibling_s/ns:people.sibling_relationship.sibling|ns:fictional_universe.fictional_character.siblings/ns:fictional_universe.sibling_relationship_of_fictional_characters.siblings ?x1',
   '?x1 ns:people.person

In [165]:
'M2 ns:business.employer.employees/ns:business.employment_tenure.person ?x0' == 'M2 ns:business.employer.employees/ns:business.employment_tenure.person ?x0'

True

In [61]:
print(df.rdd.map(lambda r: r['sparqlPatternModEntities']).take(1)[0])

SELECT count(*) WHERE {
?x0 ns:film.actor.film/ns:film.performance.character M1 .
?x0 ns:film.editor.film M0 .
?x0 ns:film.producer.film|ns:film.production_company.films M0 .
?x0 ns:people.person.gender ns:m.02zsn
}


## Variable prediction

In [68]:
n_var = lambda r: sum(1 for idx in r if idx2tag[idx].startswith('?x'))
# n = lambda r: sum(1 for idx in r if idx2tag[idx] in ['[NP_SIMPLE]', '[ROLE_SIMPLE]', 'Who'])
n = lambda r: sum(1 for idx in r if idx2tag[idx] in ['[NP_SIMPLE]', '[ROLE_SIMPLE]', 'Who']) + (seq2str(r).startswith('What did') or seq2str(r).startswith('What was'))

In [69]:
seq2str = lambda seq: ' '.join(idx2tag[idx] for idx in seq)
str2seq = lambda r: [idx2tag.index(tag) for tag in r.split(' ')]

In [70]:
rdd_seq = rdd.map(at(3)).map(lambda r: list(takewhile(lambda idx: idx2tag[idx] != '{SEP}', r)))
dat_var = {
    'seq' : flat_collect(rdd_seq),
    'len' : collect(rdd_seq.map(len)),
    'n' : collect(rdd.map(at(3)).map(lambda r: n_var(r) - n(r) + 1))
}
np.savez(f'{output_dir}/var', **dat_var)

NameError: name 'flat_collect' is not defined

In [261]:
rdd_x = rdd.map(at(3)).filter(lambda r: n_var(r) > 0 and n_var(r) != n(r)).map(lambda r: ' '.join(idx2tag[idx] for idx in r)).distinct().cache()

In [262]:
rdd.map(at(3)).map(lambda r: n(r) - n_var(r)).distinct().collect()

[0, 1, -1]

In [271]:
count(rdd.map(at(3)).map(lambda r: ' '.join([idx2tag[idx] for idx in r])).distinct().map(lambda r: r.split(' ')).map(lambda r: n(r) - n_var(r)))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 59 in stage 654.0 failed 1 times, most recent failure: Lost task 59.0 in stage 654.0 (TID 49179, havoc.millennium.berkeley.edu, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 595, in process
    out_iter = func(split_index, iterator)
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 425, in func
    return f(iterator)
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 1946, in combineLocally
    merger.mergeValues(iterator)
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-271-138f853d8e76>", line 1, in <lambda>
  File "<ipython-input-261-696bf5fe079f>", line 2, in <lambda>
  File "<ipython-input-261-696bf5fe079f>", line 2, in <genexpr>
TypeError: list indices must be integers or slices, not str

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1275/458075417.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$7600/724451276.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$7597/1084380903.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDD$$Lambda$1003/1661322438.apply(Unknown Source)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor1087.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 595, in process
    out_iter = func(split_index, iterator)
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 425, in func
    return f(iterator)
  File "/data/yu_gai/anaconda3/envs/cfq-yu/lib/python3.8/site-packages/pyspark/rdd.py", line 1946, in combineLocally
    merger.mergeValues(iterator)
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/data/yu_gai/anaconda3/envs/ee227/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-271-138f853d8e76>", line 1, in <lambda>
  File "<ipython-input-261-696bf5fe079f>", line 2, in <lambda>
  File "<ipython-input-261-696bf5fe079f>", line 2, in <genexpr>
TypeError: list indices must be integers or slices, not str

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1275/458075417.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [301]:
count(rdd.map(at(3)).map(seq2str).distinct().map(str2seq).map(lambda r: n(r) - n_var(r)))

{0: 13738, 1: 5129}

In [304]:
lt = count(rdd.map(at(3)).filter(lambda r: n(r) - n_var(r) == 1).map(lambda r: r[:2]).map(seq2str))

In [305]:
eq = count(rdd.map(at(3)).filter(lambda r: n(r) - n_var(r) == 0).map(lambda r: r[:2]).map(seq2str))

In [319]:
rdd.map(at(3)).filter(lambda r: n(r) - n_var(r) == 1 and idx2tag.index('[ROLE_SIMPLE]') not in r and n_var(r) > 0).map(seq2str).distinct().take(10)

['Was a [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] were [VP_SIMPLE] by a [NP_SIMPLE] {SEP} ?x0 {NIL}',
 'What [NP_SIMPLE] was a [NP_SIMPLE] [VP_SIMPLE] by [entity] and [VP_SIMPLE] by [entity] {SEP} ?x0 {NIL}',
 'What was a [NP_SIMPLE] that was [VP_SIMPLE] by and [VP_SIMPLE] [entity] {SEP} ?x0 {NIL}',
 'Which [ADJECTIVE_SIMPLE] [NP_SIMPLE] was a [NP_SIMPLE] that [entity] were [VP_SIMPLE] by {SEP} ?x0 {NIL}',
 'Was [entity] a [NP_SIMPLE] that a [NP_SIMPLE] [VP_SIMPLE] and a [ADJECTIVE_SIMPLE] [NP_SIMPLE] [VP_SIMPLE] {SEP} ?x0 ?x1 {NIL}',
 'Was a [NP_SIMPLE] a [NP_SIMPLE] that [entity] [VP_SIMPLE] {SEP} ?x0 {NIL}',
 'Was a [NP_SIMPLE] a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] that was [VP_SIMPLE] by [entity] {SEP} ?x0 {NIL}',
 'Was a [NP_SIMPLE] that [entity] was [VP_SIMPLE] by and was [VP_SIMPLE] by a [ADJECTIVE_SIMPLE] [NP_SIMPLE] {SEP} ?x0 {NIL}',
 'Was [entity] a [NP_SIMPLE] [VP_SIMPLE] by [entity] and [VP_SIMPLE] by a [NP_SIMPLE] {SEP} ?x0 {NIL}',
 'Was [entity] a [NP_SIM

In [492]:
df.rdd.zip(rdd).filter(lambda r: seq2str(r[1][3]) ==  'What [NP_SIMPLE] did a [NP_SIMPLE] [VP_SIMPLE] and [entity] and [entity] [VP_SIMPLE] {SEP} ?x0 ?x1 {NIL}').map(lambda r: r[0]['questionPatternModEntities']).take(10)

['What actor did a person influence and M1 and M2 marry',
 'What artdirector did a cinematographer marry and M1 and M2 influence',
 'What film did a cinematographer write and M1 and M2 direct',
 'What film did a costumedesigner executiveproduce and M1 and M2 direct',
 'What film did a filmeditor edit and M1 and M2 edit',
 'What filmdistributor did a filmproducer found and M1 and M2 found',
 'What filmeditor did a artdirector influence and M1 and M2 influence',
 'What filmproducer did a person influence and M1 and M2 marry',
 'What productioncompany did a costumedesigner found and M1 and M2 found']

In [387]:
df.rdd.zip(rdd).filter(lambda r: seq2str(r[1][3]) == 'Who was a [ADJECTIVE_SIMPLE] [NP_SIMPLE] [VP_SIMPLE] by [entity] , [VP_SIMPLE] by [entity] , and [VP_SIMPLE] by [entity] {SEP} ?x0 {NIL}'
).map(lambda r: r[0]['questionPatternModEntities']).take(10)

['Who was a British cinematographer influenced by M2 , influenced by M3 , and influenced by M4',
 'Who was a Canadian filmdirector influenced by M2 , influenced by M3 , and influenced by M4 and M5',
 'Who was a Chinese screenwriter influenced by M2 , influenced by M3 and M4 , and influenced by M5',
 'Who was a French filmeditor influenced by M2 , influenced by M3 and M4 , and influenced by M5',
 'Who was a Spanish filmdirector employed by M2 , employed by M3 and M4 , and employed by M5',
 'Who was a Swedish screenwriter influenced by M2 , influenced by M3 and M4 , and influenced by M5']

In [310]:
count(rdd.map(at(3)).filter(lambda r: n(r) - n_var(r) == 1).filter(lambda r: seq2str(r[:2]) == 'Was a').map(n_var))

{0: 3798, 1: 9793, 2: 4760, 3: 621, 4: 74, 5: 2}

In [311]:
rdd.map(at(3)).filter(lambda r: n(r) - n_var(r) == 1).filter(lambda r: seq2str(r[:2]) == 'What [NP_SIMPLE]').map(seq2str).distinct().take(10)

["What [NP_SIMPLE] that [VP_SIMPLE] [entity] and [VP_SIMPLE] [entity] was a [NP_SIMPLE] 's [ROLE_SIMPLE] {SEP} ?x0 ?x1 {NIL}",
 'What [NP_SIMPLE] was a [ROLE_SIMPLE] of a [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity] {SEP} ?x0 ?x1 {NIL}',
 'What [NP_SIMPLE] was a [NP_SIMPLE] [VP_SIMPLE] by [entity] and [VP_SIMPLE] by [entity] {SEP} ?x0 {NIL}',
 "What [NP_SIMPLE] that [entity] were [VP_SIMPLE] by and [entity] [VP_SIMPLE] was a [NP_SIMPLE] 's [ROLE_SIMPLE] {SEP} ?x0 ?x1 {NIL}",
 'What [NP_SIMPLE] that [entity] [VP_SIMPLE] was a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity] {SEP} ?x0 {NIL}',
 "What [NP_SIMPLE] that was [VP_SIMPLE] by and was [VP_SIMPLE] by [entity] was [entity] 's [ROLE_SIMPLE] {SEP} ?x0 {NIL}",
 "What [NP_SIMPLE] that [entity] [VP_SIMPLE] was a [NP_SIMPLE] 's [ROLE_SIMPLE] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] {SEP} ?x0 ?x1 ?x2 {NIL}",
 'What [NP_SIMPLE] that [entity] were [VP_SIMPLE] by was a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] {SEP} ?x0 {NI

In [312]:
rdd.map(at(3)).filter(lambda r: n(r) - n_var(r) == 0).filter(lambda r: seq2str(r[:2]) == 'What [NP_SIMPLE]').map(seq2str).distinct().take(10)

["What [NP_SIMPLE] was [VP_SIMPLE] by [entity] and was [VP_SIMPLE] by a [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ROLE_SIMPLE] {SEP} ?x0 ?x1 ?x2 {NIL}",
 "What [NP_SIMPLE] [VP_SIMPLE] a [NP_SIMPLE] 's [ROLE_SIMPLE] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] and [VP_SIMPLE] [entity] {SEP} ?x0 ?x1 ?x2 ?x3 {NIL}",
 'What [NP_SIMPLE] was [VP_SIMPLE] by a [NP_SIMPLE] and [VP_SIMPLE] by [entity] {SEP} ?x0 ?x1 {NIL}',
 "What [NP_SIMPLE] was [VP_SIMPLE] by [entity] , [VP_SIMPLE] by a [NP_SIMPLE] , and [VP_SIMPLE] by [entity] 's [ROLE_SIMPLE] {SEP} ?x0 ?x1 ?x2 {NIL}",
 "What [NP_SIMPLE] [VP_SIMPLE] [entity] 's [ROLE_SIMPLE] and [VP_SIMPLE] a [ROLE_SIMPLE] of [entity] {SEP} ?x0 ?x1 ?x2 {NIL}",
 'What [NP_SIMPLE] whose [ROLE_SIMPLE] [VP_SIMPLE] [entity] and [VP_SIMPLE] [entity] [VP_SIMPLE] [entity] {SEP} ?x0 ?x1 {NIL}',
 "What [NP_SIMPLE] did [entity] 's [ROLE_SIMPLE] [VP_SIMPLE] and a [ROLE_SIMPLE] of [entity] [VP_SIMPLE] {SEP} ?x0 ?x1 ?x2 {NIL}",
 'What [NP_SIMPLE] [VP_SIMPLE] by , [VP_SIMPLE] by , and [VP_S

In [314]:
print(df.rdd.filter(lambda r: r['questionTemplate'] == "What [NP_SIMPLE] was [VP_SIMPLE] by [entity] and was [VP_SIMPLE] by a [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ROLE_SIMPLE]").map(lambda r: r['sparqlPatternModEntities']).distinct().take(1)[0])

SELECT DISTINCT ?x0 WHERE {
?x0 a ns:film.film .
?x0 ns:film.film.distributors/ns:film.film_film_distributor_relationship.distributor M1 .
?x0 ns:film.film.produced_by|ns:film.film.production_companies ?x1 .
?x1 ns:people.person.spouse_s/ns:people.marriage.spouse|ns:fictional_universe.fictional_character.married_to/ns:fictional_universe.marriage_of_fictional_characters.spouses ?x2 .
?x2 a ns:people.person .
?x2 ns:people.person.nationality ns:m.03_3d .
FILTER ( ?x1 != ?x2 )
}


In [306]:
lt

{'Was a': 19048,
 'What [NP_SIMPLE]': 3115,
 'Which [NP_SIMPLE]': 3159,
 'Which [ROLE_SIMPLE]': 2408,
 'What [ROLE_SIMPLE]': 2619,
 'What was': 2197,
 'Was [entity]': 21456,
 'Who was': 12266,
 'What [ADJECTIVE_SIMPLE]': 1765,
 'Which [ADJECTIVE_SIMPLE]': 1652}

In [307]:
eq

{'Was a': 4071,
 'What [NP_SIMPLE]': 7281,
 'Did [entity]': 25876,
 'Were [entity]': 19457,
 'Which [NP_SIMPLE]': 7529,
 'Which [ROLE_SIMPLE]': 5911,
 'What [ROLE_SIMPLE]': 5981,
 'Who did': 3005,
 'What did': 10234,
 'What was': 17609,
 'Did a': 17175,
 'Was [entity]': 23488,
 'Who was': 4796,
 'What [ADJECTIVE_SIMPLE]': 5815,
 'Who [VP_SIMPLE]': 5637,
 'Which [ADJECTIVE_SIMPLE]': 5807}

In [280]:
unique(df.rdd.map(lambda r: r['questionTemplate']).filter(lambda r: r.startswith('What')).map(lambda r: ' '.join(r.split(' ')[:2])))

['What [ADJECTIVE_SIMPLE]',
 'What [NP_SIMPLE]',
 'What [ROLE_SIMPLE]',
 'What did',
 'What was']

In [300]:
count(rdd.map(at(3)).filter(lambda r: n(r) - n_var(r) == -1).map(lambda r: r[:2]).map(seq2str))

{}

In [252]:
rdd_x.count()

6171

In [253]:
rdd_x.take(10)

['Was [entity] a [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of a [ADJECTIVE_SIMPLE] [NP_SIMPLE] {SEP} ?x0 {NIL}',
 "Was [entity] a [NP_SIMPLE] 's [ROLE_SIMPLE] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] 's [ROLE_SIMPLE] {SEP} ?x0 ?x1 ?x2 ?x3 {NIL}",
 "Was [entity] a [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ROLE_SIMPLE] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] 's [ROLE_SIMPLE] {SEP} ?x0 ?x1 ?x2 {NIL}",
 'Was a [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] were [VP_SIMPLE] by a [NP_SIMPLE] {SEP} ?x0 {NIL}',
 "Was a [ROLE_SIMPLE] of a [NP_SIMPLE] 's [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] [entity] {SEP} ?x0 ?x1 {NIL}",
 "What [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] was [VP_SIMPLE] by and [entity] was [VP_SIMPLE] by was [entity] 's [ROLE_SIMPLE] {SEP} ?x0 {NIL}",
 'What [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity] was a [NP_SIMPLE] whose [ROLE_SIMPLE] [VP_SIMPLE] [entity] {SEP} ?x0 ?x1 {NIL}',
 "What [NP_SIMPLE] that [VP_SIMPLE] [entity] and [VP_SIMPLE] [entity] was

In [234]:
rdd.map(at(3)).take(1)

[[10, 20, 8, 16, 18, 19, 20, 0, 2, 1]]

In [257]:
df.rdd.map(lambda r: r['questionTemplate'].split(' ')).flatMap(lambda r: [' '.join([tok, r[idx + 1]]) for idx, tok in enumerate(r[:-1]) if tok == '[ADJECTIVE_SIMPLE]']).distinct().collect()

['[ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 '[ADJECTIVE_SIMPLE] [ROLE_SIMPLE]',
 '[ADJECTIVE_SIMPLE] [NP_SIMPLE]']

In [258]:
df.rdd.map(lambda r: r['questionTemplate'].split(' ')).flatMap(lambda r: [' '.join([r[idx - 1], tok]) for idx, tok in enumerate(r[1:]) if tok == '[ADJECTIVE_SIMPLE]']).distinct().collect()

['[ROLE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'was [ADJECTIVE_SIMPLE]',
 'that [ADJECTIVE_SIMPLE]',
 '[ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'of [ADJECTIVE_SIMPLE]',
 'did [ADJECTIVE_SIMPLE]',
 'Did [ADJECTIVE_SIMPLE]',
 '[entity] [ADJECTIVE_SIMPLE]',
 'whose [ADJECTIVE_SIMPLE]',
 '[NP_SIMPLE] [ADJECTIVE_SIMPLE]',
 'and [ADJECTIVE_SIMPLE]',
 'a [ADJECTIVE_SIMPLE]',
 'by [ADJECTIVE_SIMPLE]',
 ', [ADJECTIVE_SIMPLE]',
 '[VP_SIMPLE] [ADJECTIVE_SIMPLE]',
 'What [ADJECTIVE_SIMPLE]',
 'Which [ADJECTIVE_SIMPLE]',
 "'s [ADJECTIVE_SIMPLE]",
 'Was [ADJECTIVE_SIMPLE]']

In [259]:
df.rdd.map(lambda r: r['questionTemplate'].split(' ')).flatMap(lambda r: [' '.join([r[idx - 1], tok, r[idx + 1]]) for idx, tok in enumerate(r[1 : -1]) if tok == '[ADJECTIVE_SIMPLE]']).distinct().collect()

[', [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 '[entity] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 '[VP_SIMPLE] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'and [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 '[ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'Which [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'Did [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 '[NP_SIMPLE] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'was [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 "'s [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]",
 'Was [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'whose [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 '[ROLE_SIMPLE] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'by [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'What [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'did [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'of [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]',
 'that [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE]']

In [38]:
import nltk

In [466]:
uniq_seq = rdd_seq.map(seq2str).distinct().persist()
uniq_seq.count()

18867

In [39]:
grammar = nltk.CFG.fromstring("""
    S -> NP
    DT -> 'a' | 'an'
    JJ -> 'ADJECTIVE_SIMPLE'
    N -> 'NP_SIMPLE' | 'ROLE_SIMPLE' | JJ N
    NP -> 'entity' | DT N | 'What' | 'What' N | 'Which' N | 'Who' | NP POS N | NP PP NP
    POS -> "'s"
    PP -> 'of'
""")
parser = nltk.ChartParser(grammar)

def find_noun_phrases(tags):
    hits = []
    start = 0
    while start < len(tags):
        for end in range(len(tags), start, -1):
            try:
                trees = list(parser.parse(tags[start : end]))
            except ValueError:
                trees = []
            if len(trees) > 0:
                hits.append([start, end, trees])
                start = end - 1
                break
        start += 1
    return hits
            
for start, end, trees in find_noun_phrases("What".split(' ')):
# for start, end, trees in find_noun_phrases("entity 's ADJECTIVE_SIMPLE ADJECTIVE_SIMPLE ADJECTIVE_SIMPLE ROLE_SIMPLE".split(' ')):
    for tree in trees:
        tree.pretty_print()

 S  
 |   
 NP 
 |   
What



In [42]:
for tree in parser.parse("a ADJECTIVE_SIMPLE ROLE_SIMPLE of entity 's ADJECTIVE_SIMPLE ROLE_SIMPLE".split(' ')):
    tree.pretty_print()

                                      S                                             
                                      |                                              
                                      NP                                            
                               _______|_______________________________               
                              NP                 |                    |             
            __________________|____________      |                    |              
           NP                         |    |     |                    |             
  _________|__________                |    |     |                    |              
 |                    N               |    |     |                    N             
 |          __________|_______        |    |     |          __________|_______       
 DT        JJ                 N       PP   NP   POS        JJ                 N     
 |         |                  |       |    |     |         |

In [609]:
find_ambiguities("a ADJECTIVE_SIMPLE ROLE_SIMPLE of entity 's ADJECTIVE_SIMPLE ROLE_SIMPLE".split(' '))

[[0,
  8,
  [Tree('S', [Tree('NP', [Tree('NP', [Tree('NP', [Tree('DT', ['a']), Tree('N', [Tree('JJ', ['ADJECTIVE_SIMPLE']), Tree('N', ['ROLE_SIMPLE'])])]), Tree('PP', ['of']), Tree('NP', ['entity'])]), Tree('POS', ["'s"]), Tree('N', [Tree('JJ', ['ADJECTIVE_SIMPLE']), Tree('N', ['ROLE_SIMPLE'])])])]),
   Tree('S', [Tree('NP', [Tree('NP', [Tree('DT', ['a']), Tree('N', [Tree('JJ', ['ADJECTIVE_SIMPLE']), Tree('N', ['ROLE_SIMPLE'])])]), Tree('PP', ['of']), Tree('NP', [Tree('NP', ['entity']), Tree('POS', ["'s"]), Tree('N', [Tree('JJ', ['ADJECTIVE_SIMPLE']), Tree('N', ['ROLE_SIMPLE'])])])])])]]]

In [614]:
def find_ambiguities(tags):
    hits = []
    start = 0
    while start < len(tags):
        for end in range(len(tags), start, -1):
            try:
                trees = list(parser.parse(tags[start : end]))
            except ValueError:
                trees = []
            if len(trees) > 1:
                hits.append([start, end, trees])
            if len(trees) > 0:
                start = end - 1
                break
        start += 1
    return hits

for hits, seq in uniq_seq.map(lambda r: find_ambiguities(r.replace('[', '').replace(']', '').split(' '))).zip(uniq_seq).filter(lambda r: len(r[0]) > 0).take(10):
    print(seq)
    for start, end, trees in hits:
        print(seq.split(' ')[start : end])
        for tree in trees:
            tree.pretty_print()

Did [entity] [VP_SIMPLE] a [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE]
['a', '[ADJECTIVE_SIMPLE]', '[ROLE_SIMPLE]', 'of', '[entity]', "'s", '[ADJECTIVE_SIMPLE]', '[ROLE_SIMPLE]']
                                      S                                             
                                      |                                              
                                      NP                                            
                               _______|_______________________________               
                              NP                 |                    |             
            __________________|____________      |                    |              
           NP                         |    |     |                    |             
  _________|__________                |    |     |                    |              
 |                    N               |    |     |                    N             
 |          ___

In [67]:
def _mapper(r):
    idx2tag_ = [tag.replace('[', '').replace(']', '') for tag in idx2tag]
    templ = [idx2tag_[idx] for idx in r]
    hits = find_noun_phrases(templ)

    idx = 0
    starts, ends, _ = zip(*hits)
    tags = []
    while idx < len(templ):
        if idx in starts:
            tags.append(len(idx2tag_))
            idx = ends[starts.index(idx)]
        else:
            tags.append(idx2tag_.index(templ[idx]))
            idx += 1
    
    noun_phrases = [[idx2tag_.index(tag) for tag in templ[start : end]] for start, end, _ in hits]
    positions = [idx for idx, tag in enumerate(tags) if tag == len(idx2tag_)]

    return tags, noun_phrases, positions

dat_np = {}
rdd_np = rdd.map(at(3)).map(lambda r: list(takewhile(lambda idx: idx2tag[idx] != '{SEP}', r))).map(_mapper).cache()
dat_np['len_tag'] = collect(rdd_np.map(at(0)).map(len))
dat_np['seq_tag'] = collect(rdd_np.flatMap(at(0)))
dat_np['len_np'] = collect(rdd_np.map(at(1)).map(len))
rdd_np_ = rdd_np.flatMap(at(1)).cache()
dat_np['len_noun'] = collect(rdd_np_.map(len))
dat_np['seq_noun'] = flat_collect(rdd_np_)
dat_np['pos_np'] = collect(rdd_np.flatMap(at(2)))
dat_np['n_var'] = dat_var['n']
np.savez(f'{output_dir}/var', **dat_np)

In [113]:
seq2str(rdd.map(at(3)).take(1)[0])

"Did [entity] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] [VP_SIMPLE] [entity] {SEP} ?x0 {NIL}"

In [45]:
def _mapper(r):
    idx2tag_ = [tag.replace('[', '').replace(']', '') for tag in idx2tag]
    templ = [idx2tag_[idx] for idx in r]
    hits = find_noun_phrases(templ)
    idx_sep, idx_nil = templ.index(SEP), templ.index(NIL)
    if idx_sep + 1 < idx_nil:
        hits.append([idx_sep + 1, idx_nil, None])

    idx = 0
    starts, ends, _ = zip(*hits)
    tags = []
    while idx < len(templ):
        if idx in starts:
            tags.append(len(idx2tag_))
            idx = ends[starts.index(idx)]
        else:
            tags.append(idx2tag_.index(templ[idx]))
            idx += 1
    
    noun_phrases = [[idx2tag_.index(tag) for tag in templ[start : end]] for start, end, _ in hits]
    positions = [idx for idx, tag in enumerate(tags) if tag == len(idx2tag_)]

    return tags, noun_phrases, positions

# dat_np = {}
# rdd_np = rdd.map(at(3)).map(_mapper).cache()
# dat_np['len_tag'] = collect(rdd_np.map(at(0)).map(len))
# dat_np['seq_tag'] = collect(rdd_np.flatMap(at(0)))
# dat_np['len_np'] = collect(rdd_np.map(at(1)).map(len))
# rdd_np_ = rdd_np.flatMap(at(1)).cache()
# dat_np['len_noun'] = collect(rdd_np_.map(len))
# dat_np['seq_noun'] = flat_collect(rdd_np_)
# dat_np['pos_np'] = collect(rdd_np.flatMap(at(2)))
# dat_np['n_var'] = dat_var['n']
# np.savez(f'{output_dir}/data', **dat)

PythonRDD[292] at RDD at PythonRDD.scala:53

In [46]:
_mapper(rdd.map(at(3)).take(1)[0])

KeyboardInterrupt: 

In [639]:
def _mapper(r):
    idx2tag_ = [tag.replace('[', '').replace(']', '') for tag in idx2tag]
    templ = [idx2tag_[idx] for idx in r]
    hits = find_noun_phrases(templ)
    tags, _ = zip(*groupby(len(idx2tag_) if any(start <= idx < end for start, end, _ in hits)
                           else idx2tag_.index(tag) for idx, tag in enumerate(templ)))
    noun_phrases = [[idx2tag_.index(tag) for tag in templ[start : end]] for start, end, _ in hits]
    positions = [idx for idx, tag in enumerate(tags) if tag == len(idx2tag_)]
    return len(noun_phrases) != len(positions)

rdd.map(at(3)).map(lambda r: list(takewhile(lambda idx: idx2tag[idx] != '{SEP}', r))).filter(_mapper).take(10)

[[11, 20, 8, 18, 20, 8, 18],
 [11, 20, 8, 18, 20, 8, 18],
 [11, 20, 8, 18, 20, 8, 18, 8, 18],
 [11, 20, 8, 18, 21, 18, 25, 20],
 [11, 20, 8, 18, 21, 18, 25, 20],
 [11, 20, 8, 18, 8, 18, 20],
 [11, 20, 8, 18, 8, 18, 20],
 [11, 20, 8, 18, 21, 18, 25, 21, 17],
 [11, 20, 8, 18, 20, 8, 16, 18],
 [11, 20, 20, 8, 16, 16, 16, 18, 8, 18]]

In [650]:
def _mapper(r):
    idx2tag_ = [tag.replace('[', '').replace(']', '') for tag in idx2tag]
    templ = [idx2tag_[idx] for idx in r]
    hits = find_noun_phrases(templ)

    idx = 0
    starts, ends, _ = zip(*hits)
    tags = []
    while idx < len(templ):
        if idx in starts:
            tags.append(len(idx2tag_))
            idx = ends[starts.index(idx)]
        else:
            tags.append(idx2tag_.index(templ[idx]))
            idx += 1
    
    noun_phrases = [[idx2tag_.index(tag) for tag in templ[start : end]] for start, end, _ in hits]
    positions = [idx for idx, tag in enumerate(tags) if tag == len(idx2tag_)]

    return tags, noun_phrases, positions

_mapper([11, 20, 8, 18, 20, 8, 18])

([11, 30, 30], [[20, 8, 18], [20, 8, 18]], [1, 2])

In [634]:
len(dat_np['pos_np'])

558937

In [635]:
sum(dat_np['len_np'])

597365

In [621]:
rdd_seq.take(1)

[[10, 20, 8, 16, 18, 19, 20]]

In [619]:
def parseable(r):
    r = r.replace('[', '').replace(']', '').split(' ')
    hits = find_noun_phrases(r)
    return all(start < end for start, end, _ in hits)
#     starts, ends, _ = zip(*sorted(hits, key=at(0)))
#     return all(end <= start for start, end in zip(starts[1:], ends[:-1]))
#     return all(any(start <= idx < end for start, end, _ in hits)
#                for idx, tok in enumerate(r) if tok in {'[entity]', '[NP_SIMPLE]', '[ROLE_SIMPLE]'})

uniq_seq.map(parseable).reduce(and_)

True

In [598]:
uniq_seq.filter(lambda r: not parseable(r)).take(10)

["Did [entity] [VP_SIMPLE] a [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE]",
 "Was a [ROLE_SIMPLE] of [entity] 's [ROLE_SIMPLE] [VP_SIMPLE] by and [VP_SIMPLE] by [entity]",
 "Was a [ROLE_SIMPLE] of a [NP_SIMPLE] 's [ROLE_SIMPLE] 's [ROLE_SIMPLE] [entity] 's [ROLE_SIMPLE]",
 "What [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity] 's [ROLE_SIMPLE] [VP_SIMPLE] [entity] and was [VP_SIMPLE] by [entity]",
 "What was [VP_SIMPLE] by a [ROLE_SIMPLE] of [entity] 's [ROLE_SIMPLE]",
 "Who did [entity] [VP_SIMPLE] , a [ROLE_SIMPLE] of [entity] 's [ROLE_SIMPLE] [VP_SIMPLE] , and [entity] and [entity] [VP_SIMPLE]",
 "Who was a [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ROLE_SIMPLE]",
 "Did a [ROLE_SIMPLE] of [entity] 's [ROLE_SIMPLE] [VP_SIMPLE] [entity] and [VP_SIMPLE] [entity]",
 "Did a [ROLE_SIMPLE] of [entity] 's [ROLE_SIMPLE] [VP_SIMPLE] [entity] 's [ROLE_SIMPLE] 's [ROLE_SIMPLE]",
 "Was a [ADJECTIVE_SIMPLE] [ADJECTIV

In [589]:
for hits, seq in uniq_seq.map(lambda r: find_noun_phrases(r.replace('[', '').replace(']', '').split(' '))).zip(uniq_seq).take(10):
    print(seq)
    for start, end, tree in hits:
        print(seq.split(' ')[start : end])
        tree.pretty_print()

Did [entity] 's [ROLE_SIMPLE] [VP_SIMPLE] [entity] , [VP_SIMPLE] a [NP_SIMPLE] , and [VP_SIMPLE] [entity]
['[entity]', "'s", '[ROLE_SIMPLE]']
        S             
        |              
        NP            
   _____|_______       
  NP   POS      N     
  |     |       |      
entity  's ROLE_SIMPLE

['[entity]']
  S   
  |    
  NP  
  |    
entity

['a', '[NP_SIMPLE]']
     S           
     |            
     NP          
  ___|______      
 DT         N    
 |          |     
 a      NP_SIMPLE

['[entity]']
  S   
  |    
  NP  
  |    
entity

Did [entity] [VP_SIMPLE] a [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE]
['[entity]']
  S   
  |    
  NP  
  |    
entity

['a', '[ADJECTIVE_SIMPLE]', '[ROLE_SIMPLE]']
           S                        
           |                         
           NP                       
  _________|__________               
 |                    N             
 |          __________|_______       
 DT        

In [567]:
tree.pretty_print()

     S           
     |            
     NP          
  ___|______      
 DT         N    
 |          |     
 a      NP_SIMPLE



In [583]:
list(tree.subtrees())

[Tree('S', [Tree('NP', [Tree('DT', ['a']), Tree('N', ['NP_SIMPLE'])])]),
 Tree('NP', [Tree('DT', ['a']), Tree('N', ['NP_SIMPLE'])]),
 Tree('DT', ['a']),
 Tree('N', ['NP_SIMPLE'])]

In [587]:
def _mapper(hits):
    return sum(sum(1 for subtree in tree.subtrees() if subtree.label() == 'NP') for _, _, tree in hits)

In [592]:
hits = uniq_seq.map(lambda r: find_noun_phrases(r.replace('[', '').replace(']', '').split(' '))).cache()

In [594]:
count(hits.map(_mapper))

{1: 192, 2: 2300, 3: 5223, 4: 5869, 5: 3781, 6: 1262, 7: 230, 8: 10}

In [561]:
find_noun_phrases((uniq_seq.filter(lambda r: not parseable(r)).take(1)[0].replace('[', '').replace(']', '').split(' ')))

IndexError: list index out of range

In [551]:
uniq_seq.filter(lambda r: not parseable(r)).take(10)

["Did [entity] 's [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] [VP_SIMPLE] [entity] and [VP_SIMPLE] [entity]",
 "Was [entity] a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ROLE_SIMPLE]",
 'Was [entity] a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] was [VP_SIMPLE] by and [entity] [VP_SIMPLE]',
 'Was a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] a [ROLE_SIMPLE] of [entity]',
 "What [NP_SIMPLE] did a [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] [VP_SIMPLE]",
 "What was a [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] 's [ROLE_SIMPLE]",
 "What was [VP_SIMPLE] by [entity] and [VP_SIMPLE] by a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] 's [ADJECTIVE_SIMPLE] [ROLE_SIMPLE]",
 'Which [ADJECTIVE_SIMPLE] [NP_SIMPLE] was a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of [entity]',
 "Who [VP_SIMPLE] a [ADJEC

In [539]:
find_noun_phrases((uniq_seq.take(1)[0].replace('[', '').replace(']', '').split(' ')))

TclError: no display name and no $DISPLAY environment variable

Tree('S', [Tree('NP', [Tree('NP', ['entity']), Tree('POS', ["'s"]), Tree('N', ['ROLE_SIMPLE'])])])

In [509]:
grammar = nltk.CFG.fromstring("""
    S -> WHNP VP | WHNP 'did' NP V | 'Was' NP NP | 'Did' NP VP | 'Was' NP V PP | 'Were' NP V PP
    CC -> 'and'
    DT -> 'a' | 'an'
    JJ -> 'ADJECTIVE_SIMPLE' | JJ JJ
    N -> 'NP_SIMPLE' | 'ROLE_SIMPLE'
    NP -> 'entity' | NP CC NP | DT N | DT JJ N | NP POS N | NP POS JJ N | NP OF NP | NP 'that' NP V | NP 'that' VP | NP 'that' NP 'was' V P | NP 'that' NP VP | DT N V PP | DT JJ N V PP | DT N 'whose' N VP | DT N 'whose' JJ N VP
    V -> 'VP_SIMPLE' | V CC V
    VP -> 'was' NP | 'was' V PP | V NP | V PP | VP CC VP | VP ',' VP ',' 'and' VP | VP ',' VP ',' VP ',' 'and' VP | 'was' V 'by' 'and' V 'by' NP | 'was' V 'by' ',' V 'by' ',' 'and' V 'by' NP | 'was' V 'by' ',' V 'by' ',' V 'by' ',' 'and' V 'by' NP
    WHNP -> 'Who' | 'What' | 'What' N | 'Which' N | 'What' N OF NP | 'What' JJ N OF NP | 'What' N 'that' NP V | 'What' NP 'that' VP | 'Which' N OF NP | 'Which' JJ N | 'Which' JJ N OF NP | 'Which' N 'that' NP V | 'Which' NP 'that' VP
    OF -> 'of'
    POS -> "'s"
    P -> 'by'
    PP -> P NP
""")
parser = nltk.ChartParser(grammar)
# parser = nltk.ShiftReduceParser(grammar)

sent = 'Did [entity] [VP_SIMPLE] a [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] was [VP_SIMPLE] by and [entity] [VP_SIMPLE]'.replace('[', '').replace(']', '')
trees = list(parser.parse(sent.split(' ')))
for tree in trees:
    tree.pretty_print()

In [513]:
def parseable(r):
    try:
        return len(list(parser.parse(r.replace('[', '').replace(']', '').replace('were', 'was').split(' ')))) > 0
    except:
        return False

uniq_seq.map(parseable).sum()

10562

In [511]:
uniq_seq.filter(lambda r: not parseable(r)).take(1000)

['Did [entity] [VP_SIMPLE] a [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] were [VP_SIMPLE] by and [entity] [VP_SIMPLE]',
 'Did [entity] [VP_SIMPLE] a [NP_SIMPLE] that [entity] [VP_SIMPLE] , [entity] [VP_SIMPLE] , and [entity] was [VP_SIMPLE] by',
 "Did a [NP_SIMPLE] [VP_SIMPLE] by , [VP_SIMPLE] by , and [VP_SIMPLE] by [entity] [VP_SIMPLE] [entity] 's [ROLE_SIMPLE] 's [ROLE_SIMPLE]",
 'Did a [NP_SIMPLE] that a [NP_SIMPLE] was [VP_SIMPLE] by and [VP_SIMPLE] [VP_SIMPLE] [entity]',
 'Was [entity] a [NP_SIMPLE] that was [VP_SIMPLE] by and [VP_SIMPLE] a [NP_SIMPLE]',
 "Was [entity] [VP_SIMPLE] by [entity] 's [ROLE_SIMPLE] and [VP_SIMPLE] by [entity] 's [ROLE_SIMPLE] 's [ROLE_SIMPLE]",
 'Was [entity] a [ADJECTIVE_SIMPLE] [ADJECTIVE_SIMPLE] [NP_SIMPLE] that [entity] was [VP_SIMPLE] by and [entity] [VP_SIMPLE]',
 "Was [entity] 's [ROLE_SIMPLE] a [NP_SIMPLE] that a [NP_SIMPLE] was [VP_SIMPLE] by , [VP_SIMPLE] by , [VP_SIMPLE] by , and [VP_SIMPLE] by",
 'Was a [ADJECTIVE_SIMPLE] [ROLE_SIMPLE] of 

In [455]:
df.rdd.map(lambda r: r['questionTemplate']).distinct().count()

49320

In [464]:
two = lambda rule: f"{rule} 'and' {rule}"
grammar = nltk.CFG.fromstring(f"""
    S -> NP
    DT -> 'a' | 'an'
    JJ -> 'ADJECTIVE_SIMPLE'
    N -> 'NP_SIMPLE' | 'ROLE_SIMPLE' | JJ N
    NP -> 'entity' | {two('NP')} | DT N
""")

In [465]:
grammar.productions()

[S -> NP,
 DT -> 'a',
 DT -> 'an',
 JJ -> 'ADJECTIVE_SIMPLE',
 N -> 'NP_SIMPLE',
 N -> 'ROLE_SIMPLE',
 N -> JJ N,
 NP -> 'entity',
 NP -> NP 'and' NP,
 NP -> DT N]