In [3]:
import sys
sys.path.append('../')
import os
from data_partitioner import *
from parallel_tagging import *
from biberplus.tagger import load_config
from biberplus.tagger.tagger_utils import load_tokenizer

INFO:numexpr.utils:Note: detected 160 virtual cores but NumExpr set to maximum of 64, check "NUMEXPR_MAX_THREADS" environment variable.
INFO:numexpr.utils:Note: NumExpr detected 160 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:NumExpr defaulting to 8 threads.


In [2]:
file_name = "RC_2020-05.gz"
# Dump name with everything after the first '.' stripped, e.g. "RC_2020-05";
# used to name the per-dump working directories.
file_stem = file_name.split('.')[0]

input_path = os.path.join('/shared/4/datasets/long-reddit/', file_name)
partitions_root = '/shared/3/projects/hiatus/tagged_data/partitions'
partition_dir = os.path.join(partitions_root, 'reddit', file_stem)
tag_dir = os.path.join(partitions_root, 'tagged-reddit', file_stem)
# NOTE: keep the trailing slash — later cells may build paths by concatenating onto it.
output_dir = '/shared/3/projects/hiatus/tagged_data/long-reddit/'

# Create every working/output directory up front (no-op if it already exists).
for directory in (partition_dir, tag_dir, output_dir):
    os.makedirs(directory, exist_ok=True)

# author -> {subreddit: post_count}; presumably filled in by tag_partitions via
# its post_counts argument — TODO confirm against parallel_tagging.
author_subreddit_counts = {}


## Partition the file into 100 parts

In [None]:
%%time
# Split the raw dump at input_path into 100 chunk files under partition_dir
# (inferred from chunks=100 and the section header — confirm against
# partition_file, which comes from one of the star imports at the top).
partition_file(input_path, partition_dir, chunks=100)

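## Tag each partition with one CPU

Use `default_niceness` to control CPU scheduling priority. A *higher* nice value (up to 19) means *lower* priority, so raise it if this job should avoid hogging the shared server.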

**Tagger config**

In [None]:
config = load_config()

# Settings layered on top of the library defaults for this run: CPU-only
# tagging with Biber, binary and function-word tags enabled, and
# token_normalization=100 (presumably "per 100 tokens" — confirm in biberplus docs).
cpu_tagging_overrides = {
    'use_gpu': False,
    'biber': True,
    'binary_tags': True,
    'function_words': True,
    'token_normalization': 100,
}
config.update(cpu_tagging_overrides)

tokenizer = load_tokenizer(use_gpu=False)

# Display the effective config as the cell output.
config

defaultdict(<class 'int'>, {13651: 38, 12000: 16, 13000: 44, 13632: 1})


In [None]:
%%time
import warnings

# Suppress future warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

tag_partitions(config,
               input_directory=partition_dir,
               output_directory=tag_dir,
               num_workers=101,
               post_counts=author_subreddit_counts,
               default_niceness=0
               )

In [4]:
# utilities:

# Counts the number of tagged entries across each partition
# from collections import defaultdict
# counts = defaultdict(int)
# for i in range(1, 103):
#     try:
#         counts[count_lines(tag_dir + f"/partition-{i}.jsonl-tagged.gz")] += 1
#     except Exception:
#         continue
# print(counts)

def print_stats(post_counts):
    """Print summary statistics for an author -> {subreddit: post_count} mapping.

    Args:
        post_counts: mapping from author to a mapping of subreddit -> number of posts.

    Prints the number of authors, the total number of posts, the mean posts per
    author (0.0 when the mapping is empty), how many authors have more than 5
    and more than 10 posts, and the number of distinct subreddits. Returns None.
    """
    # Each author's post total, computed once and reused by every statistic below.
    per_author_totals = [sum(subs.values()) for subs in post_counts.values()]
    distinct_subreddits = {
        subreddit for subs in post_counts.values() for subreddit in subs.keys()
    }

    total_authors = len(post_counts)
    total_posts = sum(per_author_totals)
    n_over_5 = sum(1 for n_posts in per_author_totals if n_posts > 5)
    n_over_10 = sum(1 for n_posts in per_author_totals if n_posts > 10)

    if total_authors:
        avg_posts = float(total_posts) / total_authors
    else:
        avg_posts = 0.0  # empty mapping: avoid dividing by zero

    print(f"Total number of authors: {total_authors}")
    print(f"Total number of posts: {total_posts}")
    print(f"Average number of posts per author: {avg_posts:.2f}")
    print(f"Total number of authors with over 5 total posts: {n_over_5}")
    print(f"Total number of authors with over 10 total posts: {n_over_10}")
    print(f"Total number of unique subreddits: {len(distinct_subreddits)}")
    
# Individual partition
# print_stats(author_subreddit_counts) 

#All partitions
# import csv
# import os
# from collections import defaultdict

# def read_tsv_to_dict(file_path):
#     # Function to read a single TSV and update the main dictionary
#     with open(file_path, 'r', newline='') as f:
#         reader = csv.DictReader(f, delimiter='\t')
#         for row in reader:
#             author = row['author']
#             subreddit = row['subreddit']
#             post_count = int(row['post_count'])
#             # If author exists in the dictionary, update their count for the subreddit
#             if author in aggregated_data:
#                 if subreddit in aggregated_data[author]:
#                     aggregated_data[author][subreddit] += post_count
#                 else:
#                     aggregated_data[author][subreddit] = post_count
#             else:
#                 # If author does not exist, create new nested dictionary
#                 aggregated_data[author] = {subreddit: post_count}

# def process_directory(directory):
#     # Function to process each file in the directory
#     for filename in os.listdir(directory):
#         if filename.endswith('.tsv'):
#             file_path = os.path.join(directory, filename)
#             read_tsv_to_dict(file_path)

# # Main dictionary to hold all data
# aggregated_data = defaultdict(dict)

# # Path to the directory containing the TSV files
# directory_path = '/shared/3/projects/hiatus/tagged_data/long-reddit'
# process_directory(directory_path)

# # Now aggregated_data contains all the combined data
# print_stats(aggregated_data)


Total number of authors: 2011384
Total number of posts: 10666573
Average number of posts per author: 5.30
Total number of authors with over 5 total posts: 283038
Total number of authors with over 10 total posts: 142742
Total number of unique subreddits: 71749


In [None]:
%%time
output_tsv = output_dir + file_name.split('.')[0] + '-counts.tsv'
# Write the post counts to a TSV file
write_to_tsv(author_subreddit_counts, output_tsv)

## Join the tagged partitions into a single output file

In [None]:
# Join the tagged partition files in tag_dir into a single file in output_dir,
# named after the original input dump.
joined_output_path = os.path.join(output_dir, file_name)
join_tagged_files(input_directory=tag_dir, output_file=joined_output_path)

## Delete the temp directories

In [None]:
# Remove the intermediate raw and tagged partitions (raw first, then tagged).
for temp_dir in (partition_dir, tag_dir):
    delete_partitioned_files(temp_dir)