In [1]:
import gzip
import pandas as pd
import json
from tqdm import tqdm
from itertools import islice
import gc 

In [2]:
def readCSV(path):
    """Yield ``(user_id, book_id, rating)`` tuples from a gzipped CSV.

    The first line is treated as a header and skipped. ``rating`` is converted to
    ``int``; the other two fields are returned as stripped strings. Blank lines are
    ignored.

    The file is opened in a ``with`` block so it is closed when the generator is
    exhausted, closed, or garbage-collected (the previous version never closed it).
    """
    with gzip.open(path, 'rt') as f:
        f.readline()  # skip header row
        for line in f:
            line = line.strip()
            if not line:
                # e.g. a trailing newline; unpacking '' would raise ValueError
                continue
            u, b, r = line.split(',')
            yield u, b, int(r)

def readJSON(path):
    """Load a whole gzipped JSON-lines file into a DataFrame (one row per JSON object).

    Progress is shown with ``tqdm``. Blank lines (such as a trailing newline) are
    skipped; any other line that is not valid JSON raises ``json.JSONDecodeError``.
    Every record is held in memory at once, so use ``readJSON_slice`` to load only a
    window of a large file.
    """
    data = []
    with gzip.open(path, 'rt') as f:
        for line in tqdm(f):
            line = line.strip()
            if not line:
                continue
            data.append(json.loads(line))
    return pd.DataFrame(data)

def readJSON_slice(path, start=0, end=10000):
    """Parse lines ``[start, end)`` of a gzipped JSON-lines file into a DataFrame.

    Lines that are not valid JSON are reported with ``print`` and skipped, so the
    result may have fewer than ``end - start`` rows.
    """
    rows = []
    with gzip.open(path, 'rt') as handle:
        window = islice(handle, start, end)
        for raw in window:
            try:
                parsed = json.loads(raw.strip())
            except json.JSONDecodeError as e:
                print(f"Error parsing line: {e}")
                continue
            rows.append(parsed)
    return pd.DataFrame(rows)

In [3]:
# Seed passed as `random_state` when sampling books, so the per-category subsets
# are reproducible across runs.
random_seed = 42

In [4]:
# Build a reproducible subset of English-language books (with >= 5 text reviews)
# for each Goodreads category and save only the metadata columns used downstream.
categories = ['Children', 'Biography', 'Comics', 'Fantasy Paranormal', 'Mystery Thriller Crime', 'Poetry', 'Romance', 'Young Adult']
columns_to_keep = ['book_id', 'work_id', 'title', 'title_without_series', 'series', 'average_rating', 'publication_year', 'num_pages', 'authors', 'publisher', 'description', 'ratings_count', 'text_reviews_count']
SAMPLE_SIZE = 10000  # books per category; fewer if not enough books qualify

for category in categories:
    file_path = f"/content/drive/MyDrive/258/Goodreads {category} Books.json.gz"
    # Load the current category's books (not a fixed category's file).
    books_df = readJSON(file_path)
    eligible_df = (books_df[books_df['language_code'] == 'eng']
                   # counts arrive as strings; unparseable values become NaN and are filtered out
                   .assign(text_reviews_count=lambda x: pd.to_numeric(x['text_reviews_count'], errors='coerce'))
                   .query('text_reviews_count >= 5'))
    # DataFrame.sample raises ValueError when n exceeds the available rows.
    sampled_df = (eligible_df
                  .sample(n=min(SAMPLE_SIZE, len(eligible_df)), random_state=random_seed)
                  [columns_to_keep])
    sampled_df.to_csv(f"/content/drive/MyDrive/258/sampled/Goodreads {category} subset.csv", index=False)
    # Drop the full per-category frames before loading the next file to limit peak memory.
    del books_df, eligible_df
    gc.collect()

FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/258/Goodreads Children Books.json.gz'

In [4]:
# Empty frames describing the schemas of the sampled interaction and review tables.
interaction_df, review_df = (
    pd.DataFrame(columns=schema)
    for schema in (
        ['user_id', 'book_id', 'is_read', 'rating'],
        ['user_id', 'book_id', 'rating', 'review_text', 'n_votes'],
    )
)

In [15]:
# Load the sampled Poetry book subset to inspect its contents.
df = pd.read_csv(
    "/Users/allenlu/Google Drive/My Drive/258/sampled/Goodreads Poetry subset.csv"
)

In [27]:
# Show the first sampled book record (positional row 0) as the cell output.
df.iloc[0]

book_id                                                          13508414
work_id                                                          19060537
title                                             The Delaware Detectives
title_without_series                              The Delaware Detectives
series                                                         ['853084']
average_rating                                                       4.28
publication_year                                                   2012.0
num_pages                                                           150.0
authors                            [{'author_id': '4998892', 'role': ''}]
publisher                                       A Word Fitly Spoken Press
description             What do the following have in common: a muntja...
ratings_count                                                          12
text_reviews_count                                                      6
Name: 0, dtype: object

