# Produce new index from pipeline step
Copyright (C) 2021 ServiceNow, Inc.

This notebook produces files intended to be ingested by a search engine as the search index. 
It requires a processed metadata file, produced by the `Metadata Analysis.ipynb` notebook.

Given an existing dataset and the output files from a chosen step of the preprocessing (cleaning) pipeline, this notebook produces a new dataset (the index) in a specified folder. The index contains one text file for every file produced by that preprocessing step, named after the ID of its metadata record.

It also produces a new metadata index, which contains the metadata associated with all files, written as one JSON file per metadata record in a format suitable for a search engine.

In [1]:
import sys
sys.path.append('../..')
import nrcan_p2.metadata_processing.read_metadata as read_metadata

## Load the metadata

In [3]:
# Paths of the schema (.xsd) and of the large GEOSCAN XML extract handed to the converter.
schema_file = '/nrcan_p2/data/01_raw/20201006/geoscan/geoscan_flex.xsd'
xml_file_large = '/nrcan_p2/data/01_raw/20201006/geoscan/GEOSCAN-extract-20200211144755.xml'
# Convert the XML extract into a dataframe with the project helper (validate=False).
# NOTE(review): the helper's name and this cell's output suggest it saves the dataframe and
# reloads an existing saved copy on later runs -- confirm in read_metadata.
df_s_large = read_metadata.convert_xml_to_dataframe_and_save(xml_schema=schema_file, xml_file=xml_file_large,
                                       validate=False, ITEM_PREFIX='{https://geoscan.nrcan.gc.ca/schema/osdp_feed/1.0/}', 
                                                             SUB_KEY_PREFIX='{http://purl.org/dc/elements/1.1/}')

Loading existing df file...


In [5]:
import pandas as pd
# Alternative input, currently disabled: the processed parquet of the large GEOSCAN extract.
# output_large = '/nrcan_p2/data/01_raw/20201006/geoscan/GEOSCAN-extract-20200211144755.xml_processed.parquet'
# meta_data = pd.read_parquet(output_large)

# Processed metadata used by the rest of the notebook; this path is also written to
# source.txt in the metadata output folder.
output_small = '/nrcan_p2/data/01_raw/20201006/geoscan/EAIDown.xml_processed_Feb29.parquet'
meta_data = pd.read_parquet(output_small)

In [6]:
# Show the available metadata columns; the row converter below indexes rows by these names.
meta_data.columns

