## Test Elasticsearch Manager Class + Subreddit Data Extraction and Insertion to ES
1. Test ES manager basic functionality.
2. Test subreddit extraction and insertion to ES reddit crypto index.


In [1]:
# Import libraries
import os
import sys
import numpy as np
import pandas as pd
from tqdm import tqdm
from datetime import datetime

# Change dir
os.getcwd()
os.chdir("..")  # Change to root dir to detect other local libs

In [2]:
# Import local modules
from utils import timer
from es.manager import ESManager
from data.schema.es_mappings import (
    REDDIT_CRYPTO_INDEX_NAME,
    reddit_crypto_mapping,
    test_reddit_crypto_mapping
) 

### 1. Test Basic ESManager Functionality

In [3]:
# Test Connection
es_conn = ESManager()
es_conn.get_status()

True

In [4]:
# Test Index Creation
TEST_REDDIT_CRYPTO_INDEX_NAME = "test-" + REDDIT_CRYPTO_INDEX_NAME

In [5]:
es_conn.create_index(
    index=TEST_REDDIT_CRYPTO_INDEX_NAME,
    mapping=test_reddit_crypto_mapping
)

In [6]:
es_conn.index_is_exist(index=TEST_REDDIT_CRYPTO_INDEX_NAME)

True

In [8]:
# Test Data: Dummy Reddit Comment and Submission for Testing

@timer
def gen_dummy_reddit_data(num: int = 1000):
    schema = reddit_crypto_mapping["mappings"]["properties"]
    for i in tqdm(range(num)):
        d = {
            "_id": str(i),
            "_index": TEST_REDDIT_CRYPTO_INDEX_NAME,
            "_type": "_doc",
            "_source": {key: "Hello Monkey Zoo Kangeroo" for key in schema}
        }
        yield d

# Gen dummy Data
dummy_data = gen_dummy_reddit_data(888)

Function `gen_dummy_reddit_data` took: 1.1920928955078125e-06 seconds


In [9]:
es_conn.bulk_insert_data(index=TEST_REDDIT_CRYPTO_INDEX_NAME, data=dummy_data)

  0%|          | 0/888 [00:00<?, ?it/s]

100%|██████████| 888/888 [00:00<00:00, 4244.76it/s]


In [10]:
# Test Querying
test_query = {
    "query": {
        "match_all": {}
    }
}

res = es_conn.run_match_query(index=TEST_REDDIT_CRYPTO_INDEX_NAME, query=test_query)
res[:2]

[{'_index': 'test-reddit-crypto',
  '_type': '_doc',
  '_id': '3',
  '_score': 1.0,
  '_source': {'id': 'Hello Monkey Zoo Kangeroo',
   'subreddit': 'Hello Monkey Zoo Kangeroo',
   'create_datetime': 'Hello Monkey Zoo Kangeroo',
   'author': 'Hello Monkey Zoo Kangeroo',
   'full_text': 'Hello Monkey Zoo Kangeroo',
   'type': 'Hello Monkey Zoo Kangeroo',
   'parent_id': 'Hello Monkey Zoo Kangeroo'}},
 {'_index': 'test-reddit-crypto',
  '_type': '_doc',
  '_id': '5',
  '_score': 1.0,
  '_source': {'id': 'Hello Monkey Zoo Kangeroo',
   'subreddit': 'Hello Monkey Zoo Kangeroo',
   'create_datetime': 'Hello Monkey Zoo Kangeroo',
   'author': 'Hello Monkey Zoo Kangeroo',
   'full_text': 'Hello Monkey Zoo Kangeroo',
   'type': 'Hello Monkey Zoo Kangeroo',
   'parent_id': 'Hello Monkey Zoo Kangeroo'}}]

In [11]:
# Test Index Deletion
es_conn.delete_index(index=TEST_REDDIT_CRYPTO_INDEX_NAME)

### 2. Reddit Extraction and Insertion Pipeline Test

In [12]:
# Pull Test Data
from config.reddit_data_cfg import CRYPTO_SUBREDDITS
from data.extract.reddit_extract import *

In [13]:
# Pull out CryptoCurrency
test_subreddit = CRYPTO_SUBREDDITS[6]

# Pull Configs
test_start_date = datetime.strptime("2021-01-14", "%Y-%m-%d")
test_end_date = datetime.strptime("2021-01-19", "%Y-%m-%d")

print(f"""TEST DETAILS:
Subreddit={test_subreddit}
Start Date={test_start_date}
End Date={test_end_date}
Date Range={test_end_date - test_start_date}
""")

TEST DETAILS:
Subreddit=CryptoCurrency
Start Date=2021-01-14 00:00:00
End Date=2021-01-19 00:00:00
Date Range=5 days, 0:00:00



In [15]:
# Test get_all_crypto_subreddit_date
results = get_all_crypto_subreddit_data(subreddit=test_subreddit,
                                        start_date=test_start_date,
                                        end_date=test_end_date)

results_fmt = {test_subreddit: results}

Function `get_all_crypto_subreddit_data` took: 90.1318531036377 seconds


In [None]:
print(f"Number of entries from subreddits: {len(results[test_subreddit[0]])}")

Number of entries from subreddits: 1001


In [None]:
# Test insert_reddit_to_es
insert_reddit_to_es(results)

In [None]:
# Test Insertion by Querying
res = es_conn.run_match_query(index=REDDIT_CRYPTO_INDEX_NAME, query=test_query)
res[:3]

[{'_index': 'reddit-crypto',
  '_type': '_doc',
  '_id': 'gpjKc34BDZWeY8f_1qZO',
  '_score': 1.0,
  '_source': {'id': 't1_gjrvq98',
   'subreddit': 'CryptoCurrency',
   'create_datetime': '2021-01-18T23:59:46+00:00',
   'author': 'afterthefuture',
   'full_text': "good man. keep giving us heads up like these.\n\nI think the volume seems to be low on this break out. I am 50/50 now on this break. Will move my stops even tighter. It doesn't seem to be explosive one.",
   'type': 'comment',
   'parent_id': 't1_gjrv2m9'}},
 {'_index': 'reddit-crypto',
  '_type': '_doc',
  '_id': 'iZjKc34BDZWeY8f_1qZO',
  '_score': 1.0,
  '_source': {'id': 't1_gjrvlgi',
   'subreddit': 'CryptoCurrency',
   'create_datetime': '2021-01-18T23:58:34+00:00',
   'author': 'stop_rebelling',
   'full_text': "Everybody tbh. It was 100k then and now I'm getting emails about 250k lol.",
   'type': 'comment',
   'parent_id': 't1_gjrusnd'}},
 {'_index': 'reddit-crypto',
  '_type': '_doc',
  '_id': 'ipjKc34BDZWeY8f_1qZO',