In [16]:
import pandas as pd
import gc
from pathlib import Path
import logging
from typing import Set, List, Dict
from dataclasses import dataclass
import gzip
import json
from concurrent.futures import ThreadPoolExecutor
import numpy as np

@dataclass
class ProcessingConfig:
    """Settings for filtering Goodreads interaction/review files.

    Attributes:
        base_path: Directory containing the raw ``Goodreads {category} ... .json.gz``
            files and the ``sampled/`` directory. A ``str`` is accepted and converted
            to ``Path`` so ``/`` path joins work.
        batch_size: Number of JSON lines parsed per batch; must be positive.
        flush_threshold: Approximate number of input lines between appends of
            buffered rows to the output CSV; must be positive.
        categories: Category names to process. ``None`` (the default) becomes an
            empty list; a single string becomes a one-element list.
    """
    base_path: Path
    batch_size: int = 10000
    flush_threshold: int = 500000
    # None is normalized to [] in __post_init__ so iterating it never fails.
    categories: List[str] = None

    def __post_init__(self):
        self.base_path = Path(self.base_path)
        if self.categories is None:
            self.categories = []
        elif isinstance(self.categories, str):
            # Iterating a bare string would treat each character as a category.
            self.categories = [self.categories]
        else:
            self.categories = list(self.categories)
        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
        if self.flush_threshold <= 0:
            raise ValueError(f"flush_threshold must be positive, got {self.flush_threshold}")

