# Converting Sample Data to ElasticSearch indexes.

> "Author" : "Syed Owais Chishti"

>"Task" : "Indexing Reddit Data with Elastic Search in Python."

### Importing Libraries

In [64]:
from elasticsearch import Elasticsearch
els = Elasticsearch()

In [65]:
import json
from datetime import datetime
timeframe = 'Data'

### Defining Schema for Index.

Index Name : redditdata,

type : dbalter,

fields : [
parent_id : TEXT,
comment_id : TEXT,
parent : TEXT,
comment : TEXT,
subreddit : TEXT,
unix : INT,
score : INT
]

In [66]:
def create_table():
    """Create the 'redditdata' index with the 'dbalter' document mapping.

    The mapping declares five text fields (parent_id, comment_id, parent,
    comment, subreddit) and two integer fields (unix, score). The parent and
    comment fields additionally name the "standard" analyzer.

    Returns:
        Whatever els.indices.create returns for the request.
    """
    # Field templates; each property receives its own copy below.
    plain_text = {"type": "text"}
    analyzed_text = {"type": "text", "analyzer": "standard"}
    whole_number = {"type": "integer"}

    properties = {
        "parent_id": dict(plain_text),
        "comment_id": dict(plain_text),
        "parent": dict(analyzed_text),
        "comment": dict(analyzed_text),
        "subreddit": dict(plain_text),
        "unix": dict(whole_number),
        "score": dict(whole_number),
    }
    index_body = {"mappings": {"dbalter": {"properties": properties}}}
    return els.indices.create(index='redditdata', body=index_body)
# Create the index and show the response returned by els.indices.create.
print(create_table())

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'redditdata'}


In [67]:
def format_data(data):
    """Flatten a comment body onto one line and normalise its quotes.

    Every line feed and every carriage return becomes the token
    ' newlinechar ' (a '\\r\\n' pair therefore yields two tokens), and every
    double quote becomes a single quote.

    Args:
        data: The raw comment text.

    Returns:
        The rewritten text.
    """
    substitutions = (
        ('\n', ' newlinechar '),
        ('\r', ' newlinechar '),
        ('"', "'"),
    )
    for old, new in substitutions:
        data = data.replace(old, new)
    return data

In [68]:
def find_parent(pid):
    """Fetch the stored comment text of the document whose comment_id matches pid.

    Runs a match query on the 'comment_id' field of the 'redditdata' index and
    uses the first hit only.

    Args:
        pid: Identifier to look up; it is formatted into a string first.

    Returns:
        The 'comment' value of the first hit, or False when there are no hits
        or when anything raises (the error text is printed).
    """
    try:
        lookup = {"query": {"match": {"comment_id": "{}".format(pid)}}}
        hits = els.search(index='redditdata', body=lookup)['hits']['hits']
        if hits == []:
            return False
        return hits[0]['_source']['comment']
    except Exception as e:
        print(str(e))
        return False

In [69]:
def find_existing_score(pid):
    """Fetch the score of the stored document whose parent_id matches pid.

    Runs a match query on the 'parent_id' field of the 'redditdata' index and
    uses the first hit only.

    Args:
        pid: Parent identifier to look up; it is formatted into a string first.

    Returns:
        The first hit's 'score' converted with int(), or False when there are
        no hits or when anything raises (the error text is printed).
    """
    try:
        lookup = {"query": {"match": {"parent_id": "{}".format(pid)}}}
        hits = els.search(index='redditdata', body=lookup)['hits']['hits']
        if hits == []:
            return False
        return int(hits[0]['_source']['score'])
    except Exception as e:
        print(str(e))
        return False

In [70]:
def acceptable(data):
    """Decide whether a comment body is worth storing.

    A body is rejected when it is empty, splits on single spaces into more
    than 50 pieces, is longer than 1000 characters, or is exactly one of the
    placeholders '[deleted]' / '[removed]'.

    Args:
        data: The comment text to check.

    Returns:
        True when the body passes every check, otherwise False.
    """
    piece_count = len(data.split(' '))
    char_count = len(data)
    if piece_count > 50 or char_count < 1 or char_count > 1000:
        return False
    return data not in ('[deleted]', '[removed]')

