In [None]:
# Library
!pip install zstandard



In [None]:
import psutil

# Check Memory: take a snapshot of current system memory usage and show it as a plain dict
memory_snapshot = psutil.virtual_memory()
dict(memory_snapshot._asdict())

{'total': 8589934592,
 'available': 2644443136,
 'percent': 69.2,
 'used': 3992797184,
 'free': 110444544,
 'active': 2501214208,
 'inactive': 2498052096,
 'wired': 1491582976}

# Subreddit and Post Time

In [None]:
import zstandard as zstd
import json

size = 1024 * 1024 * 1

def count_unique_subreddit_ids(file_path, chunk_size=size):
    """Count the distinct ``subreddit_id`` values in a zstd-compressed NDJSON dump.

    Args:
        file_path: Path to a ``.zst`` file holding one JSON object per line.
        chunk_size: Number of decompressed bytes read per iteration
            (defaults to ``size``, 1 MiB).

    Returns:
        Number of distinct truthy ``subreddit_id`` values. Blank lines, lines
        that are not valid JSON, and JSON values that are not objects are skipped.
    """
    import codecs

    unique_ids = set()
    with open(file_path, 'rb') as compressed:
        # Allow a decompression window of up to 2**31 bytes (2 GiB).
        dctx = zstd.ZstdDecompressor(max_window_size=2147483648)
        with dctx.stream_reader(compressed) as reader:
            # An incremental decoder carries a multi-byte UTF-8 character that
            # straddles two chunks over to the next call. Decoding each chunk on
            # its own with errors='ignore' silently dropped such characters.
            decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
            buffer = ''
            while True:
                raw = reader.read(chunk_size)
                at_eof = not raw
                buffer += decoder.decode(raw, final=at_eof)
                # Keep the incomplete tail for the next chunk.
                *lines, buffer = buffer.split('\n')
                if at_eof:
                    # The final line may lack a trailing newline; process it too.
                    lines.append(buffer)

                for line in lines:
                    if not line.strip():
                        continue
                    try:
                        json_obj = json.loads(line)
                    except json.JSONDecodeError:
                        continue  # Skip invalid JSON lines
                    if isinstance(json_obj, dict):
                        subreddit_id = json_obj.get("subreddit_id")
                        if subreddit_id:
                            unique_ids.add(subreddit_id)

                if at_eof:
                    break

    return len(unique_ids)

# Count distinct subreddit ids in the RS_2023-03 dump.
# Previously observed result: Unique subreddit ids: 742331 (RS_2023-03)
file_path = r'D:\Eleveny\RS_2023-03.zst'
count = count_unique_subreddit_ids(file_path, chunk_size=size)
print(f"Unique subreddit ids: {count}")

Unique subreddit ids: 742331


# Initializing dataset

In [None]:
import zstandard as zstd
import json
import csv

size = 1024 * 1024 * 1

def decompress_and_process(file_path, output_csv, month=None):
    """Create the initial subreddit-by-month presence CSV from one monthly dump.

    Scans a zstd-compressed NDJSON dump, collects every distinct truthy
    ``subreddit`` value, and writes a two-column CSV: ``subreddit`` plus one
    month column in which every row is ``1``. Rows are sorted by subreddit so
    the output is deterministic.

    Args:
        file_path: Path to the ``.zst`` dump, e.g. ``.../RS_2016-05.zst``.
        output_csv: Destination CSV path (overwritten).
        month: Header of the month column. Defaults to the file name's part
            between the first ``_`` and the first ``.`` (``RS_2016-05.zst`` ->
            ``2016-05``), so the header always matches the file actually read
            (it used to be hard-coded to ``2016-05``).
    """
    import codecs
    import os

    if month is None:
        month = os.path.basename(file_path).split('.')[0].split('_', 1)[-1]

    unique_subreddits = set()

    # Decompress the file
    with open(file_path, 'rb') as compressed:
        # Allow a decompression window of up to 2**31 bytes (2 GiB).
        dctx = zstd.ZstdDecompressor(max_window_size=2147483648)
        with dctx.stream_reader(compressed) as reader:
            # Incremental decoding keeps multi-byte UTF-8 characters that are
            # split across chunk boundaries instead of silently dropping them.
            decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
            buffer = ''
            while True:
                raw = reader.read(size)
                at_eof = not raw
                buffer += decoder.decode(raw, final=at_eof)
                *lines, buffer = buffer.split('\n')
                if at_eof:
                    # The final line may lack a trailing newline; process it too.
                    lines.append(buffer)

                for line in lines:
                    if not line.strip():
                        continue
                    try:
                        json_obj = json.loads(line)
                    except json.JSONDecodeError:
                        continue  # Skip invalid JSON lines
                    if isinstance(json_obj, dict):
                        subreddit = json_obj.get("subreddit")
                        if subreddit:
                            unique_subreddits.add(subreddit)

                if at_eof:
                    break

    # Write to CSV
    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["subreddit", month])  # Header row
        for subreddit in sorted(unique_subreddits):
            writer.writerow([subreddit, 1])


