In [None]:
# References
# ===================

# Web scraping tips for Reddit
# https://www.datacamp.com/tutorial/scraping-reddit-python-scrapy
# https://www.bestproxyreviews.com/reddit-scraper/

# Web scraping
# https://www.learndatasci.com/tutorials/ultimate-guide-web-scraping-w-python-requests-and-beautifulsoup/

# Constructing JSON schema
# https://madplay.github.io/post/multiple-and-conditional-json-schemas-validation-examples

# Official Reddit API JSON structure
# https://github.com/reddit-archive/reddit/wiki/JSON

In [4]:
# Library import
# ===================

import time
from timeit import default_timer as timer
import random

import requests
from bs4 import BeautifulSoup
from selenium.webdriver.chrome.service import Service
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

import os
import shutil
import hashlib
import re

In [5]:
# Constants
# ===================

# Scrape the old Reddit layout for the sake of simplicity
BASE_URL = "https://old.reddit.com"
# Paths are relative to the notebook's working directory (/notebook)
DATA_DIR = "../data/"
IMAGE_DIR = f"{DATA_DIR}images/"

# Pool of User-Agent strings; the scraper picks one at random
# to reduce the risk of the connection being blocked
USER_AGENTS = [
    "Mozilla/5.0 (iPhone; CPU iPhone OS 12_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/71.0.3578.89 Mobile/15E148 Safari/605.1",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:99.0) Gecko/20100101 Firefox/99.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36 Edg/100.0.1185.29",
    "Mozilla/5.0 (Windows NT 5.1; rv:68.0) Gecko/20100101 Goanna/4.8 Firefox/68.0 PaleMoon/28.10.6a1",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Linux; Android 10; SM-G996U Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 10; Google Pixel 4 Build/QD1A.190821.014.C2; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone12,1; U; CPU iPhone OS 13_0 like Mac OS X) AppleWebKit/602.1.50 (KHTML, like Gecko) Version/10.0 Mobile/15E148 Safari/602.1",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 Edge/12.246",
    "Mozilla/5.0 (X11; CrOS x86_64 8172.45.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.64 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1",
    "Mozilla/5.0 (CrKey armv7l 1.5.16041) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.0 Safari/537.36",
    "Roku4640X/DVP-7.70 (297.70E04154A)",
    "Mozilla/5.0 (Linux; Android 5.1; AFTS Build/LMY47O) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/41.99900.2250.0242 Safari/537.36",
    "AppleTV11,1/11.1",
    "Mozilla/5.0 (PlayStation; PlayStation 5/2.26) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Xbox; Xbox Series X) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.82 Safari/537.36 Edge/20.02",
]

# Moderators of /r/wallstreetbets; hard-coded because Reddit only
# shows this list to logged-in users
LIST_MODERATORS = [
    "OPINION_IS_UNPOPULAR",
    "CHAINSAW_VASECTOMY",
    "WallStreetBot",
    "bawse1",
    "notmikjaash",
    "Plechazunga_",
    "HellzAngelz",
    "Stylux",
    "ThetaGang_wsb",
    "Grumpy-james",
]

In [6]:
# Reddit's robots.txt disallows crawling of the whole site by every bot
# (see the output below). We proceed anyway, for academic purposes only.
# timeout: without it, requests may wait forever on an unresponsive server
print(requests.get(BASE_URL + '/robots.txt', timeout=30).text)

User-Agent: *
Disallow: /



In [13]:
# Custom functions
# ===================

# Wait for some random time to avoid IP ban
# Official Reddit API allows 60 calls per minute
# https://github.com/reddit-archive/reddit/wiki/API#rules
def rand_sleep(sec=0):
    """Sleep for a random duration in ``[sec, 2 * sec)`` seconds.

    With the default ``sec=0`` the call does not pause at all.
    """
    jitter = 1 + random.random()  # uniform factor in [1, 2)
    time.sleep(sec * jitter)