In [71]:
def sql_insert_replace_comment(commentid,parentid,parent,comment,subreddit,time,score):
    """Overwrite the stored reply for a parent with a new comment.

    Searches 'redditdata' for a document whose parent_id matches parentid,
    takes the _id of the first hit, and sends a partial update that rewrites
    every field of that document. When the search has no hits a message is
    printed and nothing is updated.

    Args:
        commentid: Id of the replacing comment.
        parentid: Id of the parent both comments reply to.
        parent: Parent text; stored after string formatting.
        comment: Text of the replacing comment.
        subreddit: Subreddit name.
        time: Creation time; stored as int(time) in the 'unix' field.
        score: Score of the replacing comment; stored as given.

    Returns:
        None. Errors are printed rather than raised.
    """
    as_text = '{}'.format
    try:
        lookup = {"query": {"match": {"parent_id": as_text(parentid)}}}
        found = els.search(index='redditdata', body=lookup)
        hits = found['hits']['hits']
        if hits != []:
            # Only the first matching document is ever replaced.
            doc_ids = [hits[0]['_id']]
        else:
            doc_ids = []
            print("Error retreiving ids.", found)

        replacement = {
            "doc": {
                "parent_id": as_text(parentid),
                "comment_id": as_text(commentid),
                "parent": as_text(parent),
                "comment": as_text(comment),
                "subreddit": as_text(subreddit),
                "unix": int(time),
                "score": score,
            }
        }
        for doc_id in doc_ids:
            try:
                els.update(index='redditdata', doc_type='dbalter', id=as_text(doc_id), body=replacement)
            except Exception as e:
                print("Error updating data.", e)
    except Exception as e:
        print('sql_insert_replace_comment Function',str(e))

In [72]:
def sql_insert_has_parent(commentid,parentid,parent,comment,subreddit,time,score,givenID):
    """Index a comment together with the text of its parent.

    Writes one 'dbalter' document into 'redditdata' under the id givenID.

    Args:
        commentid: Id of the comment.
        parentid: Id of the comment's parent.
        parent: Text of the parent.
        comment: Text of the comment.
        subreddit: Subreddit name.
        time: Creation time; stored as int(time) in the 'unix' field.
        score: Comment score; stored as given.
        givenID: Document id to index under; formatted into a string.

    Returns:
        None. Any error is printed rather than raised.
    """
    as_text = '{}'.format
    try:
        document = dict(
            parent_id=as_text(parentid),
            comment_id=as_text(commentid),
            parent=as_text(parent),
            comment=as_text(comment),
            subreddit=as_text(subreddit),
            unix=int(time),
            score=score,
        )
        els.index(index='redditdata', doc_type='dbalter', id=as_text(givenID), body=document)
    except Exception as e:
        print('sql_insert_has_parent',str(e))

In [73]:
def sql_insert_no_parent(commentid,parentid,comment,subreddit,time,score,givenID):
    """Index a comment whose parent text is not available.

    Writes one 'dbalter' document into 'redditdata' under the id givenID,
    storing the literal string 'NULL' in the 'parent' field.

    Args:
        commentid: Id of the comment.
        parentid: Id of the comment's parent.
        comment: Text of the comment.
        subreddit: Subreddit name.
        time: Creation time; stored as int(time) in the 'unix' field.
        score: Comment score; stored as given.
        givenID: Document id to index under; formatted into a string.

    Returns:
        None. Any error is printed rather than raised.
    """
    as_text = '{}'.format
    try:
        document = dict(
            parent_id=as_text(parentid),
            comment_id=as_text(commentid),
            parent=as_text('NULL'),
            comment=as_text(comment),
            subreddit=as_text(subreddit),
            unix=int(time),
            score=score,
        )
        els.index(index='redditdata', doc_type='dbalter', id=as_text(givenID), body=document)
    except Exception as e:
        print('sql_insert_no_parent',str(e))

In [99]:
import linecache
import sys

