In [1]:
import zstandard as zstd
import os
import json
import sys
import csv
from datetime import datetime
import logging.handlers
import traceback
import pandas as pd
import io

In [4]:
# this is an example of iterating over all zst files in a single folder,
# decompressing them and reading the created_utc field to make sure the files
# are intact. It has no output other than the number of lines

log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())


def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
	chunk = reader.read(chunk_size)
	bytes_read += chunk_size
	if previous_chunk is not None:
		chunk = previous_chunk + chunk
	try:
		return chunk.decode()
	except UnicodeDecodeError:
		if bytes_read > max_window_size:
			raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
		log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
		return read_and_decode(reader, chunk_size, max_window_size, chunk, bytes_read)


def read_lines_zst(file_name):
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstd.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		while True:
			chunk = read_and_decode(reader, 2**27, (2**29) * 2)

			if not chunk:
				break
			lines = (buffer + chunk).split("\n")

			for line in lines[:-1]:
				yield line.strip(), file_handle.tell()

			buffer = lines[-1]

		reader.close()


input_folder = r"F:\reddit data\reddit\submissions"
input_files = []
total_size = 0
for subdir, dirs, files in os.walk(input_folder):
	for filename in files:
		print("Found file:", filename)
		input_path = os.path.join(subdir, filename)
		if input_path.endswith(".zst"):
			file_size = os.stat(input_path).st_size
			total_size += file_size
			input_files.append([input_path, file_size])

log.info(f"Processing {len(input_files)} files of {(total_size / (2**30)):.2f} gigabytes")

total_lines = 0
total_bytes_processed = 0
for input_file in input_files:
	file_lines = 0
	file_bytes_processed = 0
	created = None
	for line, file_bytes_processed in read_lines_zst(input_file[0]):
		obj = json.loads(line)
		created = datetime.utcfromtimestamp(int(obj['created_utc']))
		file_lines += 1
		if file_lines == 1:
			log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : 0% : {(total_bytes_processed / total_size) * 100:.0f}%")
		if file_lines % 100000 == 0:
			log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines + total_lines:,} : {(file_bytes_processed / input_file[1]) * 100:.0f}% : {(total_bytes_processed / total_size) * 100:.0f}%")
	total_lines += file_lines
	total_bytes_processed += input_file[1]
	log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {total_lines:,} : 100% : {(total_bytes_processed / total_size) * 100:.0f}%")

log.info(f"Total: {total_lines}")

Processing 3 files of 23.33 gigabytes
Processing 3 files of 23.33 gigabytes
Processing 3 files of 23.33 gigabytes


Found file: RS_2021-05.zst.part
Found file: RS_2021-09.zst.part
Found file: RS_2021-07.zst
Found file: RS_2021-08.zst
Found file: RS_2021-06.zst


2021-07-01 00:00:03 : 1 : 0% : 0%
2021-07-01 00:00:03 : 1 : 0% : 0%
2021-07-01 00:00:03 : 1 : 0% : 0%
2021-07-01 02:30:58 : 100,000 : 0% : 0%
2021-07-01 02:30:58 : 100,000 : 0% : 0%
2021-07-01 02:30:58 : 100,000 : 0% : 0%
2021-07-01 05:12:09 : 200,000 : 1% : 0%
2021-07-01 05:12:09 : 200,000 : 1% : 0%
2021-07-01 05:12:09 : 200,000 : 1% : 0%
2021-07-01 08:32:28 : 300,000 : 1% : 0%
2021-07-01 08:32:28 : 300,000 : 1% : 0%
2021-07-01 08:32:28 : 300,000 : 1% : 0%
2021-07-01 11:51:39 : 400,000 : 2% : 0%
2021-07-01 11:51:39 : 400,000 : 2% : 0%
2021-07-01 11:51:39 : 400,000 : 2% : 0%
Decoding error with 134,217,728 bytes, reading another chunk
Decoding error with 134,217,728 bytes, reading another chunk
Decoding error with 134,217,728 bytes, reading another chunk
2021-07-01 14:20:24 : 500,000 : 2% : 0%
2021-07-01 14:20:24 : 500,000 : 2% : 0%
2021-07-01 14:20:24 : 500,000 : 2% : 0%
2021-07-01 16:27:32 : 600,000 : 2% : 0%
2021-07-01 16:27:32 : 600,000 : 2% : 0%
2021-07-01 16:27:32 : 600,000 : 2% 

KeyboardInterrupt: 

usage: ipykernel_launcher.py [-h] [--output OUTPUT] [--working WORKING]
                             [--field FIELD] [--value VALUE]
                             [--value_list VALUE_LIST] [--processes PROCESSES]
                             [--file_filter FILE_FILTER]
                             [--split_intermediate] [--single_output]
                             [--error_rate ERROR_RATE] [--debug] [--partial]
                             [--regex]
                             input
ipykernel_launcher.py: error: ambiguous option: --f="c:\Users\Anya Yuxin Li\AppData\Roaming\jupyter\runtime\kernel-v3f6ca6d07e25dab1e4da02cbb4aaa30da0cf9baf3.json" could match --field, --file_filter


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
