In [None]:
import codecs
import glob
import json
import logging.handlers
import os
import sqlite3
import sys
from datetime import datetime

import pandas as pd
import zstandard

# Let pandas display up to 100 columns before truncating wide DataFrames.
pd.set_option('display.max_columns', 100)

## Filtering to music-related subreddits

In [None]:
# Read the subreddit listing, labelling its five columns explicitly via `names`.
subrd_df = pd.read_csv(
    '/mnt/data/public/reddit/subreddits/subreddits_basic.csv',
    names=[
        'base10 id',
        'reddit base36 id',
        'creation epoch',
        'subreddit name',
        'number of subscribers',
    ],
)
subrd_df.head()

In [None]:
# Turn literal 'None' subscriber counts into '0' so the column can be cast to float.
subrd_df['number of subscribers'] = (
    subrd_df['number of subscribers']
    .replace('None', '0')
    .astype('float')
)

# Case-insensitive match of any keyword against the subreddit name
# (missing names are treated as empty strings and therefore never match).
search = ['music', 'playlist', 'spotify']
name_matches = (
    subrd_df['subreddit name']
    .fillna('')
    .str.contains('|'.join(search), case=False)
)
music_subrd = subrd_df[name_matches].sort_values('number of subscribers', ascending=False)

print(f"Number of music-related subreddits: {len(music_subrd)}")
music_subrd.head()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# 75th percentile of the subscriber counts of the music-related subreddits.
np.quantile(music_subrd['number of subscribers'], 0.75)

In [None]:
# Drop subreddits with fewer than 40 subscribers, then look at the last rows kept.
has_enough_subscribers = music_subrd['number of subscribers'].ge(40)
music_subrd = music_subrd.loc[has_enough_subscribers]
music_subrd.tail()

In [None]:
# Sanity check: list rows that are exact duplicates of an earlier row.
music_subrd.loc[music_subrd.duplicated()]

In [None]:
# Plain Python list of the selected subreddit names (read by pull_submissions).
music_subrd_ls = music_subrd['subreddit name'].to_list()

In [None]:
dbname = 'discogs-reddit.db'
conn = sqlite3.connect(dbname)

# `music_subrd` is the complete filtered table, so overwrite rather than append:
# with if_exists='append' every re-run of this cell (or of the whole notebook)
# would add a second copy of each subreddit row to the on-disk database.
music_subrd.to_sql('subreddits', conn, if_exists='replace', index=False)

## Reddit Submissions

In [None]:
# Notebook-wide logger used by pull_submissions for progress and error messages.
log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
# logging.getLogger returns the same logger object on every call, so re-running
# this cell would otherwise stack another StreamHandler and print each message
# once per run. Only attach a handler when none is present yet.
if not log.handlers:
    log.addHandler(logging.StreamHandler())


def read_lines_zst(file_name, chunk_size=2**27):
    """Stream the text lines of a zstd-compressed file.

    The file is decompressed ``chunk_size`` bytes at a time and decoded as
    UTF-8 with an incremental decoder, so a multi-byte character that happens
    to straddle two chunks is reassembled instead of raising a decode error.
    Bytes that are not valid UTF-8 are replaced with U+FFFD rather than
    aborting the stream.

    Parameters
    ----------
    file_name : str
        Path of the ``.zst`` file to read.
    chunk_size : int, optional
        Number of decompressed bytes requested per read (default ``2**27``).

    Yields
    ------
    tuple of (str, int)
        The line without its trailing newline, and the current position of the
        underlying compressed file handle (useful for progress reporting).
        A final line that is not newline-terminated is yielded as well.
    """
    with open(file_name, 'rb') as file_handle:
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        buffer = ''
        try:
            while True:
                raw = reader.read(chunk_size)
                if not raw:
                    break

                # The last element is the (possibly empty) start of a line that
                # continues in the next chunk; carry it over.
                lines = (buffer + decoder.decode(raw)).split("\n")
                for line in lines[:-1]:
                    yield line, file_handle.tell()
                buffer = lines[-1]

            # Flush any bytes still held by the decoder and emit the trailing
            # line when the file does not end with a newline.
            buffer += decoder.decode(b'', final=True)
            if buffer:
                yield buffer, file_handle.tell()
        finally:
            # Runs on normal exhaustion and when the consumer stops early.
            reader.close()


