# Create Elastic search local

### Notes

Links:
- celltypes for df: https://stackoverflow.com/questions/38231591/splitting-dictionary-list-inside-a-pandas-column-into-separate-columns
- url

### import

In [1]:
# imports
import os
import json

import pandas as pd
import numpy as np

from elasticsearch import Elasticsearch
from tqdm import tqdm_notebook

### functions

In [147]:
def code_output(cell):
    """
    Explanation function
    """
    if cell['outputs']!=[]:
        output_type = cell['outputs'][0]['output_type']
        if output_type == 'stream':
            return str(cell['outputs'][0]['text']),output_type
                       
        elif output_type == 'error':
            return str(cell['outputs'][0]['traceback']),output_type
                       
        elif output_type == 'display_data':
            return 'displayed data',output_type
                       
        elif 'data' in cell['outputs'][0].keys():
            return str(list(cell['outputs'][0]['data'].values())),output_type
                       
        elif 'text' in cell['outputs'][0].keys():
            return str(cell['outputs'][0]['text']),output_type
                       
        elif 'ename' in cell['outputs'][0].keys():
            return str(cell['outputs'][0]['ename']+cell['outputs'][0]['evalue']),output_type
    else:
#         return '',''
        return 'empty','empty'


def read_ipynb_cell(file_id,file_dict,file,folder,location,repo):
    """
    Explanation function
    """
#     file_dict = {} # gets created in the function 'create_SE_from_folder'
    temp_dict = {}
    temp_dict['cell_types'] = {}
    temp_dict['output_types'] = {}
    # cell_types/output_types used to be list, but changed to dict to use json_normalize in a later stage.
    
    with open(location,encoding="utf8") as notebook:
        data = json.load(notebook)
        nbformat = data['nbformat']
        file_text = ""
        file_lines = 0
        
        if nbformat == 4: # current nbformat
            data_cells =  data['cells']
        elif nbformat == 3: # old nbformat (older files)
            data_cells =  data['worksheets'][0]['cells']
        elif nbformat == 2: # even older nbformat, no differences with nbformat 3 at first glance at json level
            data_cells =  data['worksheets'][0]['cells']
        else:
            print(nbformat,file)

        for cell in data_cells:
            cell_type = cell['cell_type']
            if cell_type == 'code' and (nbformat == 3 or nbformat == 2): #cell['source'] doesn't exist within this condition, use cell['input']
                text = cell['input']
            else:
                text = cell['source']
            clean_cell = list(map(lambda s: s.strip(), text)) #remove the '\n' at the end of each string in the list         
            lines = len(clean_cell)
            single_string = ' '.join(clean_cell)
            file_text += single_string
            file_lines += lines
            
            # CELL TYPE
            cell_type = cell['cell_type']
            if cell_type == 'code':
                output, output_type = code_output(cell)
                file_text += output
                if output_type not in temp_dict['output_types'].keys():
                    temp_dict['output_types'][output_type] = 1    
            if cell_type not in temp_dict['cell_types'].keys():
                temp_dict['cell_types'][cell_type] = 1

        temp_dict['file'] = file
        temp_dict['nbformat'] = data['nbformat']
        temp_dict['folder'] = folder
        temp_dict['repo'] =  repo
        temp_dict['location'] = location
        temp_dict['string'] = file_text
        temp_dict['lines'] = file_lines
        file_dict[file_id] = temp_dict
        
    return file_dict

def rec_to_actions(df):
    for record in df.to_dict(orient="records"):
        yield ('{ "index" : { "_index" : "%s", "_type" : "%s" }}'% (INDEX, TYPE))
        yield (json.dumps(record, default=int))
      