# Build the initial presence table from the 2016-05 dump.
output_csv = r'/Users/ElevenyCHEN/Desktop/Mod_Datasets/SubReddit-time.csv'
file_path = r'/Users/ElevenyCHEN/Desktop/Mod_Datasets/RS_2016-05.zst'
decompress_and_process(file_path=file_path, output_csv=output_csv)

## Single-core CPU Version

In [None]:
import zstandard as zstd
import json
import csv
from datetime import datetime, timedelta

size = 1024 * 1024 * 1


def generate_file_paths(start_year, start_month, end_year, end_month,
                        base_dir='/Users/ElevenyCHEN/Desktop/Mod_Datasets'):
    """Return the monthly dump paths from the start month to the end month, inclusive.

    Paths look like ``{base_dir}/RS_YYYY-MM.zst``. When the start month is the
    same as or later than the end month, the list runs newest-first, e.g.
    ``generate_file_paths(2016, 5, 2016, 1)`` -> 2016-05, 2016-04, ..., 2016-01.
    When the start month is earlier, the list runs oldest-first (this case
    used to return an empty list).

    Args:
        start_year, start_month: First month in the list (month is 1-12).
        end_year, end_month: Last month in the list (month is 1-12).
        base_dir: Directory containing the dumps; joined with ``/``.

    Returns:
        A list of path strings.

    Raises:
        ValueError: If a month is outside 1-12.
    """
    for month in (start_month, end_month):
        if not 1 <= month <= 12:
            raise ValueError(f"month must be in 1..12, got {month}")

    # Count months from year 0 so crossing a year boundary is plain integer arithmetic.
    first = start_year * 12 + (start_month - 1)
    last = end_year * 12 + (end_month - 1)
    step = -1 if first >= last else 1

    paths = []
    for index in range(first, last + step, step):
        year, month_index = divmod(index, 12)
        paths.append(f'{base_dir}/RS_{year:04d}-{month_index + 1:02d}.zst')
    return paths