# Store images into filesystem instead of database
def save_image(url):
    """Download the image at ``url`` into IMAGE_DIR and name it by its MD5 hash.

    Tor is restarted first (to get a new proxy) and the download goes through
    its local SOCKS proxy at 127.0.0.1:9050.

    Args:
        url: image URL whose last path segment looks like ``name.ext``
            (a trailing ``?query`` / ``&...`` part is ignored).

    Returns:
        Path of the saved file: ``IMAGE_DIR + <md5 hex digest> + "." + <ext>``.

    Raises:
        ValueError: no ``name.ext`` file name can be extracted from ``url``.
        requests.HTTPError: the server answered with an HTTP error status.
    """
    # Extract file name from full url (raw string: no invalid-escape warnings)
    match = re.search(r'[^/\&\?]+\.\w{3,4}(?=([\?&].*$|$))', url)
    if match is None:
        raise ValueError(f"Cannot extract an image file name from url: {url}")
    file_name = match.group()
    # Use the text after the LAST dot: "a.b.jpg" must give "jpg", not "b"
    extension = file_name.rsplit('.', 1)[1]
    # requests cannot fetch protocol-relative URLs ("//host/path")
    if url.startswith('//'):
        url = 'https:' + url

    # Wait 1-2 s between downloads; rand_sleep() without an argument sleeps 0 s
    rand_sleep(1)

    # restart Tor to get new proxy
    os.system('killall tor > /dev/null')
    os.system('service tor start > /dev/null')

    os.makedirs(IMAGE_DIR, exist_ok=True)
    save_path = IMAGE_DIR + file_name
    # Context managers close the session and the streamed response
    with requests.Session() as session:
        session.proxies = {
            'http': 'socks5h://127.0.0.1:9050',
            'https': 'socks5h://127.0.0.1:9050'
        }
        # timeout: never hang forever on a stalled proxy connection
        with session.get(url, stream=True, timeout=60) as resp:
            # refuse to save an HTML error page as if it were the image
            resp.raise_for_status()
            resp.raw.decode_content = True  # force to decompress GZIP or deflate
            try:
                with open(save_path, 'wb') as f:
                    shutil.copyfileobj(resp.raw, f)  # stream data to file object
            except Exception:
                # do not leave a truncated download behind
                if os.path.exists(save_path):
                    os.remove(save_path)
                raise

    # Rename file as its MD5 hash; os.replace (unlike os.rename) also
    # overwrites an existing identical file on Windows
    new_path = IMAGE_DIR + hash_file(save_path) + '.' + extension
    os.replace(save_path, new_path)
    return new_path

