In [None]:
import json
import numpy as np
import collections
import copy
from os import listdir
from os.path import isfile, join

In [None]:
import pickle

In [None]:
import findspark

findspark.init()
from pyspark import SparkContext
import pyspark

# Spark resource settings for this notebook. The driver is sized generously
# because several later cells collect() complete results onto the driver.
conf = pyspark.SparkConf().setAll(
    [
        ("spark.executor.memory", "8g"),
        ("spark.executor.cores", "2"),
        ("spark.executor.instances", "7"),
        ("spark.driver.memory", "32g"),
        ("spark.driver.maxResultSize", "10g"),
    ]
)
# getOrCreate() rather than SparkContext(conf=conf): constructing a second
# context in the same kernel raises, which made this cell fail when re-run.
# NOTE: if a context already exists it is returned unchanged and `conf` is not
# re-applied; restart the kernel to change the settings above.
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType, StringType
from pyspark.sql.types import Row
from pyspark.sql import SparkSession

# Wrap the SparkContext `sc` in a SparkSession; `spark.createDataFrame` is
# used further down to build the entity DataFrame.
spark = SparkSession(sc)

In [None]:
def convert_ndarray_back(x):
    """Replace ``x["entityCell"]`` with a NumPy array built from its value.

    The mapping ``x`` is modified in place and the very same object is
    returned, so the call can be used inside a ``map``.
    """
    entity_cell = x["entityCell"]
    x["entityCell"] = np.array(entity_cell)
    return x


# Load the training tables: every line of the .jsonl file is stripped and
# parsed as JSON, then its "entityCell" field is turned back into an ndarray.
data_dir = "../../data/"
train_tables = (
    sc.textFile(data_dir + "train_tables.jsonl")
    .map(lambda line: json.loads(line.strip()))
    .map(convert_ndarray_back)
)

In [None]:
def get_table_entity(x):
    """Flatten one table into a list of per-cell entity ``Row`` records.

    One ``Row`` is produced for every position ``(i, j)`` reported by
    ``x["entityCell"].nonzero()``, with the fields:

    * ``t_id``   -- ``x["_id"]``
    * ``entity`` -- ``target.id`` of the first entry of the cell's ``surfaceLinks``
    * ``c_id``   -- ``int(j)``
    * ``c_name`` -- ``x["processed_tableHeaders"][j]``
    * ``r_id``   -- ``int(i)``

    Only the first surface link of a cell is used; a flagged cell whose
    ``surfaceLinks`` list is empty raises ``IndexError``. A table without any
    flagged cell yields an empty list.

    Note: a ``[TITLE]`` pseudo-column (``c_id=-1`` carrying ``x["pgId"]`` for
    each row that contains an entity) is not emitted by this function.
    """
    return [
        Row(
            t_id=x["_id"],
            entity=x["tableData"][i][j]["surfaceLinks"][0]["target"]["id"],
            # Cast the indices to built-in ints (presumably nonzero() yields
            # NumPy integers, which Spark cannot store directly -- TODO confirm).
            c_id=int(j),
            c_name=x["processed_tableHeaders"][j],
            r_id=int(i),
        )
        for i, j in zip(*x["entityCell"].nonzero())
    ]

In [None]:
from operator import add

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

In [None]:
# Flatten every training table into entity rows and build a DataFrame from
# them (no explicit schema is passed to createDataFrame).
entity_rows = train_tables.flatMap(get_table_entity)
entity_df = spark.createDataFrame(entity_rows)

In [None]:
# Print a sample of the extracted entity rows as a sanity check.
entity_df.show()

In [None]:
# Pair the entity in column 0 of a table row (e1) with every entity found in
# the other columns of the same table row (e2).
# The c_id filter is applied before the projection because the selectExpr
# on the left side does not keep c_id.
first_col_entities = entity_df.filter(F.col("c_id") == 0).selectExpr("r_id", "t_id", "entity as e1")
other_col_entities = entity_df.filter(F.col("c_id") != 0).withColumnRenamed("entity", "e2")
# Resulting columns: r_id, t_id, e1, plus e2 / c_id / c_name from the right side.
row_e2e = first_col_entities.join(other_col_entities, ["r_id", "t_id"], "inner")