Index(['{http://purl.org/dc/elements/1.1/}contributor',
       '{http://purl.org/dc/elements/1.1/}title_en',
       '{http://purl.org/dc/elements/1.1/}creator',
       '{http://purl.org/dc/elements/1.1/}subject_en',
       '{http://purl.org/dc/elements/1.1/}subject_fr',
       '{http://purl.org/dc/elements/1.1/}source_en',
       '{http://purl.org/dc/elements/1.1/}source_fr',
       '{http://purl.org/dc/elements/1.1/}description_en',
       '{http://purl.org/dc/elements/1.1/}description_fr',
       '{http://purl.org/dc/elements/1.1/}date',
       '{http://purl.org/dc/elements/1.1/}type_en',
       '{http://purl.org/dc/elements/1.1/}format_en',
       '{http://purl.org/dc/elements/1.1/}format_fr',
       '{http://purl.org/dc/elements/1.1/}format',
       '{http://purl.org/dc/elements/1.1/}identifier_geoscanid',
       '{http://purl.org/dc/elements/1.1/}identifier_en',
       '{http://purl.org/dc/elements/1.1/}identifier_fr',
       '{http://purl.org/dc/elements/1.1/}identifier_info',
  

## Utility function for converting a row of the metadata into dict format

In [7]:
import numpy as np
def _non_null_items(values):
    """Return the non-None entries of ``values`` as a list; a None cell yields ``[]``."""
    if values is None:
        return []
    return [x for x in values if x is not None]


def _clean_location(values, column):
    """Validate one coverage cell and return its non-None entries (None if the cell is None)."""
    if values is None:
        return None
    assert isinstance(values, (list, np.ndarray)), \
        f'{column}: expected a list or ndarray, got {type(values)}'
    cleaned = [x for x in values if x is not None]
    assert all(isinstance(x, str) for x in cleaned), \
        f'{column}: expected only str entries'
    return cleaned


def convert_metadata_row_to_dict(row):
    """Convert one row of the processed metadata into a JSON-serialisable dict.

    Parameters
    ----------
    row : mapping-like (e.g. a pandas Series from ``DataFrame.iterrows``)
        Must provide ``desc_en_en``, ``title_merged`` and the Dublin Core
        columns ``identifier_geoscanid``, ``subject_en``, ``creator``,
        ``identifier_en``, ``identifier``, ``date``, ``coverage`` and
        ``coverage_en``.

    Returns
    -------
    dict
        Always contains ``metadata_id`` (str). ``abstract``, ``title``,
        ``keywords``, ``authors``, ``links``, ``date`` and ``location`` are
        only present when the row holds a value for them.

    Raises
    ------
    AssertionError
        If the id or date cell does not hold exactly one entry, or a coverage
        cell is not a list/ndarray containing only strings (and None).
    """
    # id, abstract, title, keyword, author, link, date, location
    dc = '{http://purl.org/dc/elements/1.1/}'
    data = {}

    metadata_id = row[dc + 'identifier_geoscanid']
    assert len(metadata_id) == 1, \
        f'expected exactly one geoscan id, got {len(metadata_id)}'
    data['metadata_id'] = str(metadata_id[0])

    abstract = row['desc_en_en']
    if abstract is not None:
        data['abstract'] = abstract

    title = row['title_merged']
    if title is not None:
        data['title'] = title

    # A None cell is treated like an empty list, the same way link cells are.
    keywords = _non_null_items(row[dc + 'subject_en'])
    if keywords:
        data['keywords'] = keywords

    authors = _non_null_items(row[dc + 'creator'])
    if authors:
        data['authors'] = authors

    # English links first, then language-neutral ones; None entries are dropped
    # so they cannot end up as JSON nulls.
    links = _non_null_items(row[dc + 'identifier_en']) + _non_null_items(row[dc + 'identifier'])
    if links:
        data['links'] = links

    date = row[dc + 'date']
    assert len(date) == 1, f'expected exactly one date entry, got {len(date)}'
    date = date[0]
    if date is not None:
        data['date'] = date

    location_poly = _clean_location(row[dc + 'coverage'], 'coverage')
    location_en = _clean_location(row[dc + 'coverage_en'], 'coverage_en')

    # Unlike keywords/authors, a coverage cell that is present but empty after
    # filtering is still emitted (as an empty list).
    if location_poly is not None or location_en is not None:
        location = {}
        if location_poly is not None:
            location['polygons'] = location_poly
        if location_en is not None:
            location['desc'] = location_en
        data['location'] = location

    return data

In [9]:
# Sanity check: display the converted dict for the first three metadata rows.
for irow, row in meta_data.iloc[0:3].iterrows():
    display(convert_metadata_row_to_dict(row))

{'metadata_id': '407',
 'title': 'Rock-Eval/TOC data for ten southwest Alberta wells (townships 16 to 30, ranges 2 to 10W5)',
 'keywords': ['wells',
  'lithology',
  'thermal maturation',
  'hydrocarbon migration',
  'hydrocarbon generation',
  'hydrocarbon potential',
  'hydrocarbons',
  'Elbow River 11-15-20-7W5 well',
  'Futurity Gap 16-17-24-9W5 well',
  'Harmattan East 7-9-33-3W5 well',
  'West Hunter Valley 11-16-29-9W5 well',
  'Jumpingpound Creek 7-4-25-5W5 well',
  'Panther River 9-19-30-10W5 well',
  'Sheep 5-29-18-3W5 well',
  'Stimson Creek 9-36-16-4W5 well',
  'Sullivan 6-15-18-5W5 well',
  'Turner Valley 16-14-18-2W5 well',
  'rock-eval analyses',
  'total organic carbon',
  'fossil fuels'],
 'authors': ['Watson, C',
  'Jayachandran, P T',
  'Spanswick, E',
  'Donovan, E F',
  'Danskin, D W'],
 'links': ['https://geoscan.nrcan.gc.ca/starweb/geoscan/servlet.starweb?path=geoscan/fulle.web&search1=R=407',
  'https://ftp.geogratis.gc.ca/pub/nrcan_rncan/publications/ess_sst/0/

{'metadata_id': '4680',
 'title': 'Catalogue of types and figured specimens of fossil plants in Geological Survey of Canada collections [Megaplant Supplement 1963-1967]',
 'keywords': ['fossil distribution, geographic',
  'fossil lists',
  'Invertebrata',
  'paleontology'],
 'authors': ['Bell, W A'],
 'links': ['https://geoscan.nrcan.gc.ca/starweb/geoscan/servlet.starweb?path=geoscan/fulle.web&search1=R=4680',
  'https://ftp.geogratis.gc.ca/pub/nrcan_rncan/publications/ess_sst/4/4680/gid_4680.pdf'],
 'date': '1969'}

{'metadata_id': '4681',
 'title': 'Catalogue of types and figured specimens of fossil plants in the Geological Survey of Canada collections',
 'keywords': ['fossil distribution, geographic',
  'fossil lists',
  'Invertebrata',
  'paleontology'],
 'authors': ['Bell, W A'],
 'links': ['https://geoscan.nrcan.gc.ca/starweb/geoscan/servlet.starweb?path=geoscan/fulle.web&search1=R=4681',
  'https://ftp.geogratis.gc.ca/pub/nrcan_rncan/publications/ess_sst/4/4681/gid_4681.pdf'],
 'date': '1962'}

## Write the metadata to the output folder

In [10]:
# Parent folder under which a new timestamped metadata_<timestamp> folder is created.
BASE_OUTPUT_DIR = '/nrcan_p2/data/03_primary/metadata/json'

In [12]:
import datetime
import pathlib 

# Timestamp (day-month-year-hour-minute-second) used in this run's folder name.
now = datetime.datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
OUTPUT_DIR = pathlib.Path(BASE_OUTPUT_DIR, f'metadata_{now}')
print(OUTPUT_DIR)

# Create the run folder; its parent (BASE_OUTPUT_DIR) must already exist.
if not OUTPUT_DIR.exists():
    OUTPUT_DIR.mkdir()

# Record which processed-metadata file this export was generated from.
SOURCE_FILE = OUTPUT_DIR / "source.txt"
with SOURCE_FILE.open('w') as f:
    f.write(output_small)

/nrcan_p2/data/03_primary/metadata/json/metadata_04-03-2021-19-18-48


In [14]:
import tqdm
import json

# The per-record JSON files go into a "json" sub-folder of this run's output folder.
JSON_OUTPUT_DIR = OUTPUT_DIR / "json"
if not JSON_OUTPUT_DIR.exists():
    JSON_OUTPUT_DIR.mkdir()

print(JSON_OUTPUT_DIR)

# One file per metadata row, named geoscan_<metadata_id> (no file extension).
for irow, row in tqdm.tqdm(meta_data.iterrows(), total=len(meta_data)):
    row_as_dict = convert_metadata_row_to_dict(row)
    record_id = row_as_dict["metadata_id"]

    fname = JSON_OUTPUT_DIR / f'geoscan_{record_id}'
    with fname.open('w') as f:
        json.dump(row_as_dict, f, indent=4)

  0%|          | 13/12316 [00:00<01:48, 113.63it/s]

/nrcan_p2/data/03_primary/metadata/json/metadata_04-03-2021-19-18-48/json


100%|██████████| 12316/12316 [02:12<00:00, 93.16it/s] 


## Gather the datafiles 

### Examine the config to determine which pipeline to apply

In [15]:
# Pipeline config file; its 'preprocessing_functions' and 'preprocessing_functions_mapped'
# entries are read in the next cell.
CONFIG_FILE = '/nrcan_p2/data/03_primary/v4/all_text_PIPELINE_GLOVE_PLUS_POSTPIPE_GLOVE_dA_full_v1.config'

In [16]:
import yaml

# Pass the Loader explicitly: a bare yaml.load(f) emits a YAMLLoadWarning on
# PyYAML 5.x and is an error on PyYAML 6. FullLoader is what the bare call used
# on 5.x, so the parsed config is unchanged.
# NOTE(review): if the config only contains plain YAML, yaml.safe_load(f) would
# be the safer choice -- confirm before switching.
with open(CONFIG_FILE) as f:
    config = yaml.load(f, Loader=yaml.FullLoader)

# Print every pipeline step (position, mapped function id, function name)
# followed by the cumulative name built so far: the mapped ids joined by "__",
# e.g. "0__1__2__3". The input folder selected further down uses such a name.
built_name = ''
mapped_ids = []
for i, (func, ifunc) in enumerate(zip(config['preprocessing_functions'], config['preprocessing_functions_mapped'])):
    print(i, ifunc, func)

    # Every component, including the first, is the mapped id rather than the
    # loop position.
    mapped_ids.append(str(ifunc))
    built_name = '__'.join(mapped_ids)
    print(built_name)

0 0 rm_dbl_space
0
1 1 rm_cid
0__1
2 2 convert_to_ascii
0__1__2
3 3 rm_nonprintable
0__1__2__3
4 4 filter_no_letter
0__1__2__3__4
5 6 rm_newline_hyphenation
0__1__2__3__4__6
6 14 rm_newline
0__1__2__3__4__6__14
7 16 filter_no_real_words_g3letter
0__1__2__3__4__6__14__16
8 26 filter_with_email
0__1__2__3__4__6__14__16__26
9 27 rm_url
0__1__2__3__4__6__14__16__26__27
10 28 rm_doi
0__1__2__3__4__6__14__16__26__27__28
11 29 filter_with_phonenumber
0__1__2__3__4__6__14__16__26__27__28__29
12 30 filter_non_english
0__1__2__3__4__6__14__16__26__27__28__29__30
13 33 add_space_to_various_punct
0__1__2__3__4__6__14__16__26__27__28__29__30__33
14 34 squish_punct
0__1__2__3__4__6__14__16__26__27__28__29__30__33__34
15 35 squish_spaced_punct_no_bracket
0__1__2__3__4__6__14__16__26__27__28__29__30__33__34__35
16 36 filter_g10_punct
0__1__2__3__4__6__14__16__26__27__28__29__30__33__34__35__36
17 37 filter_insufficient_real_words
0__1__2__3__4__6__14__16__26__27__28__29__30__33__34__35__36__37
18 38 m

  config = yaml.load(f)


## Select a pipeline and create the index

In [17]:
# Root folder of the pipeline outputs; the sub-folder name selects which pipeline step is indexed.
# The trailing comment keeps a longer step name as an alternative choice.
DATA_BASE_DIR = '/nrcan_p2/data/03_primary/v4/'
DATA_INPUT_DIR = pathlib.Path(DATA_BASE_DIR) / '0__1__2__3' #0__1__2__3__4__6__14__16__26__27__28__29__30'

In [18]:
# Each run writes into its own index_<timestamp> folder below this base folder.
DATA_OUTPUT_DIR_BASE = '/nrcan_p2/data/03_primary/index_data'
now = datetime.datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
DATA_OUTPUT_DIR = pathlib.Path(DATA_OUTPUT_DIR_BASE, f'index_{now}')
print(DATA_OUTPUT_DIR)

# Create the run folder and its "json" sub-folder (the base folder must already exist).
for folder in (DATA_OUTPUT_DIR, DATA_OUTPUT_DIR / 'json'):
    if not folder.exists():
        folder.mkdir()

# Record which pipeline-step folder the index was built from.
SOURCE = DATA_OUTPUT_DIR / "source.txt"
with SOURCE.open('w') as f:
    f.write(str(DATA_INPUT_DIR))

/nrcan_p2/data/03_primary/index_data/index_04-03-2021-19-22-12


In [19]:
from collections import defaultdict

def collect_file_list_from_folder(DATA_INPUT_DIR):
    """Describe every entry of a pipeline output folder.

    The full id of an entry is its name with up to two suffixes removed. An id
    containing an underscore is split at the first one into a leading id and a
    sub-name.

    Returns
    -------
    (ids_dict, ids)
        ``ids_dict`` is a ``defaultdict(int)`` counting the entries per leading
        id. ``ids`` is a list, in ``iterdir`` order, of
        ``(full_id, multi, leading_id, subname, path)`` tuples, where ``multi``
        tells whether the full id contained an underscore and ``subname`` is
        None when it did not.
    """
    counts = defaultdict(int)
    records = []
    for path in DATA_INPUT_DIR.iterdir():
        # Stem of the stem: strips e.g. both ".pdf" and ".csv" from "1.pdf.csv".
        full_id = pathlib.Path(path.stem).stem
        leading_id, sep, rest = full_id.partition('_')
        has_subname = bool(sep)

        counts[leading_id] += 1
        records.append((full_id, has_subname, leading_id, rest if has_subname else None, path))

    return counts, records

# ids_dict: number of entries per leading id; ids: one descriptive tuple per entry of DATA_INPUT_DIR.
ids_dict, ids = collect_file_list_from_folder(DATA_INPUT_DIR)

In [20]:
from collections import defaultdict

# Write one plain-text file per input file into <DATA_OUTPUT_DIR>/json and
# remember every path written.
names = []
for fname, multi, fid, subname, input_file in tqdm.tqdm(ids, total=len(ids)):

    # Ids shared by several input files get one output per file, told apart by
    # the sub-name; ids with a single file use the id alone.
    if ids_dict[fid] > 1:
        assert multi, f'{input_file}: id {fid} occurs {ids_dict[fid]} times but this file has no sub-name'
        name = f'geoscan_{fid}__multi__{subname}.txt'
    else:
        name = f'geoscan_{fid}.txt'

    try:
        data = pd.read_csv(input_file, dtype={'processed_text': "str"}, usecols=['processed_text'])
        data = data.dropna(subset=['processed_text'])
    except Exception as e:
        # Report which file failed, then re-raise the original error unchanged.
        print(e)
        print(input_file)
        raise

    # One processed text row per line.
    text = "\n".join(data['processed_text'].tolist())

    fname_out = DATA_OUTPUT_DIR / 'json' / name
    names.append(fname_out)
    # Explicit UTF-8 (read_csv's default input encoding) so the output does not
    # depend on the platform's locale encoding.
    with open(fname_out, 'w', encoding='utf-8') as f:
        f.write(text)
print(len(names))

100%|██████████| 12216/12216 [12:09<00:00, 16.74it/s]

12216



