In [1]:
import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers


log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
# getLogger returns the same logger object every time this cell runs, so attach
# the console handler only once; otherwise each re-run duplicates every log line.
if not log.handlers:
	log.addHandler(logging.StreamHandler())


def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
	"""Read from ``reader`` until the accumulated bytes decode cleanly as UTF-8.

	A fixed-size read can stop in the middle of a multi-byte UTF-8 character,
	so when decoding fails another chunk is appended and decoding is retried.

	:param reader: binary file-like object exposing ``read(size)``; an empty
		result is treated as end of stream.
	:param chunk_size: number of bytes requested per ``read`` call.
	:param max_window_size: give up once more than this many bytes have been
		read by this call without producing decodable text.
	:param previous_chunk: undecoded bytes to prepend to the first read.
	:param bytes_read: starting value for the byte counter (kept so existing
		callers of the former recursive version still work).
	:return: the decoded text; ``''`` when the stream is already exhausted.
	:raises UnicodeError: if the byte limit is exceeded, or the stream ends
		while the accumulated bytes are still not valid UTF-8.
	"""
	chunk = previous_chunk
	while True:
		data = reader.read(chunk_size)
		# Count what was actually returned; a short or empty read must not be
		# charged as a full chunk.
		bytes_read += len(data)
		chunk = data if chunk is None else chunk + data
		try:
			return chunk.decode()
		except UnicodeDecodeError:
			if not data:
				# Nothing more will arrive, so retrying can never complete the character.
				raise UnicodeError(f"Unable to decode frame: stream ended after reading {bytes_read:,} bytes")
			if bytes_read > max_window_size:
				raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
			log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")


def read_lines_zst(file_name, chunk_size=2**27, max_decode_size=2**30):
	"""Yield ``(line, compressed_offset)`` for each line of a zstd-compressed text file.

	:param file_name: path to a zstandard-compressed text file.
	:param chunk_size: decompressed bytes requested per read (default 128 MiB).
	:param max_decode_size: limit passed to ``read_and_decode`` on bytes
		accumulated while waiting for a clean UTF-8 boundary (default 1 GiB).
	:yields: each line without its trailing newline, paired with the current
		position of the underlying compressed file handle. The decompressor
		reads ahead, so the offset is approximate, but it is comparable with
		the compressed file size for progress reporting.
	"""
	with open(file_name, 'rb') as file_handle:
		# Allow windows up to 2 GiB. NOTE(review): presumably required because the
		# archive was compressed with a long-distance window — confirm.
		decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
		# Using the reader as a context manager closes it even when the consumer
		# stops iterating early (break, exception, KeyboardInterrupt).
		with decompressor.stream_reader(file_handle) as reader:
			buffer = ''
			while True:
				chunk = read_and_decode(reader, chunk_size, max_decode_size)
				if not chunk:
					break
				lines = (buffer + chunk).split("\n")
				for line in lines[:-1]:
					yield line, file_handle.tell()
				# The last piece has no newline yet; carry it into the next chunk.
				buffer = lines[-1]
			# A file that does not end with "\n" leaves its final line here.
			if buffer:
				yield buffer, file_handle.tell()


In [2]:
# Scan one month of Reddit submissions and collect every post by the authors below.
file_path = "/data/reddit/submissions/RS_2021-11.zst"
# Built once here instead of rebuilding the set literal for every line of the file.
target_authors = {"miaou_dubois", "black-rose-petal", "goddessmoneta", "faegoddess333", "sexyninja_", "Keruimin"}
progress_interval = 100000  # lines between progress log messages
# Compressed size: the offsets yielded by read_lines_zst come from the compressed
# file handle, so the progress percentage compares like with like.
file_size = os.stat(file_path).st_size
file_lines = 0
file_bytes_processed = 0
created = None
bad_lines = 0
selected_lines = []
completed = False

try:
	for line, file_bytes_processed in read_lines_zst(file_path):
		try:
			obj = json.loads(line)
			created = datetime.utcfromtimestamp(int(obj['created_utc']))
			if obj["author"] in target_authors:
				print(obj)
				selected_lines.append(obj)
		except (KeyError, TypeError, ValueError, OverflowError, OSError):
			# A single malformed record (bad JSON, missing field, non-integer or
			# out-of-range timestamp) is counted and skipped rather than aborting
			# the whole scan. json.JSONDecodeError is a ValueError subclass.
			bad_lines += 1
		file_lines += 1
		if file_lines % progress_interval == 0:
			# `created` is still None if no record so far had a usable timestamp.
			created_str = created.strftime('%Y-%m-%d %H:%M:%S') if created is not None else "unknown"
			log.info(f"{created_str} : {file_lines:,} : {bad_lines:,} : {file_bytes_processed:,}:{(file_bytes_processed / file_size) * 100:.0f}%")
	completed = True
except Exception:
	# Keep whatever was collected so far, but surface the failure with a
	# traceback instead of a bare info-level message.
	log.exception(f"Scan stopped early after {file_lines:,} lines")

log.info(f"{'Complete' if completed else 'Incomplete'} : {file_lines:,} : {bad_lines:,}")


2021-11-01 02:33:48 : 100,000 : 0 : 29,885,100:0%
2021-11-01 05:25:04 : 200,000 : 0 : 58,590,525:1%
2021-11-01 09:06:56 : 300,000 : 0 : 86,640,575:1%
2021-11-01 12:19:36 : 400,000 : 0 : 113,904,175:1%
2021-11-01 14:38:56 : 500,000 : 0 : 142,085,300:2%
2021-11-01 16:40:57 : 600,000 : 0 : 170,004,275:2%
2021-11-01 18:38:35 : 700,000 : 0 : 198,185,400:3%
2021-11-01 20:35:13 : 800,000 : 0 : 216,666,975:3%
2021-11-01 22:36:18 : 900,000 : 0 : 244,585,950:3%
2021-11-02 00:47:59 : 1,000,000 : 0 : 272,767,075:4%
2021-11-02 03:08:45 : 1,100,000 : 0 : 300,554,975:4%
2021-11-02 06:01:19 : 1,200,000 : 0 : 328,080,725:4%
2021-11-02 09:39:36 : 1,300,000 : 0 : 346,431,225:5%
2021-11-02 12:41:01 : 1,400,000 : 0 : 373,956,975:5%
2021-11-02 14:52:56 : 1,500,000 : 0 : 401,744,875:5%
2021-11-02 16:49:57 : 1,600,000 : 0 : 430,188,150:6%
2021-11-02 18:46:40 : 1,700,000 : 0 : 458,631,425:6%
2021-11-02 20:43:38 : 1,800,000 : 0 : 477,637,300:6%
2021-11-02 22:44:57 : 1,900,000 : 0 : 505,949,500:7%
2021-11-03 00:

KeyboardInterrupt: 