In [4]:
import os
import json
import sys
import csv
import zstandard
from datetime import datetime
import logging.handlers

In [33]:
# put the path to the input file
input_file = r"C:\Users\HP\prj\bsherin\learning-trajectories\reddit\a2c-data\ApplyingToCollege_submissions.zst"
# put the name or path to the output file. The file extension from below will be added automatically
output_file = r"C:\Users\HP\prj\bsherin\learning-trajectories\reddit-outputs\all-a2c-output-text-users-dates-csv"

In [1]:
input_file = "/Users/brucesherin/PycharmProjects/learning-trajectories/subreddits/ApplyingToCollege_submissions.zst"
output_file = "/Users/brucesherin/PycharmProjects/learning-trajectories/subreddits/all-a2c-output-text-users-dates-csv"

In [21]:
# the format to output in, pick from the following options
#   zst: same as the input, a zstandard compressed ndjson file. Can be read by the other scripts in the repo
#   txt: an ndjson file, which is a text file with a separate json object on each line. Can be opened by any text editor
#   csv: a comma separated value file. Can be opened by a text editor or excel
# WARNING READ THIS: if you use txt or csv output on a large input file without filtering 
# out most of the rows, the resulting file will be extremely large. 
# Usually about 7 times as large as the compressed input file
output_format = "txt"
# override the above format and output only this field into a text file, one per line. Useful if you want to make a list of authors or ids. See the examples below
# any field that's in the dump is supported, but useful ones are
#   author: the username of the author
#   id: the id of the submission or comment
#   link_id: only for comments, the fullname of the submission the comment is associated with
#   parent_id: only for comments, the fullname of the parent of the comment. Either another comment or the submission if it's top level
desired_field1 = "title"
desired_field2 = "selftext"
desired_field3 = "author"
desired_field4 = "created_utc"
# the fields in the file are different depending on whether it has comments or submissions. If we're writing a csv, we need to know which fields to write.
# The filename from the torrent has which type it is, but you'll need to change this if you removed that from the filename
is_submission = "submission" in input_file

In [23]:
# only output items between these two dates
from_date = datetime.strptime("2021-01-01", "%Y-%m-%d")
to_date = datetime.strptime("2022-01-01", "%Y-%m-%d")

In [6]:
field = "title"
values = []
# if you have a long list of values, you can put them in a file and put the filename here. If set this overrides the value list above
# if this list is very large, it could greatly slow down the process
values_file = None
exact_match = True

In [7]:
# sets up logging to the console as well as a file
log = logging.getLogger("bot")
log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
log_str_handler = logging.StreamHandler()
log_str_handler.setFormatter(log_formatter)
log.addHandler(log_str_handler)
if not os.path.exists("logs"):
    os.makedirs("logs")
log_file_handler = logging.handlers.RotatingFileHandler(os.path.join("logs", "bot.log"), maxBytes=1024*1024*16, backupCount=5)
log_file_handler.setFormatter(log_formatter)
log.addHandler(log_file_handler)

In [8]:
def write_line_zst(handle, line):
    handle.write(line.encode('utf-8'))
    handle.write("\n".encode('utf-8'))

In [9]:
def write_line_json(handle, obj):
    handle.write(json.dumps(obj))
    handle.write("\n")

In [10]:
def write_line_single(handle, obj, field):
    if field in obj:
        handle.write(str(obj[field]))
    else:
        log.info(f"{field} not in object {obj['id']}")
    handle.write("\n")

In [11]:
def write_line_csv(writer, obj, is_submission):
    output_list = []
    output_list.append(str(obj['score']))
    output_list.append(datetime.fromtimestamp(float(obj['created_utc'])).strftime("%Y-%m-%d"))
    if is_submission:
        output_list.append(obj['title'])
    output_list.append(f"u/{obj['author']}")
    output_list.append(f"https://www.reddit.com{obj['permalink']}")
    if is_submission:
        if obj['is_self']:
            if 'selftext' in obj:
                output_list.append(obj['selftext'])
            else:
                output_list.append("")
        else:
            output_list.append(obj['url'])
    else:
        output_list.append(obj['body'])
    writer.writerow(output_list)


In [12]:
def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
    chunk = reader.read(chunk_size)
    bytes_read += chunk_size
    if previous_chunk is not None:
        chunk = previous_chunk + chunk
    try:
        return chunk.decode()
    except UnicodeDecodeError:
        if bytes_read > max_window_size:
            raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
        log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
        return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)

In [13]:
def read_lines_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = read_and_decode(reader, 2**27, (2**29) * 2)

            if not chunk:
                break
            lines = (buffer + chunk).split("\n")

            for line in lines[:-1]:
                yield line.strip(), file_handle.tell()

            buffer = lines[-1]

        reader.close()