class GoodreadsProcessor:
    """Filter Goodreads interaction and review dumps down to a sampled set of books.

    For each category in ``config.categories`` the sampled book ids are read from
    ``{base_path}/sampled/Goodreads {category} subset.csv``. Then
    ``Goodreads {category} Interactions.json.gz`` and
    ``Goodreads {category} Reviews.json.gz`` are streamed in batches, and the rows
    for those books are appended to CSVs under ``{base_path}/sampled/``.
    """

    def __init__(self, config: "ProcessingConfig"):
        self.config = config
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging (INFO, timestamped format) and bind ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def read_json_slice(self, filepath: str, start: int, end: int) -> pd.DataFrame:
        """Parse lines ``[start, end)`` of a gzipped JSON-lines file into a DataFrame.

        Blank or malformed lines are logged and skipped, so one bad line no longer
        discards the whole slice. If the file cannot be opened or decompressed, the
        error is logged and an empty DataFrame is returned.
        """
        from itertools import islice

        try:
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                lines = list(islice(f, start, end))
        except (OSError, EOFError, ValueError) as e:
            self.logger.error(f"Error reading JSON slice: {e}")
            return pd.DataFrame()

        records = self._parse_lines(lines, first_line_no=start)
        return pd.DataFrame.from_records(records) if records else pd.DataFrame()

    def _parse_lines(self, lines: List[str], first_line_no: int = 0) -> List[Dict]:
        """Decode JSON lines, skipping blank ones and logging/skipping malformed ones."""
        records = []
        for offset, line in enumerate(lines):
            if not line.strip():
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as e:
                self.logger.warning(
                    f"Skipping malformed JSON at line index {first_line_no + offset}: {e}"
                )
        return records

    def _iter_json_batches(self, filepath: Path):
        """Yield one DataFrame per ``batch_size`` lines of a gzipped JSON-lines file.

        The file is opened once and read sequentially, so total work is linear in
        file size. Calling ``read_json_slice`` per batch reopens the file and
        re-skips all earlier lines, which is quadratic overall. A batch whose lines
        are all blank or malformed yields an empty DataFrame; iteration stops only
        at end of file. Open or decompression errors are logged and end the stream.
        """
        from itertools import islice

        batch_size = self.config.batch_size
        line_no = 0
        try:
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                while True:
                    lines = list(islice(f, batch_size))
                    if not lines:
                        return
                    records = self._parse_lines(lines, first_line_no=line_no)
                    line_no += len(lines)
                    yield pd.DataFrame.from_records(records) if records else pd.DataFrame()
        except (OSError, EOFError, ValueError) as e:
            self.logger.error(f"Error reading {filepath} near line {line_no}: {e}; stopping early")

    def process_slice(self, df_slice: pd.DataFrame, books: Set[int], columns: List[str]) -> List[Dict]:
        """Return rows whose ``book_id`` is in ``books``, as dicts restricted to ``columns``.

        ``book_id`` is coerced to nullable integers, so unparseable ids never match.
        If an ``is_read`` column exists, only rows with a truthy ``is_read`` are kept;
        missing values count as not read. Requested columns that are absent from the
        slice are filled with NaN instead of raising ``KeyError``.
        """
        if df_slice.empty:
            return []

        df_slice = df_slice.assign(
            book_id=pd.to_numeric(df_slice['book_id'], errors='coerce').astype('Int64')
        )

        mask = df_slice['book_id'].isin(books)
        if 'is_read' in df_slice.columns:
            mask &= df_slice['is_read'].fillna(False).astype(bool)

        filtered = df_slice[mask]
        return filtered.reindex(columns=columns).to_dict('records')

    def process_category(self, category: str):
        """Filter the interactions and reviews files of one category to its sampled books."""
        base = self.config.base_path
        interactions_path = base / f"Goodreads {category} Interactions.json.gz"
        reviews_path = base / f"Goodreads {category} Reviews.json.gz"
        # Read and cache the sampled book IDs for fast membership tests.
        books_df = pd.read_csv(base / 'sampled' / f"Goodreads {category} subset.csv")
        books = set(books_df['book_id'].astype(int))

        self._process_file(
            filepath=interactions_path,
            output_path=base / 'sampled' / f"Goodreads_{category}_Interactions_sampled.csv",
            columns=['user_id', 'book_id', 'is_read', 'rating'],
            books=books,
            file_type="interactions"
        )

        self._process_file(
            filepath=reviews_path,
            output_path=base / 'sampled' / f"Goodreads_{category}_Reviews_sampled.csv",
            columns=['user_id', 'book_id', 'rating', 'review_text', 'n_votes'],
            books=books,
            file_type="reviews"
        )

    def _process_file(self, filepath: Path, output_path: Path, columns: List[str],
                      books: Set[int], file_type: str):
        """Stream ``filepath``, keep rows for ``books``, and write them to ``output_path``.

        The CSV is (re)created with only a header row, then matching rows are
        appended. Buffered rows are flushed about every ``flush_threshold`` input
        lines, and once more at the end.
        """
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        pd.DataFrame(columns=columns).to_csv(output_path, index=False)

        batch_size = self.config.batch_size
        records: List[Dict] = []
        count = 0
        lines_at_last_flush = 0

        for batch_num, df_slice in enumerate(self._iter_json_batches(filepath)):
            start = batch_num * batch_size
            end = start + batch_size
            self.logger.info(f"Processing {file_type} batch {batch_num}, range: {start}-{end}")

            new_records = self.process_slice(df_slice, books, columns)
            records.extend(new_records)
            count += len(new_records)

            # Flush by lines consumed since the last flush. The old check
            # `start % flush_threshold == 0` only fired at common multiples of
            # batch_size and flush_threshold, so the buffer could grow unbounded.
            if records and end - lines_at_last_flush >= self.config.flush_threshold:
                self._flush_records(records, output_path)
                records = []
                lines_at_last_flush = end
                gc.collect()

        if records:
            self._flush_records(records, output_path)

        self.logger.info(f"Completed processing {file_type}. Total records: {count}")

    def _flush_records(self, records: List[Dict], output_path: Path):
        """Append ``records`` to ``output_path`` as CSV rows (no header, no index)."""
        pd.DataFrame(records).to_csv(output_path, mode='a', header=False, index=False)

    def process_all(self):
        """Run ``process_category`` for each configured category, in order."""
        for category in self.config.categories or []:
            self.logger.info(f"Starting processing for category: {category}")
            self.process_category(category)
            self.logger.info(f"Completed processing for category: {category}")


In [17]:
# Filter interactions/reviews for one category against the locally mounted Drive folder.
config = ProcessingConfig(
    base_path=Path("/Volumes/Drive/allenlu/Google Drive/My Drive/258/"),
    batch_size=50000,
    flush_threshold=500000,
    categories=['Fantasy Paranormal'],
)

processor = GoodreadsProcessor(config)
processor.process_all()

2024-11-29 02:05:36,794 - INFO - Starting processing for category: Fantasy Paranormal
2024-11-29 02:05:36,869 - INFO - Processing interactions batch 0, range: 0-50000
2024-11-29 02:05:37,050 - INFO - Processing interactions batch 1, range: 50000-100000
2024-11-29 02:05:37,242 - INFO - Processing interactions batch 2, range: 100000-150000
2024-11-29 02:05:37,462 - INFO - Processing interactions batch 3, range: 150000-200000
2024-11-29 02:05:37,718 - INFO - Processing interactions batch 4, range: 200000-250000
2024-11-29 02:05:38,002 - INFO - Processing interactions batch 5, range: 250000-300000
2024-11-29 02:05:38,334 - INFO - Processing interactions batch 6, range: 300000-350000
2024-11-29 02:05:38,722 - INFO - Processing interactions batch 7, range: 350000-400000
2024-11-29 02:05:39,117 - INFO - Processing interactions batch 8, range: 400000-450000
2024-11-29 02:05:39,577 - INFO - Processing interactions batch 9, range: 450000-500000
2024-11-29 02:05:40,035 - INFO - Processing interac