# Columns of the shared `reddit_submissions` table, in table order.
_SUBMISSION_COLUMNS = (
    'all_awardings', 'allow_live_comments', 'archived', 'author', 'author_created_utc',
    'author_flair_background_color', 'author_flair_css_class', 'author_flair_richtext',
    'author_flair_template_id', 'author_flair_text', 'author_flair_text_color',
    'author_flair_type', 'author_fullname', 'author_patreon_flair', 'author_premium',
    'awarders', 'banned_by', 'can_gild', 'can_mod_post', 'category', 'content_categories',
    'contest_mode', 'created_utc', 'discussion_type', 'distinguished', 'domain', 'edited',
    'gilded', 'gildings', 'hidden', 'hide_score', 'id', 'is_created_from_ads_ui',
    'is_crosspostable', 'is_meta', 'is_original_content', 'is_reddit_media_domain',
    'is_robot_indexable', 'is_self', 'is_video', 'link_flair_background_color',
    'link_flair_css_class', 'link_flair_richtext', 'link_flair_template_id',
    'link_flair_text', 'link_flair_text_color', 'link_flair_type', 'locked', 'media',
    'media_embed', 'media_only', 'name', 'no_follow', 'num_comments', 'num_crossposts',
    'over_18', 'parent_whitelist_status', 'permalink', 'pinned', 'post_hint', 'preview',
    'pwls', 'quarantine', 'removed_by', 'removed_by_category', 'retrieved_on',
    'retrieved_utc', 'score', 'secure_media', 'secure_media_embed', 'selftext',
    'send_replies', 'spoiler', 'stickied', 'subreddit', 'subreddit_id', 'subreddit_name_prefixed',
    'subreddit_subscribers', 'subreddit_type', 'suggested_sort', 'thumbnail', 'thumbnail_height',
    'thumbnail_width', 'title', 'top_awarded_type', 'total_awards_received', 'treatment_tags',
    'upvote_ratio', 'url', 'url_overridden_by_dest', 'view_count', 'whitelist_status', 'wls',
    'crosspost_parent', 'crosspost_parent_list', 'gallery_data', 'is_gallery', 'media_metadata',
    'author_cakeday', 'poll_data', 'call_to_action', 'live_audio', 'event_end', 'event_is_live',
    'event_start',
)