In [None]:
# Print a sample of the (e1, e2) row pairs produced by the join above.
row_e2e.show()

In [None]:
# Header-to-header counts: two column headers h1 and h2 are linked whenever
# the same (e1, e2) pair occurs under both of them. For each (h1, h2) with
# h1 != h2, `count` is the number of distinct (t1, t2) table combinations.
left_pairs = row_e2e.selectExpr("t_id as t1", "e1", "e2", "c_name as h1")
right_pairs = row_e2e.selectExpr("t_id as t2", "e1", "e2", "c_name as h2")
shared_pairs = left_pairs.join(right_pairs, ["e1", "e2"], "inner")
distinct_links = shared_pairs.select("t1", "t2", "h1", "h2").dropDuplicates()
h2h = (
    distinct_links.groupBy(["h1", "h2"])
    .agg(F.count("t1").alias("count"))
    .filter(F.col("h1") != F.col("h2"))
)

In [None]:
# Bring h2h to the driver as a nested dict {h1: {h2: count}} and pickle it.
# The Spark job runs BEFORE the output file is opened, so a failed job cannot
# leave a truncated n_h2h.pkl behind.
h2h_local = {}
# Row fields are read by key: attribute access `x.count` would resolve to the
# tuple method rather than the "count" column.
for h1, h2, count in h2h.rdd.map(lambda x: (x["h1"], x["h2"], x["count"])).collect():
    h2h_local.setdefault(h1, {})[h2] = count
with open("../../data/n_h2h.pkl", "wb") as f:
    pickle.dump(h2h_local, f)

In [None]:
# Group the row pairs by e1; each value is a list of
# [t_id, c_id, c_name, e2] records concatenated by reduceByKey(add).
# NOTE(review): this rebinds `row_e2e` from a DataFrame to an RDD, so this
# cell (and the DataFrame cells above) cannot be re-run without first
# recomputing the join that defines row_e2e.
row_e2e = (
    row_e2e.rdd
    .map(lambda pair: (pair["e1"], [[pair["t_id"], pair["c_id"], pair["c_name"], pair["e2"]]]))
    .reduceByKey(add)
)

In [None]:
# Collect the grouped pairs to the driver first, then write them as JSON:
# opening the file only after the Spark job has succeeded avoids leaving a
# truncated e2e_row.json behind when the job fails.
e2e_row_local = dict(row_e2e.collect())
with open("../../data/e2e_row.json", "w") as f:
    json.dump(e2e_row_local, f, indent=2)
# Release the driver-side copy once it is on disk.
del e2e_row_local

In [None]:
# Peek at two (e1, records) entries of the grouped RDD.
row_e2e.take(2)

In [None]:
# entity -> list of [t_id, c_id] pairs in which the entity occurs.
e2column = (
    entity_df.rdd
    .map(lambda row: (row["entity"], [[row["t_id"], row["c_id"]]]))
    .reduceByKey(add)
)
# "<t_id>-<c_id>" -> list of the entities found in that table column.
table_column2e = (
    entity_df.rdd
    .map(lambda row: ("%s-%d" % (row["t_id"], row["c_id"]), [row["entity"]]))
    .reduceByKey(add)
)

In [None]:
def _dump_rdd_as_json(pair_rdd, path):
    """Collect a key/value RDD into a dict and write it to ``path`` as JSON.

    The RDD is collected before the file is opened, so a failing Spark job
    does not leave a truncated output file behind. The collected dict is local
    to this function and is released as soon as the file is written.
    """
    collected = dict(pair_rdd.collect())
    with open(path, "w") as f:
        json.dump(collected, f, indent=2)


_dump_rdd_as_json(e2column, "../../data/e2column.json")
# NOTE(review): unlike the other outputs this path has no ".json" extension;
# it is kept unchanged because the readers of this file are not visible here.
_dump_rdd_as_json(table_column2e, "../../data/table_column2e")

In [None]:
# Peek at ten ("<t_id>-<c_id>", entities) entries.
table_column2e.take(10)

In [None]:
# Peek at ten (entity, [[t_id, c_id], ...]) entries.
e2column.take(10)