# Imports and Drive mounting

In [1]:
!pip install --upgrade pandas
!pip install --upgrade pyarrow



In [2]:
from google.colab import drive
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import numpy as np
import math
from tqdm import tqdm
import logging

# Display the pandas version in use after the upgrade above.
pd.__version__

'1.3.4'

In [3]:
# Mount Google Drive at /content/drive; every data path below lives under it.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Route DEBUG-and-above log records to the file at log_path.
# NOTE(review): log_path is an empty string here; with an empty filename
# basicConfig appears to fall back to console logging instead of a file --
# set a real path if the per-chunk error log should persist.
log_path = ''
logging.basicConfig(level=logging.DEBUG, filename=log_path)

# read files

In [5]:
# Input locations on the mounted Drive.
# Speaker attributes table (parquet).
path_to_attr = '/content/drive/MyDrive/Project datasets/speaker_attributes.parquet'
# QID -> label/description lookup (bz2-compressed CSV).
path_to_quotbank_desc = '/content/drive/MyDrive/Project datasets/wikidata_labels_descriptions_quotebank.csv.bz2'
# Single-year Quotebank dump (2020), bz2-compressed JSON.
path_to_quotbank = '/content/drive/MyDrive/Quotebank/quotes-2020.json.bz2' 

In [6]:
# One Quotebank dump per year, quotes-2015 through quotes-2020.
paths_to_quotbank = [
    f'/content/drive/MyDrive/Quotebank/quotes-20{yy}.json.bz2'
    for yy in range(15, 21)
]

In [7]:
# Load only the speaker-attribute columns that are needed; 'id' is the key the
# quotes are later merged on (see parse_in_chunks, which reads `attr` as a global).
columns = ['date_of_birth', 'nationality', 'gender', 'ethnic_group', 'occupation', 'party', 'academic_degree', 'id', 'candidacy', 'religion']
attr = pd.read_parquet(path_to_attr, columns=columns)

In [8]:
# Lookup table indexed by QID; parse_in_chunks reads its 'Label' column to
# turn QIDs into readable labels.
descriptions = pd.read_csv(path_to_quotbank_desc, compression='bz2', index_col='QID') 

### Exceptions

In [9]:
# QIDs that get special handling: any row containing one of these values is
# written to the separate "exception" output instead of being decoded.
# Presumably these are QIDs missing from the descriptions file -- verify.
exeptions = ['Q3268166', 'Q11815360', 'Q12014399', 'Q16287483', 'Q20432251',
             'Q21550646', 'Q13365117', 'Q13424794', 'Q1248362', 'Q3186984',
             'Q6859927', 'Q15145782', 'Q15991263', 'Q99753484', 'Q12455619',
             'Q5568256', 'Q6363085', 'Q11819457', 'Q12334852', 'Q15145783']

# Process data

## Parse function

In [10]:
def parse_in_chunks(chunk_size, path_to_save, path_to_save_exceptions, descriptions, exeptions, path_to_quotbank):
  """Enrich a Quotebank dump with speaker attributes and save it chunk by chunk.

  For every chunk read from ``path_to_quotbank`` the function:
    1. explodes the ``qids`` column so each row refers to a single speaker id,
    2. left-merges the module-level ``attr`` frame on ``qids`` == ``id``,
    3. moves rows containing any value from ``exeptions`` to a separate frame,
    4. replaces the QIDs in the attribute columns of the remaining rows with the
       first matching ``Label`` from ``descriptions``,
    5. writes both frames as parquet files.

  Output files are named ``<source stem>_<chunk index>.parquet`` (for example
  ``quotes-2020_3.parquet``), so several source files can be parsed into the
  same directories without overwriting each other's chunks.

  If decoding a chunk raises, the error is logged, ``error <i>`` is printed and
  the whole chunk is skipped (neither of its two files is written).

  Args:
    chunk_size: number of JSON lines read per chunk.
    path_to_save: directory for the decoded rows; it is not created here.
    path_to_save_exceptions: directory for the rows matched by ``exeptions``;
      it is not created here.
    descriptions: DataFrame indexed by QID with a ``Label`` column.
    exeptions: list of values; a row is diverted when any of its cells equals
      one of them or, for ndarray cells, contains one of them.
    path_to_quotbank: path to a bz2-compressed JSON-lines Quotebank file.

  Returns:
    None. Results are written to disk.
  """
  import os  # local import so the function does not depend on another cell's imports

  columns = ['nationality', 'gender', 'ethnic_group', 'occupation', 'party', 'academic_degree', 'candidacy', 'religion']

  # ".../quotes-2020.json.bz2" -> "quotes-2020"; keeps outputs of different
  # source files apart when they share an output directory.
  source_name = os.path.basename(path_to_quotbank).split('.')[0]

  # The reader is a context manager: the underlying file handle is closed even
  # if processing stops early.
  with pd.read_json(path_to_quotbank, lines=True, compression='bz2', chunksize=chunk_size) as quotebank_reader:
    for i, quote in tqdm(enumerate(quotebank_reader)):
      # multiple people in one quote -> one row per speaker id
      quote = quote.explode('qids')

      # merge with attributes (reads the module-level `attr` frame)
      quote = quote.merge(attr, how='left', left_on='qids', right_on='id', indicator=True)

      # rows that contain at least one exception value in any cell
      not_in_desc_quotes_mask = quote.applymap(lambda x: np.isin(x, exeptions).any() if isinstance(x, np.ndarray) else x in exeptions).any(axis=1)
      not_in_desc_quotes = quote[not_in_desc_quotes_mask.values]

      # everything else; copy so the column assignment below works on an
      # independent frame rather than on a slice of the merged one
      quote = quote[~not_in_desc_quotes_mask.values].copy()

      # decode QIDs into labels (only the first label of each cell is kept)
      try:
        quote[columns] = quote[columns].applymap(lambda x: descriptions.loc[x]['Label'].values[0], na_action='ignore')
      except Exception:
        # Deliberate best effort: log and skip the chunk, but let
        # KeyboardInterrupt / SystemExit propagate.
        logging.exception(f"on chunk {i} after {i*chunk_size}:")
        print(f"error {i}")
        continue

      # unify NaN to None
      quote = quote.where(pd.notnull(quote), None)
      not_in_desc_quotes = not_in_desc_quotes.where(pd.notnull(not_in_desc_quotes), None)

      # save quotes
      table = pa.Table.from_pandas(quote, preserve_index=True)
      pq.write_table(table, f"{path_to_save}/{source_name}_{i}.parquet")

      # save not in description
      table = pa.Table.from_pandas(not_in_desc_quotes, preserve_index=True)
      pq.write_table(table, f"{path_to_save_exceptions}/{source_name}_{i}.parquet")

      if i % 10 == 0:
        print((i+1)*chunk_size)

In [None]:
%%time
# Parse every yearly Quotebank dump in chunks of 100,000 lines. All years are
# written into the same two output directories: one for decoded rows and one
# for rows matched by the exception list.
for path in paths_to_quotbank:
  parse_in_chunks(100000,"/content/drive/MyDrive/parsed_quotes2/no_exception", "/content/drive/MyDrive/parsed_quotes2/exception", descriptions, exeptions, path)

1it [05:44, 344.05s/it]

100000


9it [52:54, 355.78s/it]

### Check 

In [None]:
# dataset = ds.dataset("/content/drive/MyDrive/quotes/quotes-2020-enhanced10000", format="parquet")
# print(dataset.files)
# df = dataset.head(100).to_pandas()
# df.info()


In [None]:
# df.head()