In [24]:
if __name__ == "__main__":
    # if desired_field1 and desired_field2 and desired_field3 and desired_field4:
    #     log.info("Single field output mode, changing output file format to txt")
    #     output_format = "txt"
    output_path = f"{output_file}.{output_format}"

    writer = None
    if output_format == "zst":
        log.info("Output format set to zst")
        handle = zstandard.ZstdCompressor().stream_writer(open(output_path, 'wb'))
    elif output_format == "txt":
        log.info("Output format set to txt")
        handle = open(output_path, 'w', encoding='UTF-8')
    elif output_format == "csv":
        log.info("Output format set to csv")
        handle = open(output_path, 'w', encoding='UTF-8', newline='')
        writer = csv.writer(handle)
    else:
        log.error(f"Unsupported output format {output_format}")
        sys.exit()

    if values_file:
        values = []
        with open(values_file, 'r') as values_handle:
            for value in values_handle:
                values.append(value.strip().lower())
        log.info(f"Loaded {len(values)} from values file")
    else:
        values = [value.lower() for value in values]  # convert to lowercase
        
    file_size = os.stat(input_file).st_size
    file_bytes_processed = 0
    created = None
    matched_lines = 0
    bad_lines = 0
    total_lines = 0
    for line, file_bytes_processed in read_lines_zst(input_file):
        total_lines += 1
        if total_lines % 100000 == 0:
            log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {total_lines:,} : {matched_lines:,} : {bad_lines:,} : {file_bytes_processed:,}:{(file_bytes_processed / file_size) * 100:.0f}%")

        try:
            obj = json.loads(line)
            created = datetime.utcfromtimestamp(int(obj['created_utc']))

            if created < from_date:
                continue
            if created > to_date:
                continue

            field_value = obj[field].lower()
            matched = False
            if not values:
                if output_format == "zst":
                    write_line_zst(handle, line)
                elif output_format == "csv":
                    write_line_csv(writer, obj, is_submission)
                elif output_format == "txt":
                    if desired_field1 and desired_field2 and desired_field3 and desired_field4:
                        write_line_single(handle, obj, desired_field1)
                        write_line_single(handle, obj, desired_field2)
                        write_line_single(handle, obj, desired_field3)
                        write_line_single(handle, obj, desired_field4)
                    else:
                        write_line_json(handle, obj)
            for value in values:
                if exact_match:
                    if value == field_value:
                        matched = True
                        break
                else:
                    if value in field_value:
                        matched = True
                        break
                if matched:
                    matched_lines += 1
                    if output_format == "zst":
                        write_line_zst(handle, line)
                    elif output_format == "csv":
                        write_line_csv(writer, obj, is_submission)
                    elif output_format == "txt":
                        if desired_field1 and desired_field2 and desired_field3 and desired_field4:
                            write_line_single(handle, obj, desired_field1)
                            write_line_single(handle, obj, desired_field2)
                            write_line_single(handle, obj, desired_field3)
                            write_line_single(handle, obj, desired_field4)
                        else:
                            write_line_json(handle, obj)
        except (KeyError, json.JSONDecodeError) as err:
            bad_lines += 1
            
    if matched_lines == 0:
        log.info("No matching lines found.")
        if output_format == "txt":
            handle.write("No matching lines found.\n")

    handle.close()
    log.info(f"Complete : {total_lines:,} : {matched_lines:,} : {bad_lines:,}")

2023-08-21 12:16:57,835 - INFO: Output format set to txt
2023-08-21 12:16:59,138 - INFO: 2018-07-17 02:59:31 : 100,000 : 0 : 0 : 29,229,725:24%
2023-08-21 12:17:00,521 - INFO: 2019-07-28 19:04:25 : 200,000 : 0 : 0 : 41,157,550:33%
2023-08-21 12:17:02,164 - INFO: 2020-04-29 13:15:36 : 300,000 : 0 : 0 : 62,391,700:51%
2023-08-21 12:17:03,916 - INFO: 2021-01-19 02:47:12 : 400,000 : 0 : 0 : 82,184,025:67%
2023-08-21 12:17:06,147 - INFO: 2021-11-25 05:58:09 : 500,000 : 0 : 0 : 101,845,275:83%
2023-08-21 12:17:08,368 - INFO: 2022-10-28 15:32:54 : 600,000 : 0 : 0 : 121,637,600:99%
2023-08-21 12:17:08,754 - INFO: No matching lines found.
2023-08-21 12:17:08,755 - INFO: Complete : 620,963 : 0 : 0
