In [53]:
%pip install zstandard pandas tqdm swifter

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Collecting swifter
  Downloading swifter-1.3.5.tar.gz (490 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m490.6/490.6 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting dask[dataframe]>=2.10.0 (from swifter)
  Downloading dask-2023.6.0-py3-none-any.whl (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting cloudpickle>=0.2.2 (from swifter)
  Downloading cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Collecting partd>=1.2.0 (from dask[dataframe]>=2.10.0->swifter)
  Downloading partd-1.4.0-py3-none-any.whl (18 kB)
Coll

In [54]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import swifter

# Hook tqdm into pandas (presumably to enable the `progress_apply` family — confirm against the tqdm docs).
# NOTE(review): no cell visible here calls `progress_apply`; the row-wise apply goes through
# `df.swifter.apply` instead — confirm this hook is still needed.
tqdm.pandas()

In [44]:
# Adapted from https://github.com/Watchful1/PushshiftDumps/blob/master/scripts/single_file.py

import zstandard
import os
import json
import logging.handlers


# Named logger used by the decompression helpers and the extraction loop.
log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
# logging.getLogger returns the same object on every call, so re-running this
# cell would stack another StreamHandler and print each message once per run.
# Only attach a handler when the logger does not have one yet.
if not log.handlers:
	log.addHandler(logging.StreamHandler())

def read_and_decode(reader, chunk_size, max_window_size, previous_chunk=None, bytes_read=0):
	"""Read from ``reader`` until the accumulated bytes decode cleanly as text.

	A fixed-size read can end in the middle of a multi-byte character. When the
	bytes gathered so far do not decode, another chunk is read and appended,
	and decoding is retried on the combined bytes.

	Args:
		reader: object exposing ``read(size)`` that returns ``bytes``.
		chunk_size: number of bytes requested per ``read`` call.
		max_window_size: give up once more than this many bytes have been read
			without a successful decode.
		previous_chunk: undecoded bytes to prepend to the first read, if any.
		bytes_read: number of bytes already consumed for ``previous_chunk``.

	Returns:
		The decoded string; ``''`` when the reader is exhausted and nothing is pending.

	Raises:
		UnicodeError: if the bytes still fail to decode after ``max_window_size``
			bytes, or the reader runs dry while undecodable bytes are pending.
	"""
	pending = previous_chunk
	while True:
		chunk = reader.read(chunk_size)
		# Count what was actually delivered: the last read of a stream is usually
		# shorter than chunk_size, and an empty read means there is nothing left.
		bytes_read += len(chunk)
		exhausted = not chunk
		if pending is not None:
			chunk = pending + chunk
		try:
			return chunk.decode()
		except UnicodeDecodeError:
			# More input cannot repair the tail once the reader is exhausted, so
			# fail immediately instead of issuing further empty reads.
			if exhausted or bytes_read > max_window_size:
				raise UnicodeError(f"Unable to decode frame after reading {bytes_read:,} bytes")
			log.info(f"Decoding error with {bytes_read:,} bytes, reading another chunk")
			pending = chunk


def read_lines_zst(file_name):
	"""Stream the lines of a zstandard-compressed text file.

	The file is decompressed and decoded chunk by chunk via ``read_and_decode``
	and the text is split on ``"\\n"``.

	Args:
		file_name: path of the ``.zst`` file to read.

	Yields:
		``(line, position)`` tuples, where ``line`` is one line without its
		newline and ``position`` is ``file_handle.tell()`` on the underlying
		compressed file at the time the line is produced.
	"""
	with open(file_name, 'rb') as file_handle:
		buffer = ''
		reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
		# try/finally so the reader is released even when the consumer stops
		# iterating early or decoding raises part-way through the file.
		try:
			while True:
				chunk = read_and_decode(reader, 2**27, (2**29) * 2)

				if not chunk:
					break
				lines = (buffer + chunk).split("\n")

				# Everything but the last piece is a complete line; the last piece
				# may be cut off by the chunk boundary, so carry it forward.
				for line in lines[:-1]:
					yield line, file_handle.tell()

				buffer = lines[-1]

			# A file that does not end with a newline leaves its final line in the
			# buffer; emit it rather than silently dropping it.
			if buffer:
				yield buffer, file_handle.tell()
		finally:
			reader.close()


# Compressed monthly submissions dump, and the filtered JSONL written from it.
in_file = '/workspace/data/reddit/submissions/RS_2023-01.zst'
out_file = '/workspace/data/reddit/submissions/RS_2023-01.jsonl'

# Keys copied from each submission into the output; a record missing any of
# them raises KeyError below and is counted as a failed line.
fields = [
  'id',
  'author',
  'subreddit',
  'title',
  'selftext',
  'created_utc',
  'score',
  'upvote_ratio',
  'removed_by_category',
  'num_comments',
]

file_size = os.stat(in_file).st_size
file_lines = 0
file_bytes_processed = 0
created = None
bad_lines = 0

with open(out_file, 'w') as out:
  for line, file_bytes_processed in read_lines_zst(in_file):
    file_lines += 1
    if file_lines % 100000 == 0:
      # Progress is the position in the compressed input relative to its size on disk.
      log.info(f"Processed {file_lines} lines ({(file_bytes_processed / file_size) * 100:.0f}%) ({bad_lines} failed)")

    try:
      parsed = json.loads(line)

      # Skip submissions whose selftext is missing/empty or shorter than 10 characters.
      if len(parsed['selftext'] or '') < 10:
        continue

      # Only keep the fields we want
      obj = {k: parsed[k] for k in fields}
    except (KeyError, json.JSONDecodeError) as err:
      # Count the failure so the "(N failed)" figures in the progress and
      # completion messages reflect what actually happened.
      bad_lines += 1
      log.warning(f"Skipping line {file_lines:,}: {err!r}")
      continue

    out.write(json.dumps(obj) + '\n')

log.info(f"Complete : {file_lines:,} : {bad_lines:,}")

Processed 100000 lines (0%) (0 failed)
Processed 200000 lines (1%) (0 failed)
Processed 300000 lines (1%) (0 failed)
Processed 400000 lines (1%) (0 failed)
Processed 500000 lines (1%) (0 failed)
Processed 600000 lines (2%) (0 failed)
Processed 700000 lines (2%) (0 failed)
Processed 800000 lines (2%) (0 failed)
Processed 900000 lines (3%) (0 failed)
Processed 1000000 lines (3%) (0 failed)
Processed 1100000 lines (3%) (0 failed)
Processed 1200000 lines (3%) (0 failed)
Processed 1300000 lines (4%) (0 failed)
Processed 1400000 lines (4%) (0 failed)
Processed 1500000 lines (4%) (0 failed)
Processed 1600000 lines (4%) (0 failed)
Processed 1700000 lines (5%) (0 failed)
Processed 1800000 lines (5%) (0 failed)
Processed 1900000 lines (5%) (0 failed)
Processed 2000000 lines (6%) (0 failed)
Processed 2100000 lines (6%) (0 failed)
Processed 2200000 lines (6%) (0 failed)
Processed 2300000 lines (6%) (0 failed)
Processed 2400000 lines (7%) (0 failed)
Processed 2500000 lines (7%) (0 failed)
Processed

KeyboardInterrupt: 

In [None]:
import pandas as pd

# Load the filtered JSONL at `out_file` into a DataFrame, one JSON object per line (lines=True).
# NOTE(review): `out_file` is assigned in the extraction cell, so this cell fails on a fresh
# kernel unless that cell has been run first.
df = pd.read_json(out_file, lines=True)
df.head()

In [None]:
# Convert created_utc from a numeric count of seconds (unit='s') into pandas datetimes,
# overwriting the column in place.
df['created_utc'] = pd.to_datetime(df['created_utc'], unit='s')

In [None]:
# Persist the parsed frame in Feather format; the following cell reads this same path back.
df.to_feather('/workspace/data/reddit/submissions/RS_2023-01.arrow')

In [45]:
# Reload the Feather snapshot, replacing whatever `df` currently holds.
df = pd.read_feather('/workspace/data/reddit/submissions/RS_2023-01.arrow')

In [46]:
# Keep only subreddits with more than 1000 submissions (strictly greater, so a
# subreddit with exactly 1000 rows is dropped).
# Count rows per subreddit once and look the count up for every row, instead of
# calling a Python lambda on each group.
submissions_per_subreddit = df['subreddit'].value_counts()
df = df[df['subreddit'].map(submissions_per_subreddit) > 1000]

df.shape

(2380634, 9)

In [47]:
# Regression target: base-10 log of (score + 1), so a score of 0 maps to 0.
# NOTE(review): a score of -1 yields -inf and anything lower yields NaN — confirm that
# scores in this data are never negative.
df['log_score'] = np.log10(df['score'] + 1)

In [65]:
def format_text(row):
  """Render one submission as a single block of text.

  Args:
    row: object exposing ``title``, ``subreddit``, ``author``, ``selftext`` and
      a ``created_utc`` value that supports ``strftime``.

  Returns:
    A string with title, subreddit, author and posting time on separate lines,
    then a blank line, then the submission text.
  """
  # The blank entry produces the empty line between the header and the body.
  parts = [
    f"Title: {row.title}",
    f"Subreddit: /r/{row.subreddit}",
    f"Author: /u/{row.author}",
    f"Posted: {row.created_utc.strftime('%A, %B %d, %I:%M %p')}",
    "",
    f"Text: {row.selftext}",
  ]
  return "\n".join(parts)

# Build the formatted text for every row; axis=1 hands format_text one row at a time,
# and the call goes through the swifter accessor rather than plain DataFrame.apply.
df['formatted_text'] = df.swifter.apply(format_text, axis=1)

Pandas Apply:   0%|          | 0/2380634 [00:00<?, ?it/s]

In [66]:
# Chronological split: order the rows by creation time, take the timestamp found
# 80% of the way through, and send everything strictly before it to train and
# everything at or after it to test.

df = df.sort_values(by='created_utc')

cutoff_position = int(len(df) * 0.8)
split_date = df['created_utc'].iloc[cutoff_position]

is_before_cutoff = df['created_utc'] < split_date
is_from_cutoff = df['created_utc'] >= split_date

train_df = df[is_before_cutoff]
test_df = df[is_from_cutoff]

train_df.shape, test_df.shape

((1904507, 12), (476127, 12))

In [75]:
from transformers import AutoTokenizer
from datasets import Dataset, DatasetDict

# Tokenizer loaded for the 'microsoft/deberta-v3-base' checkpoint; `tokenize` in this cell
# reads this module-level name.
tokenizer = AutoTokenizer.from_pretrained('microsoft/deberta-v3-base')

def tokenize(batch):
  """Tokenize the ``formatted_text`` entries of a batch.

  Every sequence is truncated to, and padded up to, 512 tokens.

  Args:
    batch: mapping with a ``'formatted_text'`` entry holding the texts.

  Returns:
    Whatever the module-level ``tokenizer`` returns for those texts.
  """
  texts = batch['formatted_text']
  tokenizer_options = dict(padding='max_length', truncation=True, max_length=512)
  return tokenizer(texts, **tokenizer_options)

def to_dataset(df):
  """Turn a submissions frame into a tokenized ``Dataset``.

  Only ``id``, ``formatted_text`` and ``log_score`` are carried over, without
  the pandas index. ``tokenize`` is then mapped over batches of 1000 rows using
  4 processes, and the raw ``formatted_text`` column is removed from the result.

  Args:
    df: DataFrame containing at least the three columns listed above.

  Returns:
    The mapped dataset.
  """
  kept_columns = ['id', 'formatted_text', 'log_score']
  raw = Dataset.from_pandas(df[kept_columns], preserve_index=False)
  return raw.map(
    tokenize,
    batched=True,
    batch_size=1000,
    remove_columns=['formatted_text'],
    num_proc=4,
  )

# Tokenize the held-out split and display the resulting dataset.
# NOTE(review): train_df is not passed through to_dataset in any visible cell, and the
# recorded progress bar reports num_proc=8 while to_dataset uses num_proc=4 — the saved
# output looks stale; confirm by re-running.
test_ds = to_dataset(test_df)
test_ds
# df.iloc[:2]['formatted_text'].astype(str)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Map (num_proc=8):   0%|          | 0/476127 [00:00<?, ? examples/s]

In [None]:
# Write each split to its own Feather file. The index is reset (and dropped)
# first, so each file starts from a fresh 0..n-1 index.
split_outputs = (
  (train_df, '/workspace/data/reddit/submissions/RS_2023-01-train.arrow'),
  (test_df, '/workspace/data/reddit/submissions/RS_2023-01-test.arrow'),
)
for split_frame, feather_path in split_outputs:
  split_frame.reset_index(drop=True).to_feather(feather_path)