# ETL and ingestion of data

In [None]:
import os
import re
import json


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pathlib import Path

# Use one colour for all plot text, axis labels and tick labels.
COLOR = 'white'
plt.rcParams.update({
    'text.color'        : COLOR,
    'axes.labelcolor'   : COLOR,
    'xtick.color'       : COLOR,
    'ytick.color'       : COLOR,
})

# IPM data - December 2021 Scrape

In [None]:
# Directory with the cleaned December 2021 scrape.
_PATH = Path('../data/uc-ipm/scrape_cleaned_Dec2021')
# Sorted so the listing below can be compared with the expected list.
DATA_FILE_NAMES = sorted(_PATH.iterdir())
[entry.name for entry in DATA_FILE_NAMES]

The list of files should be as follows:
```python
['exoticPests.json',
 'fruitItems_new.json',
 'fruitVeggieEnvironItems_new.json',
 'pestDiseaseItems_new.json',
 'plantFlowerItems.json',
 'turfPests.json',
 'veggieItems_new.json',
 'weedItems.json']
```

The corresponding ETL for these sources (links):
* [`exoticPests.json`](#exoticpestsjson)
* [`fruitItems_new.json`](#fruititems_newjson)
* [`fruitVeggieEnvironItems_new.json`](#fruitveggieenvironitems_newjson)
* [`pestDiseaseItems_new.json`](#pestdiseaseitems_newjson)
* [`plantFlowerItems.json`](#plantfloweritemsjson)
* [`turfPests.json`](#turfpestsjson)
* [`veggieItems_new.json`](#veggieitems_newjson)
* [`weedItems.json`](#weeditemsjson)

## ETL of data

In [None]:
def update_key(row, field, subfield):
    '''
    Rename the `subfield` key of every item in `row[field]` to `title`,
    prefixing a non-empty value with the row title.

    The items are modified in place. A missing or empty (`''` / `None`)
    subfield yields an empty title instead of raising, and an item that
    has already been converted (no `subfield`, existing `title`) is left
    untouched, so the function can safely be applied twice.

    row      -- mapping-like record with a 'title' entry and a list under `field`
    field    -- name of the list entry to walk through
    subfield -- key inside each list item holding the text to use
    '''
    for link in row[field]:
        if subfield not in link:
            # Nothing to rename; make sure the 'title' key still exists.
            link.setdefault('title', '')
            continue

        text = link.pop(subfield) or ''
        if text:
            link['title'] = row['title'] + ' - ' + text
        else:
            link['title'] = ''

def clean(text):
    '''
    Drop every non-ASCII character from `text`, collapse each run of
    whitespace (including escapes such as newlines and tabs) into a
    single space and trim both ends.
    '''
    ascii_only = text.encode('ascii', errors='ignore').decode()
    collapsed = re.sub(r'\s+', ' ', ascii_only)
    return collapsed.strip()


### `exoticPests.json`
<a id='exoticpestsjson'></a>

In [None]:
def pestsExotic():
    '''
    Load `exoticPests.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions and related-link texts into title-prefixed
    `title` entries and cleans every all-string column.
    '''
    FILE_NAME = 'exoticPests.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    # (list column, key inside each item that becomes its title)
    for field, subfield in (('images', 'caption'), ('related_links', 'text')):
        df[field] = df[field].apply(as_list)
        df.apply(update_key, axis=1, args=(field, subfield))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

pestsExotic().to_json('test.json', orient='records')

### `fruitItems_new.json`
<a id='fruititems_newjson'></a>

In [None]:
def infoFruits():
    '''
    Load `fruitItems_new.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns cultural tips and pest/disorder problems into title-prefixed
    `title` entries and cleans every all-string column.
    '''
    FILE_NAME = 'fruitItems_new.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    # (list column, key inside each item that becomes its title)
    for field, subfield in (('cultural_tips', 'tip'), ('pests_and_disorders', 'problem')):
        df[field] = df[field].apply(as_list)
        df.apply(update_key, axis=1, args=(field, subfield))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

infoFruits().to_json('test.json', orient='records')

### `fruitVeggieEnvironItems_new.json`
<a id='fruitveggieenvironitems_newjson'></a>

In [None]:
def damagesEnvironment():
    '''
    Load `fruitVeggieEnvironItems_new.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions into title-prefixed `title` entries and cleans
    every all-string column.
    '''
    FILE_NAME = 'fruitVeggieEnvironItems_new.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['images'] = df['images'].apply(as_list)
    df.apply(update_key, axis=1, args=('images', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

damagesEnvironment().to_json('test.json', orient='records')

### `pestDiseaseItems_new.json`
<a id='pestdiseaseitems_newjson'></a>

In [None]:
def pestsDiseases():
    '''
    Load `pestDiseaseItems_new.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions into title-prefixed `title` entries and cleans
    every all-string column.
    '''
    FILE_NAME = 'pestDiseaseItems_new.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['images'] = df['images'].apply(as_list)
    df.apply(update_key, axis=1, args=('images', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

pestsDiseases().to_json('test.json', orient='records')

### `plantFlowerItems.json`
<a id='plantfloweritemsjson'></a>

In [None]:
def infoFlowers():
    '''
    Load `plantFlowerItems.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions and pest/disorder problems into title-prefixed
    `title` entries and cleans every all-string column.
    '''
    FILE_NAME = 'plantFlowerItems.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    # (list column, key inside each item that becomes its title)
    for field, subfield in (('images', 'caption'), ('pests_and_disorders', 'problem')):
        df[field] = df[field].apply(as_list)
        df.apply(update_key, axis=1, args=(field, subfield))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

infoFlowers().to_json('test.json', orient='records')

### `turfPests.json`
<a id='turfpestsjson'></a>

In [None]:
def pestsTurf():
    '''
    Load `turfPests.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions into title-prefixed `title` entries and cleans
    every all-string column.
    '''
    FILE_NAME = 'turfPests.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['images'] = df['images'].apply(as_list)
    df.apply(update_key, axis=1, args=('images', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

pestsTurf().to_json('test.json', orient='records')

### `veggieItems_new.json`
<a id='veggieitems_newjson'></a>

In [None]:
def infoVeggies():
    '''
    Load `veggieItems_new.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions and pest/disorder problems into title-prefixed
    `title` entries and cleans every all-string column.
    '''
    FILE_NAME = 'veggieItems_new.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    # (list column, key inside each item that becomes its title)
    for field, subfield in (('images', 'caption'), ('pests_and_disorders', 'problem')):
        df[field] = df[field].apply(as_list)
        df.apply(update_key, axis=1, args=(field, subfield))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

infoVeggies().to_json('test.json', orient='records')

### `weedItems.json`
<a id='weeditemsjson'></a>

In [None]:
def damagesWeed():
    '''
    Load `weedItems.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions into title-prefixed `title` entries and cleans
    every all-string column.
    '''
    FILE_NAME = 'weedItems.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['images'] = df['images'].apply(as_list)
    df.apply(update_key, axis=1, args=('images', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

damagesWeed().to_json('test.json', orient='records')

### Merging and transforming

In [None]:
import sys

# Put the parent directory on the import path (presumably where `config` lives - TODO confirm).
sys.path.insert(1, os.path.realpath(os.path.pardir))

# Environment variables are set before `config` is imported below.
os.environ.update({
    'STAGE'         : 'dev',
    'ES_USERNAME'   : 'elastic',
    'ES_PASSWORD'   : 'changeme',
    'TF_CACHE_DIR'  : '/var/tmp/models',
    ## select the environment for ingestion
    'ES_HOST'       : 'http://localhost:9200/',
    # 'ES_HOST'     : 'https://dev.es.chat.ask.eduworks.com/',
    # 'ES_HOST'     : 'https://qa.es.chat.ask.eduworks.com/',
})

import config

In [None]:
import importlib
# Re-execute the already imported `config` module (e.g. after changing the
# environment variables above) without restarting the kernel.
importlib.reload(config)

### Transforming textual data

In [None]:
# Pair every source file name with its transformed dataframe.
# The name is only used as a label in the log messages of the cells below.
data = [
    ('exoticPests.json', pestsExotic()),
    ('fruitItems_new.json', infoFruits()),
    ('fruitVeggieEnvironItems_new.json', damagesEnvironment()),
    ('pestDiseaseItems_new.json', pestsDiseases()),
    ('plantFlowerItems.json', infoFlowers()),
    ('turfPests.json', pestsTurf()),
    ('veggieItems_new.json', infoVeggies()),
    ('weedItems.json', damagesWeed())]

In [None]:
from spacy.lang.en import English

CHUNK_SIZE      = 1     # number of sentences the rolling window advances per step
ROLLING_SIZE    = 3     # a window holds CHUNK_SIZE + ROLLING_SIZE - 1 sentences
BATCH_SIZE      = 64    # number of texts passed to the encoder at once

# The sentence splitter does not depend on the file, so it is built once.
nlp = English()
nlp.add_pipe('sentencizer')

for file_name, df in data:

    print(f'File name - "{file_name}"...')

    cols = [col for col in df.columns[df.applymap(lambda x: isinstance(x, str)).all(0)] if col not in ['url', 'source']]
    # 'vectors' is this cell's own output; skipping it keeps the cell re-runnable.
    list_mask = df.applymap(lambda x: isinstance(x, list)).all(0)
    list_cols = df.columns[list_mask].drop('vectors', errors = 'ignore').values

    print(f'Transforming columns - text {cols} and links {list_cols}].')

    print(f'STARTING TRANSFORMING - CHUNK_SIZE - {CHUNK_SIZE}, ROLLING_SIZE = {ROLLING_SIZE}')
    c_items = []
    for n_rows, (_, r) in enumerate(df.iterrows(), start = 1):
        r_texts = []

        for c in cols:
            sents = list(nlp(r[c]).sents)
            if not sents:
                continue

            # Rolling windows over the sentences. At least one window is
            # always produced, so a text shorter than a full window is kept
            # as a single chunk instead of being dropped.
            n_starts = max(len(sents) - (ROLLING_SIZE - 1), 1)
            windows = [sents[s:s + CHUNK_SIZE + (ROLLING_SIZE - 1)] for s in range(0, n_starts, CHUNK_SIZE)]
            r_texts.extend(
                {'text': ' '.join(sent.text for sent in w), 'name': c + '_' + str(k), 'start': w[0].start_char, 'end': w[-1].end_char}
                for k, w in enumerate(windows)
            )

        # Items of list columns contribute their 'title' as one text each;
        # start/end of 0/-1 are placeholders (no character span is computed).
        for lc in list_cols:
            for k, item in enumerate(r[lc]):
                r_texts.append({'text': item['title'], 'name': lc + '.' + 'title' + '_' + str(k), 'start': 0, 'end': -1})

        c_items.append(r_texts)
        if n_rows % 500 == 0:
            print(f'Finished transforming of {n_rows} rows of dataframe')

    print(f'Finished transforming of {len(df)} rows of dataframe')
    print(f'FINISHED TRANSFORMING')

    texts = [r1['text'] for r in c_items for r1 in r]

    print(f'STARTING EMBEDDING - BATCH_SIZE = {BATCH_SIZE}')

    # TF HUB model
    # vectors   = config.embed(texts_modified).numpy().tolist()

    # Sentence Encoder model (skipped when there is nothing to embed)
    vectors = config.embed.encode(
        sentences           = texts     ,
        batch_size          = BATCH_SIZE,
        show_progress_bar   = True
    ).tolist() if texts else []

    if len(vectors) != len(texts):
        raise ValueError(f'Expected {len(texts)} vectors, got {len(vectors)}')

    # Vectors are assumed to come back in the order of `texts`, which is the
    # flattened order of `c_items`; the raw text is dropped once embedded.
    flat_items = [r1 for r in c_items for r1 in r]
    for r1, vector in zip(flat_items, vectors):
        r1['vector'] = vector
        r1.pop('text')

    print(f'FINISHED EMBEDDING')

    df['vectors'] = c_items
    print(f'The number of vectors to be ingested: {len([r1["vector"] for r in df["vectors"] for r1 in r])}', end='\n\n')

## Ingesting data into ES

__Final mapping__
```json
{
    # mandatory fields
    "url"       : "url",                            # Main URL
    "source"    : "ucipm|aekb|okstate|orstate",     # Source Dataset
    "title"     : "title",                          # Title of data item
    ...
    # other fields
    ...
    "vectors"   : {
        "name"  : "field_name_and_index",           # Name of the field
        "start" : "number",                         # Start index within text
        "end"   : "number",                         # End index within text
        "vector": "dense_vector",                   # Embedding vector
    }
}
```

In [None]:
# Different embedding sizes depending on the models
# VECTOR_SIZE = 384
# VECTOR_SIZE = 512
VECTOR_SIZE = 768

# One entry of the nested "vectors" field: the embedding plus the name of
# the field it came from and the start/end offsets within that text.
_vector_properties = {
    "vector": {"type": "dense_vector", "dims": VECTOR_SIZE},
    "name"  : {"type": "keyword", "index": "false", "ignore_above": 32766},
    "start" : {"type": "integer"},
    "end"   : {"type": "integer"},
}

# Explicitly mapped top-level fields; other fields rely on "dynamic" below.
_document_properties = {
    "source"    : {"type": "keyword", "index": "true" , "ignore_above": 32766},
    "url"       : {"type": "keyword", "index": "true" , "ignore_above": 32766},
    "title"     : {"type": "keyword", "index": "false", "ignore_above": 32766},
    "vectors"   : {"type": "nested", "properties": _vector_properties},
}

mapping = {
    "settings": {"number_of_shards": 2, "number_of_replicas": 1},
    "mappings": {
        "dynamic"   : "true",
        "_source"   : {"enabled": "true"},
        "properties": _document_properties,
    },
}

In [None]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

from collections import deque

# increase the timeout if necessary
es_client = Elasticsearch(
    [config.es_host],
    http_auth   = (config.es_username, config.es_password),
    timeout     = 20,
)

# Start from a clean index: delete it (a 404 response is ignored) and
# create it again with the settings and mappings defined above.
target_index = config.es_combined_index
es_client.indices.delete(index = target_index, ignore = 404)
es_client.indices.create(
    index       = target_index,
    settings    = mapping['settings'],
    mappings    = mapping['mappings'],
)

# play with chunk size parameter for timed out problem
for file_name, df in data:
    print(f'Ingesting "{file_name}"...')
    final_json = df.to_dict(orient = 'records')
    bulk_results = parallel_bulk(
        es_client,
        actions         = final_json,
        index           = target_index,
        max_chunk_bytes = 5 * 1024 * 1024,
    )
    # The helper's results are consumed and discarded.
    for _ in bulk_results:
        pass

es_client.indices.refresh()

# IPM data - April 2022 Scrape

In [None]:
# Directory with the cleaned April 2022 scrape.
_PATH = Path('../data/uc-ipm/scrape_cleaned_Apr2022/')
# Sorted so the listing below can be compared with the expected list.
DATA_FILE_NAMES = sorted(_PATH.iterdir())
[entry.name for entry in DATA_FILE_NAMES]

The list of files should be as follows:
```python
['FruitVegCulturalItems.json',
 'GardenControlsPestItems.json',
 'GardenControlsPesticideItems.json',
 'PestNotes.json',
 'QuickTips.json',
 'Videos.json',
 'WeedIdItems.json']
```

The corresponding ETL for these sources (links):
* [`FruitVegCulturalItems.json`](#fruitvegculturalitemsjson)
* [`GardenControlsPestItems.json`](#gardercontolspestitemsjson)
* [`GardenControlsPesticideItems.json`](#gardencontrolspesticideitemsjson)
* [`PestNotes.json`](#pestnotesjson)
* [`QuickTips.json`](#quicktipsjson)
* [`Videos.json`](#videosjson)
* [`WeedIdItems.json`](#weediditemsjson)

## ETL of data

In [None]:
def transform_table(row):
    '''
    Give every item of `row['tips_table']` a `title` key, in place.

    The first item must carry a `header`; its title becomes
    "<row title> - <header>". Every following item has its `row` value
    renamed to `title` and, when non-empty, prefixed with the header title.
    An empty table is left untouched.
    '''
    items = row['tips_table']
    if len(items) == 0:
        return

    first = items[0]
    assert 'header' in first
    header_title = row['title'] + ' - ' + first.pop('header')
    first['title'] = header_title

    for entry in items[1:]:
        entry['title'] = entry.pop('row')
        if len(entry['title']) > 0:
            entry['title'] = header_title + ' - ' + entry['title']

def transform_pesticide(row):
    '''
    Merge pesticide subfield into main field - information.

    `row['information']` is expected to be a list whose first item is a
    dictionary. Its entries are rendered as "Key name: value" (underscores
    replaced by spaces, first letter capitalised) and joined with '. '.

    The merged text is stored back under `row['information']` and also
    returned, so callers can assign it to a column themselves instead of
    depending on the row object being written back. An already merged
    (string) value is returned unchanged; an empty list or a non-list
    value yields an empty string.
    '''
    information = row['information']

    if isinstance(information, str):
        # Already merged by a previous call.
        return information

    if isinstance(information, list) and information:
        merged = '. '.join(
            k.replace('_', ' ').capitalize() + ': ' + str(v)
            for k, v in information[0].items()
        )
    else:
        merged = ''

    row['information'] = merged
    return merged

### `FruitVegCulturalItems.json`
<a id='fruitvegculturalitemsjson'></a>

In [None]:
def infoFruitVegCultural():
    '''
    Load `FruitVegCulturalItems.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions and tips-table entries into title-prefixed
    `title` entries and cleans every all-string column.
    '''
    FILE_NAME = 'FruitVegCulturalItems.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['images'] = df['images'].apply(as_list)
    df.apply(update_key, axis=1, args=('images', 'caption'))

    df['tips_table'] = df['tips_table'].apply(as_list)
    df.apply(transform_table, axis=1)

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

infoFruitVegCultural().to_json('test.json', orient='records')

### `GardenControlsPestItems.json`
<a id='gardercontolspestitemsjson'></a>

In [None]:
def infoPestControl():
    '''
    Load `GardenControlsPestItems.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions into title-prefixed `title` entries and cleans
    every all-string column.
    '''
    FILE_NAME = 'GardenControlsPestItems.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['images'] = df['images'].apply(as_list)
    df.apply(update_key, axis=1, args=('images', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

infoPestControl().to_json('test.json', orient='records')

### `GardenControlsPesticideItems.json`
<a id='gardencontrolspesticideitemsjson'></a>

In [None]:


def infoPesticideControl():
    '''
    Load `GardenControlsPesticideItems.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, builds the title as
    "<active_ingredient> - <pesticide_type>" (dropping both columns),
    merges the `information` sub-dictionary into one text per record and
    cleans every all-string column.
    '''
    # -------------------------------------------- Garden pesticide control
    FILE_NAME = 'GardenControlsPesticideItems.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(Path.joinpath(_PATH, FILE_NAME))

    df['source'] = 'ucipm'

    df['title'] = df[['active_ingredient', 'pesticide_type']].agg(' - '.join, axis=1)
    df.drop(['active_ingredient', 'pesticide_type'], axis=1, inplace=True)

    def merged_information(information):
        # transform_pesticide rewrites the 'information' entry of the record
        # it is given. Running it on a plain dict and assigning the result
        # to the column avoids depending on `DataFrame.apply` writing
        # changes made to a row back into the frame.
        record = {'information': information}
        transform_pesticide(record)
        return record['information']

    df['information'] = df['information'].apply(merged_information)

    for col in df.columns[df.applymap(lambda x: isinstance(x, str)).all(0)]:
        df[col] = df[col].apply(clean)

    return df

infoPesticideControl().to_json('test.json', orient='records')

### `PestNotes.json`
<a id='pestnotesjson'></a>

In [None]:
def pestsNotes():
    '''
    Load `PestNotes.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, strips the `PestNote`
    suffix/prefix from the column names, drops the `tablePestNote`
    column, turns image captions into title-prefixed `title` entries and
    cleans every all-string column.
    '''
    FILE_NAME = 'PestNotes.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'

    column_names = {
        'urlPestNote'           : 'url'         ,
        'name'                  : 'title'       ,
        'descriptionPestNote'   : 'description' ,
        'lifecyclePestNote'     : 'lifecycle'   ,
        'damagePestNote'        : 'damage'      ,
        'managementPestNote'    : 'management'  ,
        'imagePestNote'         : 'image'       ,
    }
    df.rename(columns=column_names, inplace=True)
    df.drop(columns='tablePestNote', inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['image'] = df['image'].apply(as_list)
    df.apply(update_key, axis=1, args=('image', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

pestsNotes().to_json('test.json', orient='records')

### `QuickTips.json`
<a id='quicktipsjson'></a>

In [None]:
def pestsQuickTips():
    '''
    Load `QuickTips.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, shortens the QuickTip
    column names, turns image captions into title-prefixed `title`
    entries and cleans every all-string column.
    '''
    FILE_NAME = 'QuickTips.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'

    column_names = {
        'urlQuickTip'           : 'url'     ,
        'name'                  : 'title'   ,
        'contentQuickTips'      : 'content' ,
        'imageQuickTips'        : 'image'   ,
    }
    df.rename(columns=column_names, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['image'] = df['image'].apply(as_list)
    df.apply(update_key, axis=1, args=('image', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

pestsQuickTips().to_json('test.json', orient='records')

### `Videos.json`
<a id='videosjson'></a>

In [None]:
def pestsVideos():
    '''
    Load `Videos.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source and cleans every
    all-string column.
    '''
    FILE_NAME = 'Videos.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

pestsVideos().to_json('test.json', orient='records')

### `WeedIdItems.json`
<a id='weediditemsjson'></a>

In [None]:
def pestsWeed():
    '''
    Load `WeedIdItems.json` and prepare it for ingestion.

    Tags the records with the `ucipm` source, renames `name` to `title`,
    turns image captions into title-prefixed `title` entries and cleans
    every all-string column.
    '''
    FILE_NAME = 'WeedIdItems.json'
    print(f'Transforming "{FILE_NAME}"...')
    df = pd.read_json(_PATH / FILE_NAME)

    df['source'] = 'ucipm'
    df.rename(columns={'name': 'title'}, inplace=True)

    def as_list(value):
        # Anything that is not a list (e.g. a missing value) becomes an empty list.
        return value if isinstance(value, list) else []

    df['images'] = df['images'].apply(as_list)
    df.apply(update_key, axis=1, args=('images', 'caption'))

    all_text = df.applymap(lambda cell: isinstance(cell, str)).all(axis=0)
    for name in df.columns[all_text]:
        df[name] = df[name].apply(clean)

    return df

pestsWeed().to_json('test.json', orient='records')


### Merging and transforming

In [None]:
# Pair every source file name with its transformed dataframe.
# The name is only used as a label in the log messages of the cells below.
data = [
    ('FruitVegCulturalItems.json', infoFruitVegCultural()),
    ('GardenControlsPestItems.json', infoPestControl()),
    ('GardenControlsPesticideItems.json', infoPesticideControl()),
    ('PestNotes.json', pestsNotes()),
    ('QuickTips.json', pestsQuickTips()),
    ('Videos.json', pestsVideos()),
    ('WeedIdItems.json', pestsWeed())
]

In [None]:
from spacy.lang.en import English

CHUNK_SIZE      = 1     # number of sentences the rolling window advances per step
ROLLING_SIZE    = 3     # a window holds CHUNK_SIZE + ROLLING_SIZE - 1 sentences
BATCH_SIZE      = 64    # number of texts passed to the encoder at once

# The sentence splitter does not depend on the file, so it is built once.
nlp = English()
nlp.add_pipe('sentencizer')

for file_name, df in data:

    print(f'File name - "{file_name}"...')

    cols = [col for col in df.columns[df.applymap(lambda x: isinstance(x, str)).all(0)] if col not in ['url', 'source']]
    # 'vectors' is this cell's own output; skipping it keeps the cell re-runnable.
    list_mask = df.applymap(lambda x: isinstance(x, list)).all(0)
    list_cols = df.columns[list_mask].drop('vectors', errors = 'ignore').values

    print(f'Transforming columns - text {cols} and links {list_cols}].')

    print(f'STARTING TRANSFORMING - CHUNK_SIZE - {CHUNK_SIZE}, ROLLING_SIZE = {ROLLING_SIZE}')
    c_items = []
    for n_rows, (_, r) in enumerate(df.iterrows(), start = 1):
        r_texts = []

        for c in cols:
            sents = list(nlp(r[c]).sents)
            if not sents:
                continue

            # Rolling windows over the sentences. At least one window is
            # always produced, so a text shorter than a full window is kept
            # as a single chunk instead of being dropped.
            n_starts = max(len(sents) - (ROLLING_SIZE - 1), 1)
            windows = [sents[s:s + CHUNK_SIZE + (ROLLING_SIZE - 1)] for s in range(0, n_starts, CHUNK_SIZE)]
            r_texts.extend(
                {'text': ' '.join(sent.text for sent in w), 'name': c + '_' + str(k), 'start': w[0].start_char, 'end': w[-1].end_char}
                for k, w in enumerate(windows)
            )

        # Items of list columns contribute their 'title' as one text each;
        # start/end of 0/-1 are placeholders (no character span is computed).
        for lc in list_cols:
            for k, item in enumerate(r[lc]):
                r_texts.append({'text': item['title'], 'name': lc + '.' + 'title' + '_' + str(k), 'start': 0, 'end': -1})

        c_items.append(r_texts)
        if n_rows % 500 == 0:
            print(f'Finished transforming of {n_rows} rows of dataframe')

    print(f'Finished transforming of {len(df)} rows of dataframe')
    print(f'FINISHED TRANSFORMING')

    texts = [r1['text'] for r in c_items for r1 in r]

    print(f'STARTING EMBEDDING - BATCH_SIZE = {BATCH_SIZE}')

    # TF HUB model
    # vectors   = config.embed(texts_modified).numpy().tolist()

    # Sentence Encoder model (skipped when there is nothing to embed)
    vectors = config.embed.encode(
        sentences           = texts     ,
        batch_size          = BATCH_SIZE,
        show_progress_bar   = True
    ).tolist() if texts else []

    if len(vectors) != len(texts):
        raise ValueError(f'Expected {len(texts)} vectors, got {len(vectors)}')

    # Vectors are assumed to come back in the order of `texts`, which is the
    # flattened order of `c_items`; the raw text is dropped once embedded.
    flat_items = [r1 for r in c_items for r1 in r]
    for r1, vector in zip(flat_items, vectors):
        r1['vector'] = vector
        r1.pop('text')

    print(f'FINISHED EMBEDDING')

    df['vectors'] = c_items
    print(f'The number of vectors to be ingested: {len([r1["vector"] for r in df["vectors"] for r1 in r])}', end='\n\n')

## Ingesting data into ES

__Final mapping__
```json
{
    # mandatory fields
    "url"       : "url",                            # Main URL
    "source"    : "ucipm|aekb|okstate|orstate",     # Source Dataset
    "title"     : "title",                          # Title of data item
    ...
    # other fields
    ...
    "vectors"   : {
        "name"  : "field_name_and_index",           # Name of the field
        "start" : "number",                         # Start index within text
        "end"   : "number",                         # End index within text
        "vector": "dense_vector",                   # Embedding vector
    }
}
```

In [None]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

from collections import deque

# increase the timeout if necessary
es_client = Elasticsearch(
    [config.es_host],
    http_auth   = (config.es_username, config.es_password),
    timeout     = 20,
)

# The index is not recreated here; documents are sent to the existing
# combined index.
# play with chunk size parameter for timed out problem
for file_name, df in data:
    print(f'Ingesting "{file_name}"...')
    final_json = df.to_dict(orient = 'records')
    bulk_results = parallel_bulk(
        es_client,
        actions         = final_json,
        index           = config.es_combined_index,
        max_chunk_bytes = 5 * 1024 * 1024,
    )
    # The helper's results are consumed and discarded.
    for _ in bulk_results:
        pass

es_client.indices.refresh()

# AskExtension Data

In [None]:
# Directory with the AskExtension knowledge-base exports.
_PATH = Path('../data/askextension_kb/')
DATA_FILE_NAMES = sorted(_PATH.iterdir())

print(f'List of files:\n{[data_file.name for data_file in DATA_FILE_NAMES]}')

# Preview the first record of the first file to show the raw schema.
# The encoding is given explicitly so the result does not depend on the
# platform default, and the parsed data gets its own name instead of
# rebinding the file handle.
with open(DATA_FILE_NAMES[0], encoding = 'utf-8') as fh:
    records = json.load(fh)
print(json.dumps(records[0], indent = 2))

The list of files should be as follows:
```python
['2012-2013.json', '2014-2015.json', '2016-2017.json', '2018-2019.json', '2020-1.json', '2020-2.json', '2021-1.json', '2021-2.json']
```

__NB__: We will only be using tickets from the states listed in `STATE_FILTER` below (California, Oklahoma and Oregon).

## ETL

In [None]:
# Combine all JSON data files into a single dataframe.
# Every file is read once and concatenated in one step; entries that are
# not `.json` files are skipped.
frames = [pd.read_json(f) for f in DATA_FILE_NAMES if f.suffix == '.json']
df = pd.concat(frames, ignore_index = True, axis = 0) if frames else pd.DataFrame()
# Show up to five random rows (fewer when the frame is smaller).
df.sample(min(5, len(df)))

In [None]:
def transform_answer(answer_dict):
    '''
    Convert answer field from a dictionary to a list.

    The dictionary keys are numeric strings giving the position of each
    answer. The answers are returned in ascending key order, each reduced
    to a `{'response': <cleaned text>}` dictionary. Ordering by key
    (instead of writing to index `key - 1` of a pre-filled list) keeps the
    result free of empty placeholders when the keys do not run exactly
    from 1 to n. Anything that is not a dictionary yields an empty list.
    '''
    if not isinstance(answer_dict, dict):
        return []

    ordered = sorted(answer_dict.items(), key=lambda item: int(item[0]))

    # clean the response up
    return [{'response': clean(v['response'])} for _, v in ordered]

def transform_title(title):
    '''
    Remove question ID from title, and append '.' in the end
    if no punctuation was detected.

    The ID is everything after the last '#'; any other '#' characters are
    removed as well. A title without '#' carries no ID and is kept as it
    is. Leading/trailing whitespace and dots (e.g. a trailing '...') are
    stripped before the final punctuation check.

    Example with '#' - 437259
    Example with '...' - 437264
    '''
    head, separator, _ticket = title.rpartition('#')
    if separator:
        title = head.replace('#', '')

    title = title.strip().strip('.').strip()

    # add a '.' if it does not yet end with a punctuation
    title = title if (title and title[-1] in pn) else title + '.'

    return title

def merge_title_question(df):
    '''
    Build one text per row from the `title` and `question` columns.

    When the question already starts with the title (ignoring the
    title's last character), the question is used on its own; otherwise
    the title and the question are joined with a space.
    Returns the texts as a list in row order.
    '''
    merged = []
    for title, question in zip(df['title'].tolist(), df['question'].tolist()):
        if title and question.startswith(title[:-1]):
            merged.append(question)
        else:
            merged.append(title + " " + question)

    return merged

In [None]:
import sys
import re

from string import punctuation as pn

# Modify STATE_FILTER and MIN_WORD_COUNT variables accordingly
STATE_FILTER    = ['California', 'Oklahoma', 'Oregon']
MIN_WORD_COUNT  = 3

ASKEXTENSION_QUESTION_URL = 'https://ask2.extension.org/kb/faq.php?id='

df['source'] = 'ae-kb'

# Convert 'faq-id' to str type
df['faq-id'] = df['faq-id'].astype(str)

# Keep only tickets from the states in STATE_FILTER. The copy makes the
# column assignments below act on an independent frame rather than on a
# slice of the original.
df = df[df['state'].isin(STATE_FILTER)].copy()

# The ticket number is the text after the last '#' of the title. It is only
# accepted when it consists of exactly six digits, so that a short title
# without an ID is not mistaken for one.
ticket_numbers = [
    ticket_no if (len(ticket_no) == 6 and ticket_no.isdigit()) else ""
    for ticket_no in df['title'].str.split('#').str[-1]
]

# Add the URL and leave blank URL for questions with no ID
df['url'] = [
    f"{ASKEXTENSION_QUESTION_URL}{ticket_no}" if ticket_no else ""
    for ticket_no in ticket_numbers
]

# Add the ticket number from title and leave blank for questions without
df['ticket-no'] = ticket_numbers

df.rename(columns = {'faq-id': 'faq_id', 'ticket-no': 'ticket_no'}, inplace = True)

# Transform answer
df['answers'] = df['answer'].apply(transform_answer)

# Strip all spaces and remove non-ascii characters from text fields
for column in ['state', 'title', 'question']:
    df[column] = df[column].apply(clean)

# Clean ID and '...' from title, and append punctuation if not present
df['title'] = df['title'].apply(transform_title)

# Create new column from `title` and `question`, or only question
# if title is exactly the question
df['question'] = merge_title_question(df)

# Remove questions with small number words in title-question
if MIN_WORD_COUNT:
    df = df[df['question'].str.split().str.len() > MIN_WORD_COUNT]

df = df.loc[:, ['source', 'url', 'state', 'title', 'question', 'answers']]
df.reset_index(drop=True, inplace=True)
# Show up to five random rows (fewer when the frame is smaller).
df.sample(min(5, len(df)))

In [None]:
# Number of tickets per state in the prepared dataframe.
df.groupby('state').size()

# Embedding text fields into vectors and stripping text fields for saving into ES

In [None]:
import importlib
# Re-execute the already imported `config` module without restarting the kernel.
importlib.reload(config)

### Transforming textual data

In [None]:
# CHUNK_SIZE is the step (in sentences) between consecutive windows;
# every window spans CHUNK_SIZE + (ROLLING_SIZE - 1) sentences
CHUNK_SIZE      = 1
ROLLING_SIZE    = 3


print(f'AE KB data sources...')

# Text columns: every value is a str; 'url', 'source' and 'state' are not embedded
cols = [col for col in df.columns[df.applymap(lambda x: isinstance(x, str)).all(0)] if col not in ['url', 'source', 'state']]
# List columns: every value is a list; its items are read through their 'response' key below
list_cols = df.columns[df.applymap(lambda x: isinstance(x, list)).all(0)].values

print(f'Transforming columns - text {cols} and links {list_cols}].')

from spacy.lang.en import English 

# Blank English pipeline with only the sentence splitter
nlp = English()
nlp.add_pipe('sentencizer')

print(f'STARTING TRANSFORMING - CHUNK_SIZE - {CHUNK_SIZE}, ROLLING_SIZE = {ROLLING_SIZE}')

window_size = CHUNK_SIZE + (ROLLING_SIZE - 1)

# One list of chunk records per dataframe row, in row order
c_items = []
for row_no, (_, r) in enumerate(df.iterrows(), start=1):
    r_texts = []

    for c in cols:
        sentences = list(nlp(r[c]).sents)
        if not sentences:
            continue

        # Always produce at least one window: without the lower bound of 1, a text
        # with fewer than ROLLING_SIZE sentences (e.g. a one-sentence title) yields
        # an empty range and would never be embedded. Texts with ROLLING_SIZE or
        # more sentences get exactly the same windows as before.
        n_starts = max(len(sentences) - (ROLLING_SIZE - 1), 1)
        windows = [sentences[s:s + window_size] for s in range(0, n_starts, CHUNK_SIZE)]

        # 'start' / 'end' are character offsets of the window inside the column text
        r_texts.extend(
            {
                'text'  : ' '.join([sent.text for sent in window]),
                'name'  : c + '_' + str(k),
                'start' : window[0].start_char,
                'end'   : window[-1].end_char,
            }
            for k, window in enumerate(windows)
        )

    # List columns are embedded item by item as a whole (no sentence windows);
    # they get the placeholder offsets 0 / -1
    for lc in list_cols:
        for k, item in enumerate(r[lc]):
            r_texts.append({'text': item['response'], 'name': lc + '.' + 'response' + '_' + str(k), 'start': 0, 'end': -1})

    c_items.append(r_texts)
    if row_no % 500 == 0:
        print(f'Finished transforming of {row_no} rows of dataframe')

# len(c_items) instead of the loop variable, so that an empty dataframe does not raise NameError
print(f'Finished transforming of {len(c_items)} rows of dataframe')
print(f'FINISHED TRANSFORMING')

# Flatten the per-row chunk records; the encoder input and its output share this order
chunk_records = [record for row_records in c_items for record in row_records]
texts = [record['text'] for record in chunk_records]

BATCH_SIZE = 64

print(f'STARTING EMBEDDING - BATCH_SIZE = {BATCH_SIZE}')
# Placeholder column of empty lists; replaced with the chunk records below
df['vectors'] = np.empty((len(df), 0)).tolist()

# TF HUB model
# vectors   = config.embed(texts_modified).numpy().tolist()

# Sentence Encoder model
vectors = config.embed.encode(
    sentences           = texts     ,
    batch_size          = BATCH_SIZE,
    show_progress_bar   = True
).tolist()

# Attach each vector to its record and drop the raw text from the record
for position, record in enumerate(chunk_records):
    record['vector'] = vectors[position]
    record.pop('text')

print(f'FINISHED EMBEDDING')

df['vectors'] = c_items
print(f'The number of vectors to be ingested: {sum(len(row_records) for row_records in df["vectors"])}', end='\n\n')

### Checking for invalid links

In [None]:
# Report rows whose main URL, or any entry in 'links', looks empty or broken
# (shorter than 10 characters), and count the affected rows per data source.
count_ucipm = 0
count_askextension = 0
for i, r in finalDf.iterrows():
    url = r['url']
    if len(url) < 10:
        print(f'Source with no main link at row {i} of data frame, main link - {url}.')

    broken_links = [link for link in r['links'] if len(link['src']) < 10]
    if not broken_links:
        continue

    # Print the owning URL once, followed by every broken link of that row
    print(f'Links at {url}')
    for link in broken_links:
        print(link)

    # The AskExtension rows are tagged 'ae-kb' (hyphen) when the data is built;
    # the previous comparison against 'ae_kb' only never matched them, so they
    # were all counted as UC IPM. The underscore spelling is still accepted.
    if r['source'] in ('ae-kb', 'ae_kb'):
        count_askextension += 1
    else:
        count_ucipm += 1


print(f'Number of sources from AskExtension with no link urls - {count_askextension}')
print(f'Number of sources from UC IPM with no link urls - {count_ucipm}')

### Embedding textual data

In [None]:
# Flatten the chunk texts in the same order in which the vectors are written back below.
# NOTE: the records in `c_items` must still carry their 'text' key at this point;
# the loop at the bottom of this cell removes it, so the cell cannot be run twice
# on the same `c_items`.
texts = [r1['text'] for r in c_items for r1 in r]


BATCH_SIZE = 64

print(f'STARTING EMBEDDING')

# Placeholder column of empty lists; replaced with the chunk records below
finalDf['vectors'] = np.empty((len(finalDf), 0)).tolist()

# TF HUB model
# vectors   = config.embed(texts).numpy().tolist()

# Sentence Encoder model
# `texts` (built above from `c_items`) is what gets encoded: the previously used
# name `texts_modified` is not defined in this cell, and only `texts` is
# guaranteed to line up one-to-one with the records indexed below.
vectors = config.embed.encode(
    sentences           = texts         ,
    batch_size          = BATCH_SIZE    ,
    show_progress_bar   = True
).tolist()

# Attach each vector to its record and drop the raw text from the record
index = 0
for i, r in enumerate(c_items):
    for i1, r1 in enumerate(r):
        r1['vector'] = vectors[index]
        r1.pop('text')
        index += 1

print(f'FINISHED EMBEDDING')

finalDf['vectors'] = c_items
print(f'The number of vectors to be ingested: {len([r1["vector"] for r in finalDf["vectors"] for r1 in r])}')        
finalDf.sample(5)

## Ingesting data into ES

In [None]:
# The embedding size depends on the model in use (alternatives: 384, 512)
VECTOR_SIZE = 768


def _keyword(index):
    '''
    Build a keyword field definition with the given "index" flag ("true" / "false").
    '''
    return {"type": "keyword", "index": index, "ignore_above": 32766}


# Top-level document fields: only "source" is indexed, the other keyword fields are not
_field_properties = {"source": _keyword("true")}
_field_properties.update({
    field_name: _keyword("false")
    for field_name in (
        "url",
        "title",
        "description",
        "identification",
        "development",
        "damage",
        "management",
    )
})

# One nested entry per embedded text chunk
_field_properties["vectors"] = {
    "type"      : "nested",
    "properties": {
        "vector": {"type": "dense_vector", "dims": VECTOR_SIZE},
        "name"  : _keyword("false"),
        "start" : {"type": "integer"},
        "end"   : {"type": "integer"},
    },
}

# One nested entry per link attached to the document
_field_properties["links"] = {
    "type"      : "nested",
    "properties": {
        link_field: _keyword("false")
        for link_field in ("type", "src", "link", "title")
    },
}

# Index settings and mappings; "dynamic" is "false", so only the fields above are mapped
mapping = {
    "settings": {"number_of_shards": 2, "number_of_replicas": 1},
    "mappings": {
        "dynamic"   : "false",
        "_source"   : {"enabled": "true"},
        "properties": _field_properties,
    },
}

# One dict per dataframe row (column name -> value); these are the documents sent to ES
final_json = finalDf.to_dict('records')

In [None]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

from collections import deque

# Client for the configured host and credentials; increase the timeout if necessary
es_client = Elasticsearch(
    [config.es_host],
    http_auth = (config.es_username, config.es_password),
    timeout = 20,
)

target_index = config.es_combined_index

# Recreate the index from scratch; a 404 on delete (index does not exist yet) is ignored
es_client.indices.delete(index = target_index, ignore = 404)
es_client.indices.create(
    index       = target_index,
    settings    = mapping['settings'],
    mappings    = mapping['mappings'],
)

# The helper's results are iterated to the end and discarded.
# Play with the chunk size parameter if requests time out.
for _bulk_result in parallel_bulk(es_client, actions = final_json, index = target_index, max_chunk_bytes = 5 * 1024 * 1024):
    pass

es_client.indices.refresh()