In [1]:
import os
import json
from collections import defaultdict
from preprocess_func import *
from tqdm import tqdm

In [2]:
data_dir = "/harddisk/data/nlp_data/kb/wikipedia/20220620/enwiki-20220620/output/blocks.ann/"
files = os.listdir(data_dir)
data = []
for file in tqdm(files):
    with open(os.path.join(data_dir, file)) as f:
        line = f.readline()
        while line:
            data.append(line)
            line = f.readline()
    break

  0%|          | 0/2216 [00:00<?, ?it/s]


In [3]:
property_mapping_file_path = "/harddisk/data/nlp_data/kb/wikidata/20210520/mapping/property_names.json"
entity_mapping_file_path = "/harddisk/data/nlp_data/kb/wikidata/20210520/mapping/qid2sitelinks.enwiki.title.json"
mongodb_config = {"host": '10.12.192.31', "port": 27017}

In [4]:
preprocess = Preprocess(mongodb_config, entity_mapping_file_path,property_mapping_file_path,entity_mapping_file_path)

In [5]:
preprocess(data[20])

{'id': '29900346',
 'title': 'Neva (community), Wisconsin',
 'cleaned_text': ['Neva is an unincorporated community located in the town of Neva, Langlade County, Wisconsin, United States.',
  'Neva is north-northeast of Antigo.'],
 'ents': [[{'end': 35,
    'id': '232346',
    'start': 11,
    'text': 'unincorporated community',
    'title': 'Unincorporated area',
    'qid': 'Q269528',
    'description': {'english': 'region of land not governed by own local government'},
    'aliases': {'english': ['unincorporated territory',
      'non-municipal area',
      'non-municipal territory',
      'unincorporated place',
      'unincorporated community']},
    'type': []},
   {'end': 63,
    'id': '139277',
    'start': 59,
    'text': 'Neva',
    'title': 'Neva, Wisconsin',
    'qid': 'Q2224982',
    'description': {'english': 'human settlement in Langlade County, Wisconsin, United States of America'},
    'aliases': {},
    'type': []},
   {'end': 80,
    'id': '90960',
    'start': 65,
   

In [4]:
def initialize():
    global preprocess

def run(line):
    return preprocess(line)

In [None]:
from multiprocessing.pool import Pool

outputs = []
with Pool(64, initializer=initialize) as pool:
    for output in tqdm(pool.imap_unordered(run, data)):
        outputs.append(output)

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from preprocess_func import *
import json

In [2]:
spark = SparkSession.builder\
    .appName("Pretrained Data Pipeline")\
    .master("local[*]")\
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/25 02:49:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
src_files = "/harddisk/user/keminglu/pretrained_data_wikipedia_kb_ent_type/*"
raw_data = sc.textFile(src_files)

In [4]:
raw_data.count()

                                                                                

6510960

In [5]:
data = raw_data.map(json.loads)
statistics = data.map(lambda x: (sum(map(len, x['cleaned_text'])), x['n_ents'], x['n_mapped_ent'], x['n_rel'], x['n_rel_pair']))
customSchema = StructType([  
    StructField('n_char', LongType(), True),     
    StructField('n_ent', LongType(), True),
    StructField('n_mapped_ent', LongType(), True),
    StructField('n_rel', LongType(), True),
    StructField('n_rel_pair', LongType(), True)
])
stat_df = spark.createDataFrame(statistics, schema=customSchema)
stat_df.printSchema()

root
 |-- n_char: long (nullable = true)
 |-- n_ent: long (nullable = true)
 |-- n_mapped_ent: long (nullable = true)
 |-- n_rel: long (nullable = true)
 |-- n_rel_pair: long (nullable = true)



In [6]:
stat_df.summary().show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|            n_char|             n_ent|      n_mapped_ent|             n_rel|        n_rel_pair|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|           6510960|           6510960|           6510960|           6510960|           6510960|
|   mean|258.56548803863024| 4.199946398073402| 4.017575595611093|2.8682862435032623|2.4621025163723935|
| stddev|192.97596523221156|3.1485174981289337|3.0089373338434915|6.1472581536658275|5.0221214668301295|
|    min|                 0|                 0|                 0|                 0|                 0|
|    25%|               122|                 2|                 2|                 0|                 0|
|    50%|               209|                 4|                 4|                 1|                 1|
|    75%|               344|                 6|        

                                                                                

In [None]:
data.filter(lambda x: x['n_rel'] == 1452).take(1)

In [55]:
empty_sample = data.filter(lambda x:  x['n_rel'] == 0)
empty_sample.count()

                                                                                

2816678

In [56]:
empty_sample.map(lambda x: x['cleaned_text']).map(lambda x: sum(map(len, x))).mean()

                                                                                

207.47491761571604

In [29]:
max_ent_num = 20
max_rel_num = 20
filtered_data = data.filter(lambda x: 0 < x['n_ents'] <= max_ent_num and x['n_rel'] <= max_rel_num)

In [30]:
filtered_data.count()

                                                                                

5871084

In [5]:
from transformers import BloomTokenizerFast

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


In [12]:
max_ent_num = 20
max_rel_num = 20
max_length = 2048
tokenizer = BloomTokenizerFast.from_pretrained("bigscience/bloom")
output_file = "/harddisk/user/keminglu/pretrained_data_processed/kb_ent_type/model_input.json"

def token_length(text, tokenizer):
    return len(tokenizer(text)['input_ids'])

data = raw_data.map(json.loads)\
    .filter(lambda x: 0 < x['n_ents'] <= max_ent_num and x['n_rel'] <= max_rel_num)\
    .map(lambda sample: transform(sample, key_map))\
    .filter(lambda sample: token_length(sample["inputs"] + " " + sample["targets"], tokenizer) > max_length)\
    .collect()

with open(output_file, "w") as f:
    for sample in tqdm(data):
        f.write(json.dumps(sample) + "\n")

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax ha

PermissionError: [Errno 13] Permission denied: '/harddisk/user/keminglu/pretrained_data_processed/kb_ent_type/model_input.json'

In [33]:
token_lengths = filtered_data.map(lambda sample: transform(sample, key_map)).map(lambda sample: token_length(sample[0] + " " + sample[1], tokenizer))

In [34]:
token_lengths.mean()

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax ha

516.81152390257

In [None]:
max_length = 2048
token_lengths.filter(lambda x: x > max_length).count()