def PrintException():
    """Print where the exception currently being handled surfaced.

    Reads the active exception from sys.exc_info() and prints the file name,
    line number and source text of the first traceback entry (the frame in
    which the exception is being handled), followed by the exception itself.

    Intended to be called from inside an ``except`` block. When no exception
    is being handled a short notice is printed instead of failing on the
    missing traceback.
    """
    _, exc_obj, tb = sys.exc_info()
    if tb is None:
        # sys.exc_info() is (None, None, None) outside an except block.
        print('PrintException: no exception is currently being handled')
        return
    f = tb.tb_frame
    lineno = tb.tb_lineno
    filename = f.f_code.co_filename
    # Drop stale cache entries so the printed source line matches the file on disk.
    linecache.checkcache(filename)
    line = linecache.getline(filename, lineno, f.f_globals)
    print ('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(filename, lineno, line.strip(), exc_obj))


In [75]:
# Stream the newline-delimited JSON dump in 'Data' and index every comment that
# passes the score and content filters.
row_counter = 0   # lines read from the dump so far
paired_rows = 0   # documents stored together with their parent's text
givenID = 1       # next sequential document id handed to the insert helpers
with open('Data', encoding='utf8', errors='ignore') as f:
    try:
        for row in f:
            row_counter += 1
            row = json.loads(row)
            parent_id = row['parent_id']
            body = format_data(row['body'])
            created_utc = row['created_utc']
            score = row['score']
            comment_id = row['name']
            subreddit = row['subreddit']

            # Apply the cheap local filters first: a row that fails either one
            # is never stored, so it should not cost any Elasticsearch queries.
            if score >= 2 and acceptable(body):
                # NOTE(review): Elasticsearch makes new documents searchable only
                # after a refresh, so these lookups may miss rows indexed moments
                # ago -- confirm whether that matters for this data set.
                existing_comment_score = find_existing_score(parent_id)
                if existing_comment_score:
                    # A reply to this parent is already stored; keep the better-scored one.
                    if score > existing_comment_score:
                        parent_data = find_parent(parent_id)
                        sql_insert_replace_comment(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
                else:
                    parent_data = find_parent(parent_id)
                    if parent_data:
                        sql_insert_has_parent(comment_id,parent_id,parent_data,body,subreddit,created_utc,score, givenID)
                        paired_rows += 1
                    else:
                        sql_insert_no_parent(comment_id,parent_id,body,subreddit,created_utc,score, givenID)
                    givenID += 1

            if row_counter % 100000 == 0:
                print('Total Rows Read: {}, Paired Rows: {}, Time: {}'.format(row_counter, paired_rows, str(datetime.now())))
    except Exception as e:
        # Any failure (bad JSON, missing key, ...) stops the import at that row.
        print("main", e)
        PrintException()

# Converting Index to Files.

In [96]:
#df = pd.concat([pd.Series(d) for d in dicts], axis=1).fillna(0).T
#df

Unnamed: 0,parent_id,comment_id,parent,comment,subreddit,unix,score
0,erfge,bebae,rgjuj,etb,ghr,244,244
1,rgjuj,wref,adfa,ghr,rgjuj,361,361
2,ghr,adfa,ghr,bebae,rgjuj,366,366
3,wref,ghr,etb,ghr,rgjuj,370,370
4,erfge,adfa,beb,beb,wref,478,478
5,etb,erfge,beb,bsdthrt,erfge,505,505
6,bsdthrt,erfge,adfa,wref,beb,508,508
7,beb,erfge,bsdthrt,ghr,adfa,510,510
8,bebae,bebae,adfa,ghr,beb,627,627
9,ghr,etb,ghr,adfa,wref,638,638


In [100]:
from elasticsearch import Elasticsearch
els = Elasticsearch()
import pandas as pd
import linecache
import sys

def PrintException():
    """Print where the exception currently being handled surfaced.

    Reads the active exception from sys.exc_info() and prints the file name,
    line number and source text of the first traceback entry (the frame in
    which the exception is being handled), followed by the exception itself.

    Intended to be called from inside an ``except`` block. When no exception
    is being handled a short notice is printed instead of failing on the
    missing traceback.
    """
    _, exc_obj, tb = sys.exc_info()
    if tb is None:
        # sys.exc_info() is (None, None, None) outside an except block.
        print('PrintException: no exception is currently being handled')
        return
    f = tb.tb_frame
    lineno = tb.tb_lineno
    filename = f.f_code.co_filename
    # Drop stale cache entries so the printed source line matches the file on disk.
    linecache.checkcache(filename)
    line = linecache.getline(filename, lineno, f.f_globals)
    print ('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(filename, lineno, line.strip(), exc_obj))



# Export parent/comment pairs from the 'redditdata' index into line-aligned
# text files: the first page goes to test.from / test.to, every later page to
# train.from / train.to.
limit = 5000        # page size requested from Elasticsearch per query
last_unix = 0       # 'unix' value of the last exported row; the next page starts after it
cur_length = limit  # size of the most recent page; a short page ends the loop
counter = 0         # number of pages written so far
test_done = False   # becomes True once the first page has been written to test.*


def _append_lines(path, values):
    """Append every item of `values` to the file at `path`, one per line (UTF-8)."""
    with open(path, 'a', encoding='utf8') as f:
        for content in values:
            # str() because fillna(0) below can leave a non-string in the column.
            f.write(str(content) + '\n')


try:
    while cur_length == limit:
        m = {
            # Ask for a full page explicitly. Without "size" the search returns
            # only its default number of hits, so cur_length could never reach
            # `limit` and the loop stopped after one small page.
            "size": limit,
            "query": {
                "bool": {
                    "must_not": [
                        {"match": {"score": 0}},
                        {"match": {"parent": "NULL"}}
                    ],
                    "filter": {
                        # NOTE(review): a strict "gt" skips any remaining rows that
                        # share the page's last 'unix' value -- confirm acceptable.
                        "range": {"unix": {"gt": last_unix}}
                    }
                }
            },
            "sort": [
                {"unix": {"order": "asc"}}
            ]
        }
        Res = els.search(index='redditdata', body=m)
        dfdicts = [hit['_source'] for hit in Res['hits']['hits']]
        cur_length = len(dfdicts)
        if not dfdicts:
            # Nothing left to export (e.g. the previous page was exactly full).
            break

        df = pd.DataFrame(dfdicts).fillna(0)
        # Plain int so the value serialises cleanly in the next request body.
        last_unix = int(df['unix'].iloc[-1])

        prefix = 'train' if test_done else 'test'
        _append_lines(prefix + '.from', df['parent'].values)
        _append_lines(prefix + '.to', df['comment'].values)
        test_done = True

        counter += 1
        if counter % 20 == 0:
            print(counter*limit,'rows completed so far')
except Exception as e:
    print(e)
    PrintException()