def decompress_and_update(file_paths, existing_csv):
    """Append one presence column per monthly dump to the subreddit-by-month CSV.

    ``existing_csv`` must already exist. Its header row is the subreddit column
    followed by month labels such as ``2016-05``. For every path in
    ``file_paths`` whose month is not a column yet, the dump is scanned and a
    column is appended. That column holds ``'1'`` for subreddits that appear in
    the dump and ``'0'`` for all others. Subreddits seen for the first time get
    ``'0'`` in every earlier column. The CSV is rewritten in place.

    Args:
        file_paths: Iterable of dump paths named like ``RS_YYYY-MM.zst``.
        existing_csv: Path of the CSV to read and overwrite.
    """
    import codecs
    import os

    def scan_subreddits(path):
        # Return the set of distinct truthy "subreddit" values in one dump.
        found = set()
        with open(path, 'rb') as compressed:
            # Allow a decompression window of up to 2**31 bytes (2 GiB).
            dctx = zstd.ZstdDecompressor(max_window_size=2147483648)
            with dctx.stream_reader(compressed) as reader:
                # Incremental decoding keeps UTF-8 characters split across chunks.
                decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
                buffer = ''
                while True:
                    raw = reader.read(size)
                    at_eof = not raw
                    buffer += decoder.decode(raw, final=at_eof)
                    *lines, buffer = buffer.split('\n')
                    if at_eof:
                        lines.append(buffer)  # last line may lack a trailing newline
                    for line in lines:
                        if not line.strip():
                            continue
                        try:
                            json_obj = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if isinstance(json_obj, dict):
                            subreddit = json_obj.get("subreddit")
                            if subreddit:
                                found.add(subreddit)
                    if at_eof:
                        break
        return found

    # Read the existing CSV
    existing_data = {}
    with open(existing_csv, 'r', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        headers = next(reader)
        processed_months = set(headers[1:])  # Exclude the first header ('subreddit')
        for row in reader:
            if row:  # csv yields [] for blank lines
                existing_data[row[0]] = row[1:]

    for file_path in file_paths:
        # 'RS_2016-04.zst' -> '2016-04', the same label format as the CSV header.
        # (Taking the whole stem gave 'RS_2016-04', which never matched, so
        # already-processed months were scanned and appended again.)
        new_month = os.path.basename(file_path).split('.')[0].split('_', 1)[-1]
        if new_month in processed_months:
            continue

        unique_subreddits = scan_subreddits(file_path)

        headers.append(new_month)
        processed_months.add(new_month)  # also skips duplicate paths in file_paths
        n_prior = len(headers) - 2  # month columns before this one
        for subreddit in unique_subreddits:
            existing_data.setdefault(subreddit, [])
        for subreddit, months in existing_data.items():
            # Pad every row (short input rows, brand-new subreddits) first, so this
            # month's value lands in its own column. Absent subreddits get '0'.
            if len(months) < n_prior:
                months.extend(['0'] * (n_prior - len(months)))
            months.append('1' if subreddit in unique_subreddits else '0')

    # Write the updated CSV
    n_months = len(headers) - 1
    with open(existing_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["subreddit"] + headers[1:])
        for subreddit, months in existing_data.items():
            if len(months) < n_months:
                months.extend(['0'] * (n_months - len(months)))
            writer.writerow([subreddit] + months)


# Example usage: walk back from 2016-05 to 2016-01 and extend the presence table
existing_csv = r'/Users/ElevenyCHEN/Desktop/Mod_Datasets/SubReddit-time.csv'
file_paths = generate_file_paths(2016, 5, 2016, 1)
decompress_and_update(file_paths=file_paths, existing_csv=existing_csv)


## Multithreaded Version: subreddit presence by month

In [None]:
import zstandard as zstd
import json
import csv
import threading
from datetime import datetime, timedelta

size = 1024 * 1024 * 1

# Generate file paths
def generate_file_paths(start_year, start_month, end_year, end_month,
                        base_dir='/Users/ElevenyCHEN/Desktop/Mod_Datasets'):
    """Return the monthly dump paths from the start month to the end month, inclusive.

    Paths look like ``{base_dir}/RS_YYYY-MM.zst``. When the start month is the
    same as or later than the end month, the list runs newest-first, e.g.
    ``generate_file_paths(2016, 5, 2016, 1)`` -> 2016-05, 2016-04, ..., 2016-01.
    When the start month is earlier, the list runs oldest-first (this case
    used to return an empty list).

    Args:
        start_year, start_month: First month in the list (month is 1-12).
        end_year, end_month: Last month in the list (month is 1-12).
        base_dir: Directory containing the dumps; joined with ``/``.

    Returns:
        A list of path strings.

    Raises:
        ValueError: If a month is outside 1-12.
    """
    for month in (start_month, end_month):
        if not 1 <= month <= 12:
            raise ValueError(f"month must be in 1..12, got {month}")

    # Count months from year 0 so crossing a year boundary is plain integer arithmetic.
    first = start_year * 12 + (start_month - 1)
    last = end_year * 12 + (end_month - 1)
    step = -1 if first >= last else 1

    paths = []
    for index in range(first, last + step, step):
        year, month_index = divmod(index, 12)
        paths.append(f'{base_dir}/RS_{year:04d}-{month_index + 1:02d}.zst')
    return paths

# Worker function to process a single .zst file
def process_file(file_path, existing_data, headers, processed_months, lock):
    """Thread worker: scan one monthly dump and mark the subreddits seen in it.

    Args:
        file_path: Dump path named like ``RS_YYYY-MM.zst``; the month label is
            the part between the first ``_`` and the first ``.``.
        existing_data: Shared dict ``subreddit -> list of '0'/'1'`` values, one
            per month column in ``headers[1:]``. Mutated under ``lock``.
        headers: Shared, ordered list of column names whose first entry is the
            subreddit column. The month is appended if it is not there yet.
        processed_months: Month labels to skip.
        lock: ``threading.Lock`` guarding ``existing_data`` and ``headers``.
    """
    import codecs
    import os

    print(f"Starting processing file: {file_path} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # 'RS_2016-04.zst' -> '2016-04', the same label format as the CSV header.
    new_month = os.path.basename(file_path).split('.')[0].split('_', 1)[-1]
    if new_month in processed_months:
        return

    unique_subreddits = set()
    with open(file_path, 'rb') as compressed:
        # Allow a decompression window of up to 2**31 bytes (2 GiB).
        dctx = zstd.ZstdDecompressor(max_window_size=2147483648)
        with dctx.stream_reader(compressed) as reader:
            # Incremental decoding keeps UTF-8 characters split across chunks.
            decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
            buffer = ''
            while True:
                raw = reader.read(size)
                at_eof = not raw
                buffer += decoder.decode(raw, final=at_eof)
                *lines, buffer = buffer.split('\n')
                if at_eof:
                    lines.append(buffer)  # last line may lack a trailing newline
                for line in lines:
                    if not line.strip():
                        continue
                    try:
                        json_obj = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if isinstance(json_obj, dict):
                        subreddit = json_obj.get("subreddit")
                        if subreddit:
                            unique_subreddits.add(subreddit)
                if at_eof:
                    break

    print(f"Finished processing file: {file_path} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    with lock:
        if new_month not in headers:
            headers.append(new_month)
        n_months = len(headers) - 1
        # Write into this month's own column, so the result does not depend on
        # the order in which threads finish.
        column = headers.index(new_month) - 1
        for subreddit in unique_subreddits:
            months = existing_data.setdefault(subreddit, [])
            if len(months) < n_months:
                months.extend(['0'] * (n_months - len(months)))
            months[column] = '1'

# Main function to decompress and update
def decompress_and_update(file_paths, existing_csv):
    """Multithreaded variant: append one presence column per new monthly dump.

    Reads ``existing_csv``, whose header is the subreddit column followed by
    ``YYYY-MM`` month labels. It then reserves one column, in ``file_paths``
    order, for each dump whose month is not present yet. Those dumps are
    scanned in parallel threads. Finally the CSV is rewritten with ``'1'``
    where a subreddit appeared that month and ``'0'`` elsewhere. Column order
    is fixed before any thread starts, so it does not depend on thread timing.

    NOTE(review): JSON parsing is presumably CPU-bound under the GIL, so threads
    may give little speed-up over the single-core version. All pending dumps
    are also opened at the same time. Consider multiprocessing or a bounded pool.
    """
    import os

    existing_data = {}
    lock = threading.Lock()

    # Read existing CSV
    with open(existing_csv, 'r', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        headers = next(reader)  # ordered list: 'subreddit' column first
        for row in reader:
            if row:
                existing_data[row[0]] = row[1:]
    processed_months = set(headers[1:])

    # Reserve a column for each not-yet-processed month before starting threads.
    pending = []
    reserved = set(processed_months)
    for file_path in file_paths:
        month = os.path.basename(file_path).split('.')[0].split('_', 1)[-1]
        if month in reserved:
            continue
        reserved.add(month)
        headers.append(month)
        pending.append(file_path)

    # Default every existing row to '0' for the new months; workers set the '1's.
    n_months = len(headers) - 1
    for months in existing_data.values():
        months.extend(['0'] * (n_months - len(months)))

    threads = []
    for file_path in pending:
        thread = threading.Thread(target=process_file, args=(file_path, existing_data, headers, processed_months, lock))
        threads.append((file_path, thread))
        thread.start()

    # Wait for all threads, reporting each file by its own path.
    for file_path, thread in threads:
        thread.join()
        print(f"Finished processing file: {file_path}")

    # Write the updated CSV with the header in column order ('subreddit' first).
    n_months = len(headers) - 1
    with open(existing_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["subreddit"] + headers[1:])
        for subreddit, months in existing_data.items():
            if len(months) < n_months:
                months.extend(['0'] * (n_months - len(months)))
            writer.writerow([subreddit] + months)

# Example usage: threaded presence update for 2016-05 back to 2016-01
existing_csv = r'/Users/ElevenyCHEN/Desktop/Mod_Datasets/SubReddit-time.csv'
file_paths = generate_file_paths(2016, 5, 2016, 1)
decompress_and_update(file_paths=file_paths, existing_csv=existing_csv)

# Filling blank cells

In [None]:
def fill_empty_cells_in_csv(csv_file):
    """Replace blank cells in a CSV with ``'0'`` and pad short rows, in place.

    The header row is kept as-is. Every data row shorter than the header is
    padded with ``'0'``, and any empty or whitespace-only cell becomes ``'0'``.
    The file is then rewritten. An empty file is left untouched.

    Args:
        csv_file: Path of the UTF-8 CSV file to rewrite.
    """
    import csv  # this cell does not import csv itself

    with open(csv_file, 'r', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        headers = next(reader, None)  # Keep headers
        if headers is None:
            return  # empty file: nothing to fill
        updated_data = [headers]

        for row in reader:
            # Ensure the row has the correct number of columns
            if len(row) < len(headers):
                row.extend(['0'] * (len(headers) - len(row)))

            # Replace empty/whitespace-only cells with '0' (csv.reader yields strings only)
            updated_data.append(['0' if cell.strip() == '' else cell for cell in row])

    # Write the updated data back to the CSV
    with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerows(updated_data)

# Example usage: replace blank cells in the presence table with '0'
csv_file = r'D:\Eleveny\SubReddit-time.csv'
fill_empty_cells_in_csv(csv_file=csv_file)

# Subreddit Post and Deletion Metrics

## Single-core CPU Version

In [None]:
import zstandard as zstd
import json
import csv
from datetime import datetime, timedelta
import os

size = 1024 * 1024 * 1

def decompress_and_update(file_paths, input_csv, output_csv):
    """Append per-month post and "deletion" metrics for each subreddit to a CSV.

    The tracked subreddits are the union of the first column of ``input_csv``
    (when it exists, e.g. the SubReddit-time table) and the rows already in
    ``output_csv``. Each dump in ``file_paths`` that does not yet have all five
    columns is scanned. Five columns are then appended for it:
    ``total_posts_M``, ``deleted_posts_M``, ``proportion_M``, ``mod_delete_M``
    and ``admin_delete_M``, where ``M`` is ``YYYY-MM`` from ``RS_YYYY-MM.zst``.
    Subreddits new to ``output_csv`` get blank cells for months processed in
    earlier runs. ``output_csv`` is (re)written at the end.

    NOTE(review): a post counts as "deleted" when its ``distinguished`` field is
    ``"moderator"`` or ``"admin"``. In the Reddit API that field marks posts
    distinguished by a mod/admin, not removals. Confirm this is the intended metric.
    """
    import codecs
    import os

    header_prefixes = ["total_posts_", "deleted_posts_", "proportion_", "mod_delete_", "admin_delete_"]

    def records(path):
        # Yield each JSON object from a zstd-compressed, newline-delimited dump.
        with open(path, 'rb') as compressed:
            # Allow a decompression window of up to 2**31 bytes (2 GiB).
            dctx = zstd.ZstdDecompressor(max_window_size=2147483648)
            with dctx.stream_reader(compressed) as reader:
                # Incremental decoding keeps UTF-8 characters split across chunks.
                decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
                buffer = ''
                while True:
                    raw = reader.read(size)
                    at_eof = not raw
                    buffer += decoder.decode(raw, final=at_eof)
                    *lines, buffer = buffer.split('\n')
                    if at_eof:
                        lines.append(buffer)  # last line may lack a trailing newline
                    for line in lines:
                        if not line.strip():
                            continue
                        try:
                            json_obj = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if isinstance(json_obj, dict):
                            yield json_obj
                    if at_eof:
                        break

    # Read the existing data from output CSV
    existing_data = {}
    existing_headers = ["subreddit"]  # a fresh file must still keep every metric header
    if os.path.exists(output_csv):
        with open(output_csv, 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            existing_headers = next(reader, None) or ["subreddit"]
            for row in reader:
                if row:
                    existing_data[row[0]] = row[1:]

    # Seed the tracked subreddits from input_csv. This argument used to be
    # ignored, so a fresh run (no output_csv yet) tracked no subreddits at all.
    if os.path.exists(input_csv):
        with open(input_csv, 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)  # skip header
            for row in reader:
                if row and row[0] not in existing_data:
                    existing_data[row[0]] = []

    # Blank cells for months already in output_csv that a subreddit lacks.
    n_cols = len(existing_headers) - 1
    for data in existing_data.values():
        data.extend([''] * (n_cols - len(data)))

    for file_path in file_paths:
        time_mark = os.path.basename(file_path).split('.')[0].split('_')[1]
        new_headers = [prefix + time_mark for prefix in header_prefixes]

        # Skip if these headers already exist (data already processed)
        if all(header in existing_headers for header in new_headers):
            continue

        subreddit_metrics = {subreddit: {'total_posts': 0, 'deleted_posts': 0, 'mod_delete': 0, 'admin_delete': 0}
                             for subreddit in existing_data}

        for json_obj in records(file_path):
            subreddit = json_obj.get("subreddit")
            if not subreddit or subreddit not in subreddit_metrics:
                continue
            metrics = subreddit_metrics[subreddit]
            metrics['total_posts'] += 1
            distinguished = json_obj.get("distinguished")
            if distinguished == "moderator":
                metrics['deleted_posts'] += 1
                metrics['mod_delete'] += 1
            elif distinguished == "admin":
                metrics['deleted_posts'] += 1
                metrics['admin_delete'] += 1

        # Every tracked subreddit gets values for this month (zeros if it had no posts).
        for subreddit, metrics in subreddit_metrics.items():
            total = metrics['total_posts']
            proportion = metrics['deleted_posts'] / total if total > 0 else 0
            existing_data[subreddit].extend(
                [total, metrics['deleted_posts'], proportion, metrics['mod_delete'], metrics['admin_delete']])

        existing_headers.extend(new_headers)

    # Write the updated data to the output CSV
    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["subreddit"] + existing_headers[1:])
        for subreddit, data in existing_data.items():
            writer.writerow([subreddit] + data)

# Example usage
# Pass the later month first so the range is walked newest-first (2022-09 down
# to 2016-05). With the earlier month first, a backward-only generate_file_paths
# returns an empty list and nothing is processed.
# NOTE(review): generate_file_paths builds /Users/... dump paths while these CSVs
# live under D:\ — make sure both point at the same machine.
file_paths = generate_file_paths(2022, 9, 2016, 5)
input_csv = r'D:\Eleveny\SubReddit-time.csv'
output_csv = r'D:\Eleveny\SubReddit-deletion.csv'
decompress_and_update(file_paths, input_csv, output_csv)


## Multithreaded Version

In [None]:
import zstandard as zstd
import json
import csv
from datetime import datetime, timedelta
import os
import threading

size = 1024 * 1024 * 1

# Generate file paths
def generate_file_paths(start_year, start_month, end_year, end_month,
                        base_dir='/Users/ElevenyCHEN/Desktop/Mod_Datasets'):
    """Return the monthly dump paths from the start month to the end month, inclusive.

    Paths look like ``{base_dir}/RS_YYYY-MM.zst``. When the start month is the
    same as or later than the end month, the list runs newest-first, e.g.
    ``generate_file_paths(2016, 5, 2016, 1)`` -> 2016-05, 2016-04, ..., 2016-01.
    When the start month is earlier, the list runs oldest-first (this case
    used to return an empty list).

    Args:
        start_year, start_month: First month in the list (month is 1-12).
        end_year, end_month: Last month in the list (month is 1-12).
        base_dir: Directory containing the dumps; joined with ``/``.

    Returns:
        A list of path strings.

    Raises:
        ValueError: If a month is outside 1-12.
    """
    for month in (start_month, end_month):
        if not 1 <= month <= 12:
            raise ValueError(f"month must be in 1..12, got {month}")

    # Count months from year 0 so crossing a year boundary is plain integer arithmetic.
    first = start_year * 12 + (start_month - 1)
    last = end_year * 12 + (end_month - 1)
    step = -1 if first >= last else 1

    paths = []
    for index in range(first, last + step, step):
        year, month_index = divmod(index, 12)
        paths.append(f'{base_dir}/RS_{year:04d}-{month_index + 1:02d}.zst')
    return paths


# Worker function to process a single .zst file
def process_file(file_path, existing_data, subreddit_metrics, headers, lock):
    """Thread worker: compute one month's post metrics and store them in place.

    For every tracked subreddit, writes five values for the month of
    ``file_path``: total posts, "deleted" posts, their proportion, and the
    moderator / admin split.

    Args:
        file_path: Dump path named like ``RS_YYYY-MM.zst``.
        existing_data: Shared dict ``subreddit -> row values`` aligned with
            ``headers[1:]``. Mutated under ``lock``.
        subreddit_metrics: Mapping whose keys are the subreddits to track. Only
            its keys are read. Each call keeps private counters, so months
            processed concurrently no longer add into the same shared totals.
        headers: Shared, ordered list of column names whose first entry is the
            subreddit column. The month's five columns are appended if they are
            not there yet.
        lock: ``threading.Lock`` guarding ``existing_data`` and ``headers``.

    NOTE(review): "deleted" means ``distinguished`` is ``"moderator"`` or
    ``"admin"``. In the Reddit API that field marks mod/admin-distinguished
    posts, not removals. Confirm this is the intended metric.
    """
    import codecs
    import os

    print(f"Starting processing file: {file_path} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    time_mark = os.path.basename(file_path).split('.')[0].split('_')[1]
    header_prefixes = ["total_posts_", "deleted_posts_", "proportion_", "mod_delete_", "admin_delete_"]
    new_headers = [prefix + time_mark for prefix in header_prefixes]

    line_count = 0
    update_interval = 1000000  # Update the progress every 1,000,000 lines

    # Private to this call: subreddit -> [total_posts, deleted_posts, mod_delete, admin_delete]
    counts = {}
    with open(file_path, 'rb') as compressed:
        # Allow a decompression window of up to 2**31 bytes (2 GiB).
        dctx = zstd.ZstdDecompressor(max_window_size=2147483648)
        with dctx.stream_reader(compressed) as reader:
            # Incremental decoding keeps UTF-8 characters split across chunks.
            decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
            buffer = ''
            while True:
                raw = reader.read(size)
                at_eof = not raw
                buffer += decoder.decode(raw, final=at_eof)
                *lines, buffer = buffer.split('\n')
                if at_eof:
                    lines.append(buffer)  # last line may lack a trailing newline

                for line in lines:
                    if not line.strip():
                        continue
                    line_count += 1
                    # Periodic progress update
                    if line_count % update_interval == 0:
                        print(f"Processing file: {file_path}, lines processed: {line_count}, time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
                    try:
                        json_obj = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(json_obj, dict):
                        continue
                    subreddit = json_obj.get("subreddit")
                    if not subreddit or subreddit not in subreddit_metrics:
                        continue
                    metrics = counts.setdefault(subreddit, [0, 0, 0, 0])
                    metrics[0] += 1
                    distinguished = json_obj.get("distinguished")
                    if distinguished == "moderator":
                        metrics[1] += 1
                        metrics[2] += 1
                    elif distinguished == "admin":
                        metrics[1] += 1
                        metrics[3] += 1

                if at_eof:
                    break

    print(f"Finished processing file: {file_path} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    with lock:
        if new_headers[0] not in headers:
            headers.extend(new_headers)
        width = len(headers) - 1
        # Write into this month's own columns, independent of thread finish order.
        start = headers.index(new_headers[0]) - 1
        for subreddit in subreddit_metrics:
            total, deleted, mod_delete, admin_delete = counts.get(subreddit, (0, 0, 0, 0))
            proportion = deleted / total if total > 0 else 0
            row = existing_data.setdefault(subreddit, [])
            if len(row) < width:
                row.extend([''] * (width - len(row)))
            row[start:start + len(new_headers)] = [total, deleted, proportion, mod_delete, admin_delete]

# Main function to decompress and update
def decompress_and_update(file_paths, input_csv, output_csv):
    """Multithreaded variant of the per-month deletion-metrics update.

    Tracks the subreddits from ``input_csv`` (first column, when the file
    exists) plus the rows already in ``output_csv``. It reserves five columns,
    in ``file_paths`` order, for each dump whose columns are not all present.
    Those dumps are scanned in parallel threads, and ``output_csv`` is then
    written with the header in column order. Subreddits new to ``output_csv``
    get blank cells for months processed in earlier runs.

    NOTE(review): JSON parsing is presumably CPU-bound under the GIL, so threads
    may give little speed-up. All pending dumps are also opened at the same
    time. Consider multiprocessing or a bounded pool.
    """
    import os

    lock = threading.Lock()
    header_prefixes = ["total_posts_", "deleted_posts_", "proportion_", "mod_delete_", "admin_delete_"]

    # Read the existing data from output CSV
    existing_data = {}
    headers = ["subreddit"]  # ordered list, 'subreddit' column first
    if os.path.exists(output_csv):
        with open(output_csv, 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            headers = next(reader, None) or ["subreddit"]
            for row in reader:
                if row:
                    existing_data[row[0]] = row[1:]

    # Seed the tracked subreddits from input_csv (previously ignored, so a
    # fresh run tracked nothing).
    if os.path.exists(input_csv):
        with open(input_csv, 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)  # skip header
            for row in reader:
                if row and row[0] not in existing_data:
                    existing_data[row[0]] = []

    # Skip months that are already processed and reserve columns for the rest
    # before any thread starts.
    pending = []
    for file_path in file_paths:
        time_mark = os.path.basename(file_path).split('.')[0].split('_')[1]
        new_headers = [prefix + time_mark for prefix in header_prefixes]
        if all(header in headers for header in new_headers):
            continue
        headers.extend(new_headers)
        pending.append(file_path)

    width = len(headers) - 1
    for row in existing_data.values():
        row.extend([''] * (width - len(row)))

    # Workers read only the keys: the set of subreddits to track.
    subreddit_metrics = dict.fromkeys(existing_data)

    threads = []
    for file_path in pending:
        thread = threading.Thread(target=process_file, args=(file_path, existing_data, subreddit_metrics, headers, lock))
        threads.append((file_path, thread))
        thread.start()

    # Wait for all threads, reporting each file by its own path.
    for file_path, thread in threads:
        thread.join()
        print(f"Finished processing file: {file_path} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Write the updated data to the output CSV
    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["subreddit"] + headers[1:])
        for subreddit, data in existing_data.items():
            writer.writerow([subreddit] + data)

# Example usage: threaded deletion metrics for 2016-04 back to 2016-01
input_csv = r'/Users/ElevenyCHEN/Desktop/Mod_Datasets/SubReddit-time.csv'
output_csv = r'/Users/ElevenyCHEN/Desktop/Mod_Datasets/SubReddit-deletion.csv'
file_paths = generate_file_paths(2016, 4, 2016, 1)
decompress_and_update(file_paths, input_csv=input_csv, output_csv=output_csv)