# Hash file to get unique identifier to be stored in DB
def hash_file(file_path):
    """Return the hexadecimal MD5 digest of the file at ``file_path``.

    The file is read in 64 KiB chunks so it never has to fit in memory at once.
    """
    chunk_size = 64 * 1024  # 65536 bytes per read
    digest = hashlib.md5()
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read (end of file)
        for chunk in iter(lambda: stream.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()

In [88]:
class RedditScraper():
    """Scrape the post listing of a subreddit on old.reddit.com.

    Pages are rendered in headless Chrome (Selenium) through a local Tor SOCKS
    proxy so collapsed, JavaScript-powered content areas can be expanded, then
    parsed with BeautifulSoup. Parsed posts accumulate in ``self.threads`` (a
    list of dicts); repeated scrapes skip posts that were already collected.
    """

    def __init__(self, subreddit="wallstreetbets"):
        self.base_url = BASE_URL
        self.subreddit = subreddit.strip()
        self.url = BASE_URL + "/r/" + self.subreddit
        self.threads = []  # list of dicts, one per parsed post
        self.threads_df = None  # Pandas DataFrame (built by subclasses)

    def cleanup(self):
        """Remove all downloaded images and leave an empty IMAGE_DIR behind."""
        # Tolerate a missing directory (e.g. cleanup before the first download)
        if os.path.isdir(IMAGE_DIR):
            shutil.rmtree(IMAGE_DIR)
        os.makedirs(IMAGE_DIR, exist_ok=True)

    def init_selenium_driver(self):
        """Restart Tor, then start a headless Chrome routed through its proxy.

        Returns:
            A new ``webdriver.Chrome``; the caller is responsible for ``quit()``.
        """
        # restart Tor to get new proxy
        os.system('killall tor > /dev/null')
        os.system('service tor start > /dev/null')

        # Browser setup for Selenium
        service = Service(executable_path="/root/chromedriver")
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        # A random User-Agent for each browser session
        options.add_argument('user-agent={0}'.format(random.choice(USER_AGENTS)))
        # Route traffic through the local Tor SOCKS proxy to avoid IP ban
        options.add_argument('--proxy-server=socks5://127.0.0.1:9050')

        driver = webdriver.Chrome(service=service, options=options)
        print("Selenium initiated.")
        return driver

    def load_page_source(self, driver, url):
        """Open ``url`` in ``driver``, expand collapsed posts and parse the page.

        Returns:
            BeautifulSoup of the page once every collapsed content area has
            been expanded and become visible.
        """
        driver.get(url)

        # Collapsed content areas must be expanded to get their contents
        collapsed_expand_obj = driver.find_elements(
            By.XPATH,
            "//div[contains(@class, 'expando-button') and contains(@class, 'collapsed')]"
        )
        for expand_button in collapsed_expand_obj:
            # click to expand content area
            expand_button.click()

            # Explicitly wait (max 20 s) until the JavaScript-loaded area is visible
            post_obj = expand_button.find_element(By.XPATH, "../..")  # select grandparent
            content_obj = post_obj.find_element(By.CLASS_NAME, "expando")  # where content is shown
            WebDriverWait(driver, 20).until(EC.visibility_of(content_obj))

        print('BeautifulSoup: page source loaded succesfully.')
        return BeautifulSoup(driver.page_source, "html.parser")

    def _load_page(self, url):
        """Load one listing page in a fresh browser that is always closed afterwards."""
        driver = self.init_selenium_driver()
        try:
            return self.load_page_source(driver, url)
        finally:
            driver.quit()  # close browser even when loading fails

    @staticmethod
    def _store_image(preview, media):
        """Download the <img> inside ``preview`` and append its file hash to ``media``.

        Previews without an image source, or whose download fails, are skipped
        with a message so one broken image does not abort the whole scrape.
        """
        img = preview.find("img")
        if img is None or not img.get("src"):
            return
        try:
            # do not store image (file) into database ...
            img_path = save_image(img["src"])
        except (requests.RequestException, ValueError) as e:
            print(f"Image download failed, skipped: {img['src']} ({e})")
            return
        # ... instead, store file hash as a link to the saved file
        media.append(hash_file(img_path))

    def parse_post_object(self, post):
        """Parse one post element (``div.thing``) and append it to ``self.threads``.

        Images of single-image and gallery posts are downloaded with
        ``save_image`` and recorded as their MD5 hashes in ``media``; videos
        are skipped.
        """
        # Post identity and metadata are attributes of the post element
        post_id = post.attrs["id"]
        post_link = post.attrs["data-permalink"]
        # Datetime submitted in format yyyy-MM-ddThh:mm:ss+00:00 (UTC)
        post_time = post.find("time").attrs["datetime"]
        post_title = post.find("a", class_="title").text
        domain_obj = post.find("span", class_="domain")
        post_domain = (
            domain_obj.a.text
            if domain_obj is not None and domain_obj.a is not None
            else None
        )

        board_name = post.attrs['data-subreddit']
        board_id = post.attrs['data-subreddit-fullname']
        board_type = post.attrs['data-subreddit-type']

        author_name = post.find("a", class_="author").text
        author_id = post.attrs['data-author-fullname']
        # Moderator list is hard-coded for /r/wallstreetbets
        author_ismod = author_name in LIST_MODERATORS

        # "•" is displayed instead of a number when the score is hidden
        score_obj = post.find("div", attrs={"class": "score unvoted"})
        post_likes = 0 if score_obj.text == "•" else int(score_obj.attrs['title'])

        # Number of comments: link text starts with the count, or is just
        # "comment" when there are none
        comments_word = post.find("a", class_="comments").text.split()[0]
        post_comments = 0 if comments_word == "comment" else int(comments_word)

        # Flair attached to the post (None when absent)
        flair_obj = post.find("span", class_="linkflairlabel")
        post_flair = flair_obj.text if flair_obj is not None else None

        # Awards conveyed to the author: {award id: count}
        author_awards = {}
        awards_bar = post.find("span", class_="awardings-bar")
        if awards_bar is not None:
            for award in awards_bar.find_all("a", class_="awarding-link"):
                author_awards[award.attrs['data-award-id']] = award.attrs['data-count']

        # 4 types of post in terms of its content:
        # text only, (single) video, (multiple) images, (single) image
        post_content = None
        post_media = []
        content_area = post.find("div", class_="expando")

        if content_area is not None:
            if content_area.find("span", class_="error"):
                # content loading did not finish: mark it as ERROR
                post_content = "ERROR"
            else:
                content_text = content_area.find("div", class_="usertext-body")
                post_content = content_text.text if content_text else None

                # Extract media from content
                is_video = content_area.find("div", class_="video-player")
                is_gallery = content_area.find("div", class_="media-gallery")
                is_image = content_area.find("div", class_="media-preview-content")

                if is_video:
                    pass  # videos are not downloaded
                elif is_gallery:
                    for preview in content_area.find_all("div", class_="media-preview-content"):
                        self._store_image(preview, post_media)
                elif is_image:
                    self._store_image(is_image, post_media)

        self.threads.append({
            "post_id": post_id,
            "link": post_link,
            "time": post_time,
            "title": post_title,
            "domain": post_domain,
            "board_name": board_name,
            "board_id": board_id,
            "board_type": board_type,
            "author_name": author_name,
            "author_id": author_id,
            "author_ismod": author_ismod,
            "likes": post_likes,
            "comments": post_comments,
            "flair": post_flair,
            "author_awards": author_awards,
            "content": post_content,
            "media": post_media,
        })

    def scrape_threads(self, max_item=0, max_pages=1):
        """Scrape posts from the subreddit listing into ``self.threads``.

        Args:
            max_item: stop after this many NEW posts; 0 means no limit.
            max_pages: number of listing pages to visit; 0 follows "next"
                links until they run out. Defaults to 1 because requesting
                further pages led to IP bans.

        Returns:
            ``self.threads``: all records collected so far, not only new ones.
        """
        # Measures time spent
        start_time = timer()

        # Get all threads regardless of domain direction (inbound / outbound)
        attrs = {"class": "thing", "id": True}
        # ids already collected, so repeated scrapes do not create duplicates
        seen_ids = {p["post_id"] for p in self.threads}
        page_counter = 0
        post_counter = 0
        url = self.url

        while url:
            if page_counter:
                # Pause between page requests so as not to reach the rate limit
                rand_sleep(1)
            page_counter += 1
            # Fresh browser per page: a browser is never reused after quit()
            bs = self._load_page(url)

            for post in bs.find_all("div", attrs=attrs):
                # Skip posts already collected, and advertisement posts
                if post.attrs["id"] in seen_ids or post.find("span", class_="promoted-tag"):
                    continue

                # Show progress for debug
                post_counter += 1
                if post_counter % 5 == 1:
                    print("Processing post #{} from page #{}...".format(post_counter, page_counter))

                # Parse and store post object
                self.parse_post_object(post)
                seen_ids.add(post.attrs["id"])

                # Break if reached target item count
                if 0 < max_item <= post_counter:
                    break

            # Stop the page loop too once the item or page limit is reached
            if 0 < max_item <= post_counter:
                break
            if max_pages and page_counter >= max_pages:
                break

            # continue scraping only when next page exists;
            # next button no longer exists after 500 newest posts
            next_button = bs.find("span", class_="next-button")
            next_link = next_button.find("a") if next_button is not None else None
            url = next_link.attrs['href'] if next_link is not None else None

        print(f'Scraping completed - collected (additional) {post_counter} post(s) from {page_counter} page(s) in {timer()-start_time:,.1f} seconds.')
        print(f'Returned {len(self.threads)} records.')
        return self.threads

In [89]:
# Create a scraper for /r/wallstreetbets
scraper = RedditScraper("wallstreetbets")

In [90]:
# Collect posts (max_item=0: no limit on the number of posts)
result = scraper.scrape_threads(max_item=0)

Selenium initiated.
BeautifulSoup: page source loaded succesfully.
Processing post #1 from page #1...
Processing post #6 from page #1...
Processing post #11 from page #1...
Processing post #16 from page #1...
Processing post #21 from page #1...
Processing post #26 from page #1...
Scraping completed - collected (additional) 27 post(s) from 1 page(s) in 187.0 seconds.
Returned 27 records.


In [91]:
# Running the scrape again skips posts already collected, so it creates no duplicates
result = scraper.scrape_threads(max_item=0)

Selenium initiated.
BeautifulSoup: page source loaded succesfully.
Scraping completed - collected (additional) 0 post(s) from 1 page(s) in 38.9 seconds.
Returned 27 records.


In [150]:
# Library import
# ===================

import csv
import json
import pandas as pd

import sqlite3
from elasticsearch import Elasticsearch, helpers

In [151]:
# Output / database locations (relative paths are from the notebook directory)
OUTPUT_DIR = "../output/"
# Elasticsearch URL; override with the ELASTIC_SERVER environment variable
# when not running next to the "es01" container
ELASTIC_SERVER = os.environ.get("ELASTIC_SERVER", "http://es01:9200")
NOSQL_MAPPING = "../config/NoSQLmap.json"

In [166]:
"""Inherited RedditScraper and added Database operations"""
class RedditDataManager(RedditScraper):

    # Reset database to initialize project
    def reset_db(self):
        pass

    # Download collected records in CSV format
    def download_csv(self):

        columns_headers = list(self.threads[0].keys())
        file_name = f'{self.subreddit}_{int(time.time())}.csv'

        with open(OUTPUT_DIR + file_name, 'a', encoding='utf-8') as f:
            # used tab delimiter - content contains many commas
            writer = csv.DictWriter(f, quoting=csv.QUOTE_ALL, fieldnames=columns_headers)
            writer.writerow(dict((fn,fn) for fn in columns_headers))
            for thread in self.threads:
                writer.writerow(thread)
        print(f'CSV export completeted: {file_name}')

    # Converts collected records to Pandas DataFrame
    def threads_to_df(self):

        if self.threads:
            df = pd.DataFrame(self.threads)

            # Datatype adjustments for DB upload
            # df['time'] = pd.to_datetime(df['time']) # object to datetime64, not compatible for sqlite
            df.loc[df['author_awards'].apply(len).eq(0), 'author_awards'] = None # replace {} with None
            df['author_awards'] = df['author_awards'].apply(json.dumps) # Serialize dict with JSON
            df.loc[df['media'].apply(len).eq(0), 'media'] = None # replace [] with None
            df['media'] = df['media'].apply(json.dumps) # Serialize list with JSON

            self.threads_df = df
            return df
    
    # Establish connection to NoSQL server
    def sql_connection(self):
        conn = None
        try:
            conn = sqlite3.connect(DATA_DIR + f"sqlite/reddit_scrap.db")
        except Excetpion as e:
            print("Failed to establish SQL connection: ", e)
        return conn

    # Upload collected records into SQL database
    def upload_sql(self):
        conn = self.sql_connection()
        try:
            cur = conn.cursor()
            conn.execute(f"""CREATE TABLE IF NOT EXISTS reddit_thread_{self.subreddit}(
                post_id TEXT PRIMARY KEY,
                link TEXT NOT NULL,
                time TEXT NOT NULL,
                title TEXT NOT NULL,
                domain TEXT,
                board_name TEXT,
                board_id TEXT,
                board_type TEXT,
                author_name TEXT NOT NULL,
                author_id TEXT NOT NULL,
                author_ismod INTEGER NOT NULL,
                likes INTEGER NOT NULL,
                comments INTEGER NOT NULL,
                flair TEXT,
                author_awards TEXT,
                content TEXT,
                media TEXT
            )""")
            # list of tuples needed as cursor.executemary() input
            list_of_tuple = list(self.threads_df.itertuples(index=False, name=None))
            cur.executemany( # Use INSERT IGNORE not to have primary key confliction
                f"""INSERT OR IGNORE INTO reddit_thread_{self.subreddit} 
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                list_of_tuple
            )
            conn.commit()
            print(f"SQL upload succeeded: {len(list_of_tuple)} records processed.")
        except Exception as e:
            print("SQL upload failed:", e)
        finally:
            conn.close() # prevents database locked error

    # Execute query on SQL database
    def query_sql(self, sql):
        conn = self.sql_connection()
        try:
            cur = conn.cursor()
            cur.execute(sql)
            
            records = cur.fetchall()
            columns_headers = [cols[0] for cols in cur.description]

            print(f"SQL query succeeded: {len(records)} records fetched.")
            return pd.DataFrame.from_records(data=records, columns=columns_headers)

        except Exception as e:
            print("SQL query failed:", e)
        finally:
            conn.close()

    def drop_sql_table(self, table):
        conn = self.sql_connection()
        try:
            cur = conn.cursor()
            cur.execute(f"DROP table {table};")
            print(f"SQL table dropped: {table}")

        except Exception as e:
            print("SQL table drop failed:", e)
        finally:
            conn.close()

    # Establish connection to NoSQL server
    def nosql_connection(self):
        try:
            es = Elasticsearch(ELASTIC_SERVER)
        except Exception as e:
            print("Failed to establish NoSQL connection: ", e)
        return es        

    # Upload collected records into NoSQL database
    def upload_nosql(self):
        es = self.nosql_connection()
        es_index_name = f"reddit_thread_{self.subreddit}"

        # Loads json schema
        with open(NOSQL_MAPPING, 'r') as j:
            json_schema = json.loads(j.read())

        # Create index with predefined mappings if not exist
        if es.indices.exists(index=es_index_name):
            pass
        else:
            res = es.indices.create(index=es_index_name, **json_schema)
            print(res)

        # Insert data using Elasticsearch bulk API
        list_of_dicts = manager.threads_df.to_dict('records')
        bulk_doc = [
            {
                "_index": es_index_name,
                "_source": a_record
            } for a_record in list_of_dicts
        ]
        res = helpers.bulk(es, bulk_doc)
        print(res)
        es.close()

    # Execute query on NoSQL database
    def query_nosql(self, query):
        es = self.nosql_connection()
        es_index_name = f"reddit_thread_{self.subreddit}"

        res = es.search(index=es_index_name, query=query)
        print(f"Total {res['hits']['total']['value']} records returned.")
        es.close()
        return res['hits']['hits']

    def drop_nosql_index(self, index):
        es = self.nosql_connection()

        res = es.indices.delete(
            index=index, 
            ignore=[400, 404] # ignores error when index does not exist
        )
        print(res)


In [167]:
# Initiate data manager
manager = RedditDataManager(subreddit="wallstreetbets")
# Copy collected data into the manager: a new list, so later scrapes through
# either object do not silently change the other's records
manager.threads = list(scraper.threads)

# Ready to upload to database
data_df = manager.threads_to_df()
data_df.head(3)

Unnamed: 0,post_id,link,time,title,domain,board_name,board_id,board_type,author_name,author_id,author_ismod,likes,comments,flair,author_awards,content,media
0,thing_t3_xa4z97,/r/wallstreetbets/comments/xa4z97/weekend_disc...,2022-09-09T20:00:11+00:00,Weekend Discussion Thread for the Weekend of S...,self.wallstreetbets,wallstreetbets,t5_2th52,public,OPINION_IS_UNPOPULAR,t2_bd6q5,True,103,4789,Weekend Discussion,"{""award_f44611f1-b89e-46dc-97fe-892280b13b82"":...","Read rules, follow Twitter and IG, join Discor...",
1,thing_t3_x4ryjg,/r/wallstreetbets/comments/x4ryjg/most_anticip...,2022-09-03T11:16:01+00:00,Most Anticipated Earnings Releases for the wee...,i.redd.it,wallstreetbets,t5_2th52,public,bigbear0083,t2_eaak0,False,1724,812,Earnings Thread,"{""gid_1"": ""4"", ""award_f44611f1-b89e-46dc-97fe-...",,"[""e823d70122c40c36bc6526b7527f1d54""]"
2,thing_t3_xadm21,/r/wallstreetbets/comments/xadm21/anyone_hirin...,2022-09-10T02:27:59+00:00,Anyone hiring ? So over this. I have a degree ...,i.redd.it,wallstreetbets,t5_2th52,public,Jellyfish_Vegetable,t2_7x5hixx2,False,1426,519,Loss,"{""award_abcdefe4-c92f-4c66-880f-425962d17098"":...",,"[""48bc5978a8c2e71ff7e0a1399586dd81""]"


In [99]:
# Export as CSV
# (writes <subreddit>_<unix time>.csv into OUTPUT_DIR)
manager.download_csv()

CSV export completeted: wallstreetbets_1662789771.csv


In [137]:
# Upload DataFrame to SQL database
# (rows whose post_id already exists in the table are ignored)
manager.upload_sql()

SQL upload succeeded: 27 records processed.


In [169]:
# Read back every row stored for this subreddit
target_table = "reddit_thread_" + manager.subreddit
query_sql = "SELECT * FROM " + target_table

query_data = manager.query_sql(query_sql)
query_data.head()

SQL query succeeded: 27 records fetched.


Unnamed: 0,post_id,link,time,title,domain,board_name,board_id,board_type,author_name,author_id,author_ismod,likes,comments,flair,author_awards,content,media
0,thing_t3_xa4z97,/r/wallstreetbets/comments/xa4z97/weekend_disc...,2022-09-09T20:00:11+00:00,Weekend Discussion Thread for the Weekend of S...,self.wallstreetbets,wallstreetbets,t5_2th52,public,OPINION_IS_UNPOPULAR,t2_bd6q5,1,103,4789,Weekend Discussion,"{""award_f44611f1-b89e-46dc-97fe-892280b13b82"":...","Read rules, follow Twitter and IG, join Discor...",
1,thing_t3_x4ryjg,/r/wallstreetbets/comments/x4ryjg/most_anticip...,2022-09-03T11:16:01+00:00,Most Anticipated Earnings Releases for the wee...,i.redd.it,wallstreetbets,t5_2th52,public,bigbear0083,t2_eaak0,0,1724,812,Earnings Thread,"{""gid_1"": ""4"", ""award_f44611f1-b89e-46dc-97fe-...",,"[""e823d70122c40c36bc6526b7527f1d54""]"
2,thing_t3_xadm21,/r/wallstreetbets/comments/xadm21/anyone_hirin...,2022-09-10T02:27:59+00:00,Anyone hiring ? So over this. I have a degree ...,i.redd.it,wallstreetbets,t5_2th52,public,Jellyfish_Vegetable,t2_7x5hixx2,0,1426,519,Loss,"{""award_abcdefe4-c92f-4c66-880f-425962d17098"":...",,"[""48bc5978a8c2e71ff7e0a1399586dd81""]"
3,thing_t3_x2xl6r,/r/wallstreetbets/comments/x2xl6r/wallstreetbe...,2022-09-09T11:27:26+00:00,🔮WallStreetBets Predictions Tournament for Sep...,reddit.com,wallstreetbets,t5_2th52,public,ThetaGang_wsb,t2_amboe4pe,1,27281,131,,"{""gid_1"": ""1"", ""award_f44611f1-b89e-46dc-97fe-...",,
4,thing_t3_x9ye6u,/r/wallstreetbets/comments/x9ye6u/bbby_got_me_...,2022-09-09T15:26:44+00:00,$BBBY got me like,i.redd.it,wallstreetbets,t5_2th52,public,Kieran30803,t2_12t1meg,0,9391,171,Meme,"{""gid_3"": ""1"", ""award_f44611f1-b89e-46dc-97fe-...",,"[""923230d128904d88c16dae27522a7412""]"


In [163]:
# Index the collected records into Elasticsearch (creates the index if it does not exist)
manager.upload_nosql()

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'reddit_thread_wallstreetbets'}
(27, [])


In [171]:
# Elasticsearch "match" query on the board_name field
query_nosql = {"match": {"board_name": "wallstreetbets"}}

query_data = manager.query_nosql(query_nosql)
query_data[0]

Total 27 records returned.


{'_index': 'reddit_thread_wallstreetbets',
 '_type': '_doc',
 '_id': 'o-UQJoMBacOpM5JcYy6n',
 '_score': 0.018018505,
 '_source': {'post_id': 'thing_t3_xa4z97',
  'link': '/r/wallstreetbets/comments/xa4z97/weekend_discussion_thread_for_the_weekend_of/',
  'time': '2022-09-09T20:00:11+00:00',
  'title': 'Weekend Discussion Thread for the Weekend of September 10, 2022',
  'domain': 'self.wallstreetbets',
  'board_name': 'wallstreetbets',
  'board_id': 't5_2th52',
  'board_type': 'public',
  'author_name': 'OPINION_IS_UNPOPULAR',
  'author_id': 't2_bd6q5',
  'author_ismod': True,
  'likes': 103,
  'comments': 4789,
  'flair': 'Weekend Discussion',
  'author_awards': '{"award_f44611f1-b89e-46dc-97fe-892280b13b82": "2"}',
  'content': 'Read rules, follow Twitter and IG, join Discord, see ban bets!\nEarnings Thread\n\n',
  'media': 'null'}}

In [162]:
# Clean up: delete the Elasticsearch index (it has the same name as the SQL table)
manager.drop_nosql_index(target_table)

{'acknowledged': True}