def pull_submissions(file_path, conn, subreddits=None):
    """Load one compressed submissions dump and append the matching posts to SQLite.

    Each line yielded by ``read_lines_zst`` is parsed as JSON. Objects whose
    ``subreddit`` value is one of the wanted names are kept, every field is
    serialised with ``json.dumps``, and the rows are appended to the
    ``reddit_submissions`` table using the fixed column set above (fields not
    in that set are dropped, fields absent from the data are stored as NULL).
    If that write fails, the full frame is appended to a per-month
    ``reddit_submissions_<YYYY_MM>`` table instead.

    Lines that cannot be parsed (invalid JSON, missing keys, unusable
    ``created_utc``) are counted as bad lines and skipped. An error raised
    while reading the file is logged with its traceback and whatever was
    collected up to that point is still written.

    Parameters
    ----------
    file_path : str
        Path of the ``.zst`` dump to read.
    conn : sqlite3.Connection
        Open connection to the target database.
    subreddits : iterable of str, optional
        Subreddit names to keep. Defaults to the notebook-level
        ``music_subrd_ls`` list.
    """
    print(f'Pulling data from {file_path}...')
    file_size = os.stat(file_path).st_size
    file_lines = 0
    bad_lines = 0
    created = None  # timestamp of the most recent line that parsed successfully
    field = "subreddit"
    # Set membership is O(1); the name list is checked once per line of the dump.
    wanted = set(music_subrd_ls if subreddits is None else subreddits)
    data = []

    try:
        for line, file_bytes_processed in read_lines_zst(file_path):
            try:
                obj = json.loads(line)
                created = datetime.utcfromtimestamp(int(obj['created_utc']))
                if obj[field] in wanted:
                    data.append(obj)
            except (KeyError, ValueError, TypeError, OverflowError, OSError):
                # ValueError covers json.JSONDecodeError and non-numeric timestamps;
                # a single malformed line must not abort the rest of the file.
                bad_lines += 1
            file_lines += 1
            if file_lines % 100_000 == 0:
                stamp = created.strftime('%Y-%m-%d %H:%M:%S') if created is not None else 'n/a'
                log.info(f"{stamp} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
    except Exception:
        # Best effort: keep what was read so far, but make the failure visible.
        log.exception("Stopped reading %s after %s lines", file_path, file_lines)

    subm_df = pd.DataFrame(data)

    # Store every value as its JSON text so nested and mixed-type fields fit in SQLite.
    for column in subm_df.columns:
        subm_df[column] = subm_df[column].apply(json.dumps)

    # Align to the fixed table layout. The object cast makes pandas create columns
    # that are entirely missing in this file as TEXT rather than a numeric type.
    table_df = subm_df.reindex(columns=list(_SUBMISSION_COLUMNS)).astype(object)

    try:
        table_df.to_sql('reddit_submissions', conn, if_exists='append', index=False)
    except Exception:
        suffix = created.strftime("%Y_%m") if created is not None else 'unknown'
        fallback_table = f'reddit_submissions_{suffix}'
        log.exception("Writing to reddit_submissions failed; falling back to %s", fallback_table)
        if not subm_df.empty:
            subm_df.to_sql(fallback_table, conn, if_exists='append', index=False)

In [None]:
dbname = 'discogs-reddit.db'
conn = sqlite3.connect(dbname)

# Process every 2022 submissions dump found on disk, one file per call.
file_paths = glob.glob('/mnt/data/public/reddit/submissions/RS_2022*.zst')
for file_path in file_paths:
    pull_submissions(file_path, conn)

In [None]:
dbname = 'discogs-reddit.db'
conn = sqlite3.connect(dbname)

# Subset of submission fields read back from the raw table.
file_cols = [
    'author', 'author_created_utc', 'author_premium', 'created_utc',
    'distinguished', 'edited', 'id', 'is_crosspostable',
    'is_original_content', 'no_follow', 'num_comments', 'num_crossposts',
    'over_18', 'permalink', 'post_hint', 'removed_by_category',
    'score', 'selftext', 'send_replies', 'spoiler',
    'stickied', 'subreddit', 'subreddit_id', 'subreddit_subscribers',
    'subreddit_type', 'title', 'top_awarded_type', 'total_awards_received',
    'upvote_ratio', 'url',
]

# Build the SELECT list from the column names and load the rows into a DataFrame.
sql_cols = ', '.join(file_cols)
music_subm = pd.read_sql(f'SELECT {sql_cols} FROM reddit_submissions', conn)
music_subm.shape

In [None]:
# Preview the first five rows as loaded from the database.
music_subm.head(n=5)

In [None]:
str_cols = ['author', 'permalink', 'post_hint',
            'removed_by_category', 'selftext',
            'subreddit', 'subreddit_id',
            'subreddit_type','title', 
            'top_awarded_type', 'url', 'id']

# Decode the JSON text of each string-valued column back into Python values.
for col in str_cols:
    # Only decode actual strings: a NULL cell is read back as None, and
    # json.loads(None) raises TypeError, which would abort the whole cell.
    music_subm[col] = music_subm[col].apply(
        lambda raw: json.loads(raw) if isinstance(raw, str) else raw
    )

In [None]:
# Count-like columns to convert to 64-bit integers.
num_cols = [
    'num_comments',
    'num_crossposts',
    'score',
    'subreddit_subscribers',
    'total_awards_received',
]

for count_col in num_cols:
    music_subm[count_col] = music_subm[count_col].astype('int64')

In [None]:
# Upvote ratio becomes a float.
music_subm['upvote_ratio'] = music_subm['upvote_ratio'].astype('float64')

# Epoch seconds -> naive UTC datetimes.
created_epoch = music_subm['created_utc'].astype('int64')
music_subm['created_utc'] = created_epoch.apply(datetime.utcfromtimestamp)

# 'NaN' placeholders are replaced with 0 first, so those rows end up at the Unix epoch.
author_epoch = music_subm['author_created_utc'].replace('NaN', 0).astype('float64')
music_subm['author_created_utc'] = author_epoch.apply(datetime.utcfromtimestamp)


music_subm.head()

In [None]:
# `music_subm` holds the whole processed table, so overwrite it on each run;
# if_exists='append' would add a full duplicate copy every time this cell is re-run.
music_subm.to_sql('reddit_submissions_processed', conn, if_exists='replace', index=False)

## Adding Discogs tables to the database

In [None]:
# conn1: the database this notebook writes to; conn2: the database the tables are copied from.
dbname = 'discogs-reddit.db'
conn1 = sqlite3.connect(dbname)
conn2 = sqlite3.connect('discogs-reddit4.db')

# List the tables available in the source database.
pd.read_sql_query(sql='PRAGMA table_list', con=conn2)

In [None]:
# List the tables currently present in the target database.
pd.read_sql_query(sql='PRAGMA table_list', con=conn1)

In [None]:
# Copy the `releases` table from the source database into the target database.
# NOTE(review): if_exists='append' adds the rows again if this cell is re-run — confirm intended.
releases = pd.read_sql(sql='SELECT * FROM releases', con=conn2)
print(len(releases.index))

releases.to_sql(name='releases', con=conn1, if_exists='append', index=False)

In [None]:
# Copy the `formats` table from the source database into the target database.
# NOTE(review): if_exists='append' adds the rows again if this cell is re-run — confirm intended.
formats = pd.read_sql(sql='SELECT * FROM formats', con=conn2)
print(len(formats.index))

formats.to_sql(name='formats', con=conn1, if_exists='append', index=False)

In [None]:
# Copy the `tracks` table from the source database into the target database.
# NOTE(review): if_exists='append' adds the rows again if this cell is re-run — confirm intended.
tracks = pd.read_sql(sql='SELECT * FROM tracks', con=conn2)
print(len(tracks.index))

tracks.to_sql(name='tracks', con=conn1, if_exists='append', index=False)

In [None]:
# Copy the `companies` table from the source database into the target database.
# NOTE(review): if_exists='append' adds the rows again if this cell is re-run — confirm intended.
companies = pd.read_sql(sql='SELECT * FROM companies', con=conn2)
print(len(companies.index))

companies.to_sql(name='companies', con=conn1, if_exists='append', index=False)

In [None]:
# Copy `artists` in 100,000-row chunks; with `chunksize` set, read_sql returns
# an iterator of DataFrames instead of one large frame.
artists = pd.read_sql(sql='SELECT * FROM artists', con=conn2, chunksize=100_000)

for artist_chunk in artists:
    artist_chunk.to_sql(name='artists', con=conn1, if_exists='append', index=False)

In [None]:
# Copy the `labels` table; unlike the other copies this one overwrites any
# existing `labels` table in the target database (if_exists='replace').
labels = pd.read_sql(sql='SELECT * FROM labels', con=conn2)
print(len(labels.index))

labels.to_sql(name='labels', con=conn1, if_exists='replace', index=False)

In [None]:
# Final check: list the tables now present in the target database.
pd.read_sql_query(sql='PRAGMA table_list', con=conn1)