def index_marks(nrows, chunk_size):
    return range(1 * chunk_size, (nrows // chunk_size + 1) * chunk_size, chunk_size)


def split(dfm, chunk_size):
    indices = index_marks(dfm.shape[0], chunk_size)
    return np.split(dfm, indices) 
    

def create_SE_from_folder(repo_name,folder):
    """
    Explenation
    """
    total_count = 0
    fail_count = 0
    
    cwd = os.getcwd()
    if folder == 'single_files':
        path = cwd+"\\"+folder
    elif folder == 'notebooks1':
        path = 'E:\\Files\\Universiteit\\thesis\\notebooks1' # External harddrive with 90 gb's of ipynb files
    elif folder == 'notebooks':
        path = cwd+"\\"+folder
    else:
        path = cwd+"\\"+folder+"\\"+repo_name
        print(path)
        
    # CREATE DICT FOR ALL 'IPYNB' FILES
    print('Looping through all files within the path directory...')
    file_id = 0
    file_dict = {}    
    for root, dirs, files in os.walk(path): # tqdm progress bar not possible since os.walk doesnt know
        for file in files:                  # how many files it will cross yet
            if file.endswith(".ipynb"):
                folder = root.split('\\')[-1]
                location = os.path.join(root,file)
                repo = repo_name
                try:
                    file_dict = read_ipynb_cell(file_id,file_dict,file,folder,location,repo)
                except Exception as e:
#                     print("failed for file:",file)
                    fail_count += 1
                file_id += 1
                total_count += 1
                
    print('Failed files:',fail_count,'/',total_count) 
    print('Setting up local elastic search')
    
    # CREATE DATAFRAME FROM DICT
    file_df = pd.DataFrame.from_dict(file_dict,orient='index')
#     file_df = file_df.fillna('empty').reset_index()
    file_df_extra = pd.concat([file_df.drop(['cell_types','output_types'], axis=1),
                            json_normalize(file_df['cell_types']),
                            json_normalize(file_df['output_types'])],
                           axis=1).fillna(0)#.drop([""],axis=1)
    

    # PUT DATAFRAME INTO ELASTIC SEARCH
    split_size = 500
    for chuck in tqdm_notebook(split(file_df_extra, split_size)):
        try:
            r = es.bulk(rec_to_actions(chuck))
        except:
            print('mini_split')
            try:
                for mini_chuck in tqdm_notebook(split(chuck, split_size/10)):
                    r = es.bulk(rec_to_actions(mini_chuck))
            except Exception as e:
                print('failed, skip this df',e)
    
    return es, file_df_extra

cell below takes proximately:  
- looping through files: 2 minutes 30 seconds
- put df in local es: 1 minutes 0 seconds

In [148]:
%time es, file_df = create_SE_from_folder('sample','notebooks')
file_df

Looping through all files within the path directory...
Failed files: 35 / 6529
Setting up local elastic search


HBox(children=(IntProgress(value=0, max=14), HTML(value='')))

mini_split
failed, skip this df 'float' object cannot be interpreted as an integer
mini_split
failed, skip this df 'float' object cannot be interpreted as an integer

Wall time: 3min 31s


Unnamed: 0,file,nbformat,folder,repo,location,string,lines,code,markdown,heading,raw,stream,empty,execute_result,display_data,pyout,pyerr,error
0,nb_1000546.ipynb,4.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_1...,from defectlib import load_tensors from defect...,31.0,1.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0
1,nb_1000590.ipynb,4.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_1...,"# Up to this point we have not used R, but now...",59.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0
2,nb_1000623.ipynb,4.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_1...,import numpy as np import math import matplotl...,329.0,1.0,0.0,0.0,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0
3,nb_1001147.ipynb,4.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_1...,import numpy as np import pandas as pd import ...,94.0,1.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0
4,nb_1001280.ipynb,3.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_1...,"def gcd(m,n): while m%n != 0: oldm = m oldn = ...",73.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6524,nb_999055.ipynb,4.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_9...,Derived from sqlite_P1_v10import pandas as pd ...,182.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6525,nb_999056.ipynb,4.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_9...,import pandas as pd import sqlite3emptypd.set_...,38.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6526,nb_999673.ipynb,4.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_9...,import pandas as pd import reempty### Splits s...,31.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6527,nb_999700.ipynb,3.0,notebooks,sample,C:\Users\kenne\Documents\thesis\notebooks\nb_9...,from sklearn import datasets iris = datasets.l...,23.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [149]:
file_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 6529 entries, 0 to 6528
Data columns (total 18 columns):
file              6529 non-null object
nbformat          6529 non-null float64
folder            6529 non-null object
repo              6529 non-null object
location          6529 non-null object
string            6529 non-null object
lines             6529 non-null float64
code              6529 non-null float64
markdown          6529 non-null float64
heading           6529 non-null float64
raw               6529 non-null float64
stream            6529 non-null float64
empty             6529 non-null float64
execute_result    6529 non-null float64
display_data      6529 non-null float64
pyout             6529 non-null float64
pyerr             6529 non-null float64
error             6529 non-null float64
dtypes: float64(13), object(5)
memory usage: 969.1+ KB


In [153]:
# # update pandas for this function
# # !pip install --user pandas==1.0.3
# from pandas.io.json import json_normalize

# print(pd.__version__)
# json_normalize(file_df['cell_types'])

In [150]:
# json_normalize(file_df['cell_types'])

In [151]:
# # Concatenate the normalized columns of 'cell_types' and 'output_types'
# # fill the NaN values with 0, meaning negative (while 1 means a positive)
# # drop the column '' (empty sttring)

# file_df_extra = pd.concat([file_df.drop(['cell_types','output_types'], axis=1),
#                             json_normalize(file_df['cell_types']),
#                             json_normalize(file_df['output_types'])],
#                            axis=1).fillna(0).drop([""],axis=1)
# file_df_extra

In [154]:
# # get all files containing a header
# file_df[file_df.heading == 1]

In [155]:
# type(file_df.cell_types[0])

In [156]:
# %time print(5)


### Create local elastic search variable

#### Elastic Search curl's

In [157]:
# !curl "http://localhost:9200/test"
# !curl -XDELETE "localhost:9200/test_df_extra"
# !curl -XPOST "http://localhost:9200/_shutdown"

!curl "http://localhost:9200/_cat/indices?v"

health status index             uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   test_df_extra     DCUnYCeMSeSpqyf4guzzhw   1   1       6029            0    514.8mb        514.8mb
yellow open   sample_notebooks1 kYo_GDRQTrmTrrxSklQDSQ   1   1     176812            0    915.7mb        915.7mb
yellow open   sample_notebooks  64SAHKxJSC6IqaDLYCuJ3A   1   1     176312            0    691.5mb        691.5mb


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   452  100   452    0     0  13696      0 --:--:-- --:--:-- --:--:-- 13294
100   452  100   452    0     0  13696      0 --:--:-- --:--:-- --:--:-- 13294


### Put single_files into elastic search

In [None]:
HOST = 'http://localhost:9200/'
es = Elasticsearch(hosts=[HOST]) 


INDEX = "test_single"
TYPE = "record"

In [None]:
create_SE_from_folder('unknown',es,'single_files')

In [130]:
!curl "http://localhost:9200/_cat/indices?v"

health status index             uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   sample_notebooks1 kYo_GDRQTrmTrrxSklQDSQ   1   1     176812            0    915.7mb        915.7mb
yellow open   sample_notebooks  64SAHKxJSC6IqaDLYCuJ3A   1   1     176312            0    691.5mb        691.5mb


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   339  100   339    0     0   2690      0 --:--:-- --:--:-- --:--:--  2712


### Put github repositories into elastic search

In [146]:
# !curl -XDELETE "localhost:9200/test_repos"

HOST = 'http://localhost:9200/'
es = Elasticsearch(hosts=[HOST]) 


INDEX = "test_df_extra"
TYPE = "record"

In [None]:
folder = 'repos'
path = path = os.getcwd()+'//'+folder
dir_list = os.listdir(path)
file_id = 0
for repo in tqdm_notebook(dir_list):
    try:
        a = create_SE_from_folder(repo,es,folder,file_id)
        es = a[0]
        file_id = a[1]
    except Exception as e:
        print(e,repo)

    
# cell_df,es,file_id

In [None]:
!curl "http://localhost:9200/_cat/indices?v"

In [None]:
def count_ipynb(folder):
    path = os.getcwd()+'//'+folder
    dir_list = os.listdir(path)

    file_id = 0
    for root, dirs, files in os.walk(path):
#         for file in tqdm_notebook(files):
        for file in files:
            if file.endswith(".ipynb"):
                file_id += 1
    return file_id

count_ipynb('single_files'),count_ipynb('repos'),count_ipynb('notebooks')

In [None]:
create_SE_from_folder('sample',es,'notebooks',0)

In [None]:
HOST = 'http://localhost:9200/'
es = Elasticsearch(hosts=[HOST],timeout=30,max_retries=10, retry_on_timeout=True) 


INDEX = "sample_notebooks1"
TYPE = "record"

In [None]:
sample_df = create_SE_from_folder('sample',es,'notebooks',0)

In [None]:
sample_df

In [None]:
sample_df.drop(columns=['char'])

In [None]:
# !curl -XDELETE "localhost:9200/sample_notebooks"
!curl "http://localhost:9200/_cat/indices?v"

In [None]:
# set(sample_df.output_type)

sample_df[sample_df.output_type=='display_data']

In [None]:
for chuck in tqdm_notebook(split(sample_df, 500)):
    try:
        r = es.bulk(rec_to_actions(chuck))
    except:
        try:
            for mini_chuck in tqdm_notebook(split(chuck, 500)):
                r = es.bulk(rec_to_actions(mini_chuck))
        except:
            print('failed, skip this df')
    
    
# for chuck in tqdm_notebook(split(sample_df.drop(columns=['char']), 10000)):
#     r = es.bulk(rec_to_actions(chuck))

In [None]:
numpy_data = np.random.rand(117000,2)
df = pd.DataFrame(data=numpy_data, columns=["column1", "column2"])
df

In [None]:
for x in split(df, 50000):
    print(type(x))

In [170]:
text = 'drop column'
attributes_dict = {'code':1,'markdown':1,'heading':0,'raw':0,'stream':1,'empty':0,'execute_result':0}
query_str = ''

query_str += '(string:'+text+')'
for key in q.keys():
    if q[key]==1:
        to_add = '('+key+':1)'

        
        if query_str != '':
            query_str += ' AND '
        query_str += to_add
        print(to_add)
        
    
q,query_str

(code:1)
(markdown:1)
(stream:1)


({'code': 1,
  'markdown': 1,
  'heading': 0,
  'raw': 0,
  'stream': 1,
  'empty': 0,
  'execute_result': 0},
 '(string:drop column) AND (code:1) AND (markdown:1) AND (stream:1)')

In [173]:
def query_string_query(text,att_dict,highlight):
    query_str = ''
    fields = []
    
    query_str += '(string:'+text+')'
    for key in att_dict.keys():
        if att_dict[key]==1:
            fields.append
            to_add = '('+key+':1)'
            if query_str != '':
                query_str += ' AND '
            query_str += to_add
    
        
    q = {}
    query_dict = {}
    query_string = {}
    
    query_string['query'] = query_str
    query_dict['query_string'] = query_string
    q['query'] = query_dict
    
    if highlight == True:
        highlight = {"pre_tags":["<b>"],
                     "post_tags":["</b>"],
                     "order":"score",
                     "fields":{'string':{}}#,'code_str':{}}
                    }
        q['highlight'] = highlight
    return q

# test
text = 'drop column'
attributes_dict = {'code':1,'markdown':1,'heading':0,'raw':0,'stream':1,'empty':0,'execute_result':0}

q_test_querystring = query_string_query(text,attributes_dict,True)
q_test_querystring

{'query': {'query_string': {'query': '(string:drop column) AND (code:1) AND (markdown:1) AND (stream:1)'}},
 'highlight': {'pre_tags': ['<b>'],
  'post_tags': ['</b>'],
  'order': 'score',
  'fields': {'string': {}}}}

In [159]:
def multi_match_query(code,markdown,modules,highlight):
    query_str = ''
    fields = []
    
    if code != None:
        fields += ['code_str']
        query_str += ' '+code
    if markdown != None:
        fields += ['markdown_str']
        query_str += ' '+markdown
    if modules != None:
        fields += ['modules']
        query_str += ' '+modules   
    
        
    q = {}
    query_dict = {}
    multi_match = {}
    
    multi_match['fields'] = fields
    multi_match['query'] = query_str
    query_dict['multi_match'] = multi_match
    q['query'] = query_dict
    
    if highlight != None:
        highlight = {"pre_tags":["<b>"],
            "post_tags":["</b>"],
            "fields":{'markdown_str':{}}}
        q['highlight'] = highlight
    return q

def query_string_query(code,markdown,modules,highlight):
    query_str = ''
    fields = []
    
    if code != None:
        fields += ['code_str']
        query_str += '(code_str:'+code+')'
    if markdown != None:
        fields += ['markdown_str']
        if query_str != '':
            query_str += ' AND '
        query_str += '(markdown_str:'+markdown+')'
    if modules != None:
        fields += ['modules']
        if query_str != '':
            query_str += ' AND '
        query_str += '(modules:'+modules+')'  
    
        
    q = {}
    query_dict = {}
    query_string = {}
    
    query_string['query'] = query_str
    query_dict['query_string'] = query_string
    q['query'] = query_dict
    
    if highlight == True:
        highlight = {"pre_tags":["<b>"],
                     "post_tags":["</b>"],
                     "order":"score",
                     "fields":{'markdown_str':{},'code_str':{}}}
        q['highlight'] = highlight
    return q

# "order":"score",
# "fields":{'_all':{}}} WERKT NIET....


# https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-highlighting.html

In [160]:
# test
q_test_multimatch = multi_match_query('linear',None,'numpy',True)
q_test_multimatch

{'query': {'multi_match': {'fields': ['code_str', 'modules'],
   'query': ' linear numpy'}},
 'highlight': {'pre_tags': ['<b>'],
  'post_tags': ['</b>'],
  'fields': {'markdown_str': {}}}}

In [161]:
# test
q_test_querystring = query_string_query(None,'drop','pandas',True)
q_test_querystring

{'query': {'query_string': {'query': '(markdown_str:drop) AND (modules:pandas)'}},
 'highlight': {'pre_tags': ['<b>'],
  'post_tags': ['</b>'],
  'order': 'score',
  'fields': {'markdown_str': {}, 'code_str': {}}}}