In [2]:
import pandas as pd
import numpy as np
import json
import gzip
from tqdm import tqdm  
import pyarrow as pa  
import pyarrow.parquet as pq
import gc
from fuzzywuzzy import fuzz
from fuzzywuzzy import process

# Preproccessing

In [3]:
# HÀM CHUYỂN CSV SANG PARQUET (Theo chunk, không load hết RAM)
def csv_to_parquet(input_csv, output_parquet, chunksize=100000):
    writer = None
    # Đọc theo chunk
    for chunk in tqdm(pd.read_csv(input_csv, chunksize=chunksize, low_memory=False)):
        # Chuyển chunk sang pyarrow Table
        table = pa.Table.from_pandas(chunk)
        # Nếu lần đầu, tạo writer
        if writer is None:
            writer = pq.ParquetWriter(output_parquet, table.schema)
        # Viết chunk vào file
        writer.write_table(table)
    if writer:
        writer.close()
    print(f"Đã chuyển {input_csv} sang {output_parquet}")

In [4]:
# HÀM CHUYỂN JSON SANG PARQUET (Stream + batch, không load hết RAM)
def json_to_parquet(input_json, output_parquet, batch_size=100000):
    """Chuyển JSON (có thể gz) sang Parquet theo batch. Batch_size = 100k dòng/lần."""
    # Mở file: gz hoặc thường
    if input_json.endswith('.gz'):
        opener = gzip.open(input_json, 'rt', encoding='utf-8')
    else:
        opener = open(input_json, 'r', encoding='utf-8')
    
    batch = []
    writer = None
    with opener as f:
        for line in tqdm(f):  # Stream từng dòng
            if line.strip():  # Bỏ dòng rỗng
                batch.append(json.loads(line))
            if len(batch) >= batch_size:  # Đầy batch → xử lý
                df_batch = pd.DataFrame(batch)
                table = pa.Table.from_pandas(df_batch)
                if writer is None:
                    writer = pq.ParquetWriter(output_parquet, table.schema)
                writer.write_table(table)
                batch = []  # Reset batch
                del df_batch  # Giải phóng RAM
    
    # Xử lý batch cuối
    if batch:
        df_batch = pd.DataFrame(batch)
        table = pa.Table.from_pandas(df_batch)
        if writer is None:
            writer = pq.ParquetWriter(output_parquet, table.schema)
        writer.write_table(table)
    
    if writer:
        writer.close()
    print(f"Đã chuyển {input_json} sang {output_parquet}")

In [5]:
csv_to_parquet(path_amazon + 'books_data.csv', path_amazon + 'books_data.parquet')
csv_to_parquet(path_amazon + 'Books_rating.csv', path_amazon + 'Books_rating.parquet')
json_to_parquet(path_gr + 'goodreads_books.json', path_gr + 'goodreads_books.parquet')
csv_to_parquet(path_gr + 'goodreads_interactions.csv', path_gr + 'goodreads_interactions.parquet')
json_to_parquet(path_gr + 'goodreads_reviews_dedup.json', path_gr + 'goodreads_reviews_dedup.parquet')

0it [00:00, ?it/s]

3it [00:02,  1.19it/s]


Đã chuyển AMAZON/books_data.csv sang AMAZON/books_data.parquet


30it [00:30,  1.03s/it]


Đã chuyển AMAZON/Books_rating.csv sang AMAZON/Books_rating.parquet


2360655it [03:24, 11539.30it/s]


Đã chuyển GOODREADS/goodreads_books.json sang GOODREADS/goodreads_books.parquet


2287it [01:31, 25.00it/s]


Đã chuyển GOODREADS/goodreads_interactions.csv sang GOODREADS/goodreads_interactions.parquet


15739967it [02:44, 95596.50it/s] 


Đã chuyển GOODREADS/goodreads_reviews_dedup.json sang GOODREADS/goodreads_reviews_dedup.parquet


In [5]:
def read_parquet_chunk(file_path, chunksize=1000000, columns=None):
    parquet_file = pq.ParquetFile(file_path)
    for batch in parquet_file.iter_batches(batch_size=chunksize, columns=columns):
        yield batch.to_pandas()

In [8]:
path_amazon = 'AMAZON/'
path_gr = 'GOODREADS/'

## AMAZON

**Ref:**
https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews

### books_data.parquet

In [9]:
amz_data_df= pd.read_parquet(path_amazon + 'books_data.parquet',columns=['Title', 'description', 'authors', 'publisher',
       'publishedDate', 'categories', 'ratingsCount'])

In [10]:
print(amz_data_df.columns,'\n')
print(amz_data_df.info(),'\n')
print(amz_data_df.isna().sum(),'\n')
amz_data_df.head()


Index(['Title', 'description', 'authors', 'publisher', 'publishedDate',
       'categories', 'ratingsCount'],
      dtype='object') 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 212404 entries, 0 to 212403
Data columns (total 7 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   Title          212403 non-null  object 
 1   description    143962 non-null  object 
 2   authors        180991 non-null  object 
 3   publisher      136518 non-null  object 
 4   publishedDate  187099 non-null  object 
 5   categories     171205 non-null  object 
 6   ratingsCount   49752 non-null   float64
dtypes: float64(1), object(6)
memory usage: 11.3+ MB
None 

Title                 1
description       68442
authors           31413
publisher         75886
publishedDate     25305
categories        41199
ratingsCount     162652
dtype: int64 



Unnamed: 0,Title,description,authors,publisher,publishedDate,categories,ratingsCount
0,Its Only Art If Its Well Hung!,,['Julie Strain'],,1996,['Comics & Graphic Novels'],
1,Dr. Seuss: American Icon,Philip Nel takes a fascinating look into the k...,['Philip Nel'],A&C Black,2005-01-01,['Biography & Autobiography'],
2,Wonderful Worship in Smaller Churches,This resource includes twelve principles in un...,['David R. Ray'],,2000,['Religion'],
3,Whispers of the Wicked Saints,Julia Thomas finds her life spinning out of co...,['Veronica Haddon'],iUniverse,2005-02,['Fiction'],
4,"Nation Dance: Religion, Identity and Cultural ...",,['Edward Long'],,2003-03-01,,


In [11]:
# CELL 2: TIỀN XỬ LÝ AMAZON METADATA
df_meta = pd.read_parquet(path_amazon + 'books_data.parquet')
df_meta = df_meta[['Title', 'authors', 'publisher', 'publishedDate', 'categories', 'ratingsCount', 'description']]
df_meta['description'] = df_meta['description'].fillna('Unknown')
df_meta['publisher'] = df_meta['publisher'].fillna('Unknown')
df_meta['publishedDate'] = df_meta['publishedDate'].fillna('Unknown')
df_meta['categories'] = df_meta['categories'].fillna('Unknown')
df_meta['ratingsCount'] = df_meta['ratingsCount'].fillna(0)
df_meta = df_meta.drop_duplicates(subset=['Title'])
df_meta['title_clean'] = df_meta['Title'].str.lower().str.replace(r'[^\w\s]', '', regex=True)
df_meta['author_clean'] = df_meta['authors'].str.replace(r"[\[\]']", '', regex=True).str.lower().str.strip()
gc.collect()
print(f"Amazon metadata sau xử lý: {len(df_meta)} sách")

Amazon metadata sau xử lý: 212404 sách


### Books_rating.parquet

In [12]:
amz_rt_df = pd.read_parquet(path_amazon +'Books_rating.parquet',columns=[ 'Title', 'review/helpfulness',
       'review/score', 'review/text'])

In [13]:
print(amz_rt_df.columns,'\n')
print(amz_rt_df.info(), '\n')
print(amz_rt_df.isna().sum(), '\n')
amz_rt_df.head()

Index(['Title', 'review/helpfulness', 'review/score', 'review/text'], dtype='object') 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3000000 entries, 0 to 2999999
Data columns (total 4 columns):
 #   Column              Dtype  
---  ------              -----  
 0   Title               object 
 1   review/helpfulness  object 
 2   review/score        float64
 3   review/text         object 
dtypes: float64(1), object(3)
memory usage: 91.6+ MB
None 

Title                 208
review/helpfulness      0
review/score            0
review/text             8
dtype: int64 



Unnamed: 0,Title,review/helpfulness,review/score,review/text
0,Its Only Art If Its Well Hung!,7/7,4.0,This is only for Julie Strain fans. It's a col...
1,Dr. Seuss: American Icon,10/10,5.0,I don't care much for Dr. Seuss but after read...
2,Dr. Seuss: American Icon,10/11,5.0,"If people become the books they read and if ""t..."
3,Dr. Seuss: American Icon,7/7,4.0,"Theodore Seuss Geisel (1904-1991), aka &quot;D..."
4,Dr. Seuss: American Icon,3/3,4.0,Philip Nel - Dr. Seuss: American IconThis is b...


In [14]:
# CELL 3: TIỀN XỬ LÝ AMAZON REVIEWS (tách expert/public theo chunk)
print("Tách expert/public Amazon reviews theo chunk...")
expert_am_chunks = []
public_am_chunks = []

for chunk in read_parquet_chunk(path_amazon + 'Books_rating.parquet', chunksize=500000):
    chunk['helpful_num'] = chunk['review/helpfulness'].str.split('/').str[0].astype(float)
    chunk['helpful_den'] = chunk['review/helpfulness'].str.split('/').str[1].astype(float)
    chunk['helpfulness_ratio'] = chunk['helpful_num'] / chunk['helpful_den'].replace(0, 1)
    chunk['title_clean'] = chunk['Title'].str.lower().str.replace(r'[^\w\s]', '', regex=True)

    expert_chunk = chunk[(chunk['helpful_num'] > 50) & (chunk['helpfulness_ratio'] > 0.7)].copy()
    public_chunk = chunk[~chunk.index.isin(expert_chunk.index)].copy()

    expert_am_chunks.append(expert_chunk)
    public_am_chunks.append(public_chunk)
    gc.collect()

expert_am = pd.concat(expert_am_chunks, ignore_index=True)
public_am = pd.concat(public_am_chunks, ignore_index=True)
gc.collect()
print(f"Amazon expert: {len(expert_am)} | public: {len(public_am)}")

Tách expert/public Amazon reviews theo chunk...
Amazon expert: 28860 | public: 2971140


In [15]:
# CELL 4: LẤY 1 EXPERT + 1 PUBLIC MỖI SÁCH (Amazon)
expert_sample_am = expert_am.loc[expert_am.groupby('Title')['helpful_num'].idxmax()][['Title', 'review/text']].rename(columns={'review/text': 'expert_review_am'})

public_sample_am = public_am.groupby('Title').apply(
    lambda x: x.sample(1, random_state=42)
).reset_index(drop=True)[['Title', 'review/text']].rename(columns={'review/text': 'public_review_am'})

amazon_reviews = expert_sample_am.merge(public_sample_am, on='Title', how='inner')
amazon_reviews['title_clean'] = amazon_reviews['Title'].str.lower().str.replace(r'[^\w\s]', '', regex=True)
gc.collect()
print(f"Amazon sách có đủ review: {len(amazon_reviews)}")

  public_sample_am = public_am.groupby('Title').apply(


Amazon sách có đủ review: 12769


In [None]:
amazon_reviews.reset_index(drop=True, inplace=True)

## GOODREADS

**Ref:**
https://cseweb.ucsd.edu/~jmcauley/datasets/goodreads.html#datasets

**Download:** 
- goodreads_books: https://mcauleylab.ucsd.edu/public_datasets/gdrive/goodreads/goodreads_books.json.gz
- goodreads_interaction: https://mcauleylab.ucsd.edu/public_datasets/gdrive/goodreads/goodreads_interactions.csv
- goodreads_reviews: https://mcauleylab.ucsd.edu/public_datasets/gdrive/goodreads/goodreads_reviews_dedup.json.gz

### goodreads_books.parquet

In [16]:
gr_books_df = pd.read_parquet(path_gr + 'goodreads_books.parquet',columns=['book_id',
'title','authors','publication_year','description','average_rating','ratings_count'])


In [17]:
print(gr_books_df.columns,'\n')
print(gr_books_df.info(), '\n')
print(gr_books_df.isna().sum(), '\n')
gr_books_df.head()

Index(['book_id', 'title', 'authors', 'publication_year', 'description',
       'average_rating', 'ratings_count'],
      dtype='object') 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2360655 entries, 0 to 2360654
Data columns (total 7 columns):
 #   Column            Dtype 
---  ------            ----- 
 0   book_id           object
 1   title             object
 2   authors           object
 3   publication_year  object
 4   description       object
 5   average_rating    object
 6   ratings_count     object
dtypes: object(7)
memory usage: 126.1+ MB
None 

book_id             0
title               0
authors             0
publication_year    0
description         0
average_rating      0
ratings_count       0
dtype: int64 



Unnamed: 0,book_id,title,authors,publication_year,description,average_rating,ratings_count
0,5333265,W.C. Fields: A Life on Film,"[{'author_id': '604031', 'role': ''}]",1984.0,,4.0,3
1,1333909,Good Harbor,"[{'author_id': '626222', 'role': ''}]",2001.0,"Anita Diamant's international bestseller ""The ...",3.23,10
2,7327624,"The Unschooled Wizard (Sun Wolf and Starhawk, ...","[{'author_id': '10333', 'role': ''}]",1987.0,Omnibus book club edition containing the Ladie...,4.03,140
3,6066819,Best Friends Forever,"[{'author_id': '9212', 'role': ''}]",2009.0,Addie Downs and Valerie Adler were eight when ...,3.49,51184
4,287140,Runic Astrology: Starcraft and Timekeeping in ...,"[{'author_id': '149918', 'role': ''}]",,,3.4,15


In [None]:
# CELL 5: TIỀN XỬ LÝ GOODREADS BOOKS 
import re

df_books = pd.read_parquet(path_gr + 'goodreads_books.parquet')
df_books = df_books[['book_id', 'title', 'authors', 'publication_year', 'description', 'average_rating', 'ratings_count']]

# Fill null
df_books['description'] = df_books['description'].fillna('Unknown')
df_books['publication_year'] = df_books['publication_year'].fillna('Unknown')
df_books['average_rating'] = df_books['average_rating'].fillna(0)
df_books['ratings_count'] = df_books['ratings_count'].fillna(0)

# Loại duplicate
df_books = df_books.drop_duplicates(subset=['book_id'])



# TRÍCH TÁC GIẢ TỪ CHUỖI
df_books['author'] = df_books['authors'].astype(str).str.extract(r"'author_id':\s*'([^']+)'")
df_books['author'] = df_books['author'].fillna('Unknown')

# Clean title
df_books['title_clean'] = df_books['title'].str.lower().str.replace(r'[^\w\s]', '', regex=True)

gc.collect()
print(f"Goodreads books sau xử lý: {len(df_books)} sách")

Goodreads books sau xử lý: 2360655 sách


In [31]:
# CHUYỂN book_id → int64 (SỬA LỖI MERGE)
df_books['book_id'] = pd.to_numeric(df_books['book_id'], errors='coerce').fillna(0).astype('object')

### goodreads_interactions.parquet

In [21]:
gr_interactions_df = pd.read_parquet(path_gr + 'goodreads_interactions.parquet',columns=['book_id','rating','is_read'])

In [22]:
print(gr_interactions_df.columns,'\n')
print(gr_interactions_df.info(), '\n')
print(gr_interactions_df.isna().sum(), '\n')
gr_interactions_df.head()

Index(['book_id', 'rating', 'is_read'], dtype='object') 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 228648342 entries, 0 to 228648341
Data columns (total 3 columns):
 #   Column   Dtype
---  ------   -----
 0   book_id  int64
 1   rating   int64
 2   is_read  int64
dtypes: int64(3)
memory usage: 5.1 GB
None 

book_id    0
rating     0
is_read    0
dtype: int64 



Unnamed: 0,book_id,rating,is_read
0,948,5,1
1,947,5,1
2,946,5,1
3,945,5,1
4,944,5,1


In [23]:
# CELL 6: TIỀN XỬ LÝ GOODREADS INTERACTIONS (aggregate theo chunk)
print("Aggregate interactions theo chunk...")
public_agg_chunks = []
for chunk in read_parquet_chunk(path_gr + 'goodreads_interactions.parquet', chunksize=1000000):
    chunk_agg = chunk.groupby('book_id').agg(
        total_ratings=('rating', 'count'),
        total_reads=('is_read', 'sum'),
        public_sentiment=('rating', 'mean')
    ).reset_index()
    public_agg_chunks.append(chunk_agg)
    gc.collect()

public_agg = pd.concat(public_agg_chunks, ignore_index=True)
public_agg = public_agg.groupby('book_id').agg({
    'total_ratings': 'sum',
    'total_reads': 'sum',
    'public_sentiment': 'mean'
}).reset_index()
public_agg['public_influence'] = np.log1p(public_agg['total_ratings'] + public_agg['total_reads'])
gc.collect()
print(f"Goodreads aggregate: {len(public_agg)} sách")

Aggregate interactions theo chunk...
Goodreads aggregate: 2360650 sách


In [32]:
public_agg['book_id'] = pd.to_numeric(df_books['book_id'], errors='coerce').fillna(0).astype('object')

### goodreads_review_dedup.parquet

In [24]:
gr_review_df = pd.read_parquet(path_gr + 'goodreads_reviews_dedup.parquet',columns=['book_id','rating','review_text','n_votes'])


In [25]:
print(gr_review_df.columns,'\n')
print(gr_review_df.info(),'\n')
print(gr_review_df.isna().sum(),'\n')
gr_review_df.head()

Index(['book_id', 'rating', 'review_text', 'n_votes'], dtype='object') 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15739967 entries, 0 to 15739966
Data columns (total 4 columns):
 #   Column       Dtype 
---  ------       ----- 
 0   book_id      object
 1   rating       int64 
 2   review_text  object
 3   n_votes      int64 
dtypes: int64(2), object(2)
memory usage: 480.3+ MB
None 

book_id        0
rating         0
review_text    0
n_votes        0
dtype: int64 



Unnamed: 0,book_id,rating,review_text,n_votes
0,24375664,5,Mind blowingly cool. Best science fiction I've...,16
1,18245960,5,This is a special book. It started slow for ab...,28
2,6392944,3,I haven't read a fun mystery book in a while a...,6
3,22078596,4,"Fun, fast paced, and disturbing tale of murder...",22
4,6644782,4,A fun book that gives you a sense of living in...,8


In [26]:
# CELL 7: TIỀN XỬ LÝ GOODREADS REVIEWS (tách expert/public theo chunk)
print("Tách expert/public Goodreads reviews theo chunk...")
expert_gr_chunks = []
public_gr_chunks = []

for chunk in read_parquet_chunk(path_gr + 'goodreads_reviews_dedup.parquet', chunksize=500000):
    chunk['n_votes'] = pd.to_numeric(chunk['n_votes'], errors='coerce').fillna(0)
    
    expert_chunk = chunk[chunk['n_votes'] > 50].copy()
    public_chunk = chunk[chunk['n_votes'] <= 50].copy()
    
    expert_gr_chunks.append(expert_chunk)
    public_gr_chunks.append(public_chunk)
    gc.collect()

df_expert_gr = pd.concat(expert_gr_chunks, ignore_index=True)
df_public_gr = pd.concat(public_gr_chunks, ignore_index=True)
gc.collect()
print(f"Goodreads expert: {len(df_expert_gr)} | public: {len(df_public_gr)}")

Tách expert/public Goodreads reviews theo chunk...
Goodreads expert: 38047 | public: 15701920


In [27]:
# CELL 8: LẤY 1 EXPERT + 1 PUBLIC MỖI SÁCH (Goodreads)
expert_sample_gr = df_expert_gr.loc[df_expert_gr.groupby('book_id')['n_votes'].idxmax()][['book_id', 'review_text']].rename(columns={'review_text': 'expert_review_gr'})

public_sample_gr = df_public_gr.groupby('book_id').apply(
    lambda x: x.sample(1, random_state=42)
).reset_index(drop=True)[['book_id', 'review_text']].rename(columns={'review_text': 'public_review_gr'})
gc.collect()
print(f"Goodreads expert sample: {len(expert_sample_gr)} | public sample: {len(public_sample_gr)}")

  public_sample_gr = df_public_gr.groupby('book_id').apply(


Goodreads expert sample: 22341 | public sample: 2079034


## MERGE

In [None]:
# CELL 9: MERGE GOODREADS (books + interactions + reviews sample)
df_gr_merged = df_books.merge(public_agg, on='book_id', how='left') \
                       .merge(expert_sample_gr, on='book_id', how='left') \
                       .merge(public_sample_gr, on='book_id', how='left')
gc.collect()
print(f"Goodreads merged: {len(df_gr_merged)} sách")

Goodreads merged: 2360655 sách


In [35]:
df_gr_merged.reset_index(drop=True, inplace=True)

In [41]:
# CELL 10: MERGE SIÊU NHANH – CHỈ DÙNG EXACT MATCH (10 GIÂY)
print("Bắt đầu merge exact match (title_clean)...")

# 1. Merge Goodreads + Amazon bằng title_clean (exact)
df_final = df_gr_merged.merge(
    amazon_reviews,
    on='title_clean',
    how='inner',
    suffixes=('_gr', '_am')
)

# 2. Gộp expert & public review
df_final['expert_review'] = df_final['expert_review_am'].fillna(df_final.get('expert_review_gr', ''))
df_final['public_review'] = df_final['public_review_am'].fillna(df_final.get('public_review_gr', ''))

# 3. Lọc sách có đủ 2 review
df_final = df_final.dropna(subset=['expert_review', 'public_review'])

# 4. Giữ cột cần thiết
final_cols = [
    'book_id', 'title', 'author', 'publication_year',
    'average_rating', 'ratings_count', 'public_sentiment', 'public_influence',
    'expert_review', 'public_review'
]
df_final = df_final[final_cols]

# 5. Lấy mẫu ngẫu nhiên 10.000 sách (nếu quá nhiều)
if len(df_final) > 10000:
    df_final = df_final.sample(n=10000, random_state=42).reset_index(drop=True)

gc.collect()
print(f"HOÀN TẤT! Có {len(df_final)} sách được merge.")
print(df_final.head())

Bắt đầu merge exact match (title_clean)...
HOÀN TẤT! Có 10000 sách được merge.
    book_id                                 title   author publication_year  \
0  18930889  Spook: Science Tackles the Afterlife     7956                    
1   1573214                     Wuthering Heights     4191             1983   
2  29794446                The Martian Chronicles     1630                    
3  17789021                       Charlotte's Web   988142             2013   
4  10969818                                Alaska  4759585                    

  average_rating ratings_count  public_sentiment  public_influence  \
0           3.57           350          2.888889          4.454347   
1           3.83            30          3.300000          2.944439   
2           4.12             1          1.000000          1.609438   
3           4.15            16          1.000000          1.609438   
4           3.00             1          0.000000          1.098612   

                         

In [42]:
# CELL 11: GỘP EXPERT & PUBLIC + LÀM SẠCH + XUẤT TẤT CẢ
df_final['expert_review'] = df_final['expert_review'].fillna(df_final.get('expert_review_gr', ''))
df_final['public_review'] = df_final['public_review'].fillna(df_final.get('public_review_gr', ''))
df_final = df_final.dropna(subset=['expert_review', 'public_review'])

In [43]:
# Giữ feature cuối
df_final = df_final[[
    'book_id', 'title', 'author', 'publication_year', 'average_rating', 'ratings_count', 'public_sentiment', 'public_influence',
    'expert_review', 'public_review'
]].copy()

df_final.to_csv('final_ALL_expert_public.csv', index=False)
gc.collect()

print(f"HOÀN TẤT! Xuất {len(df_final)} sách vào final_ALL_expert_public.csv")
print(df_final.head())

HOÀN TẤT! Xuất 10000 sách vào final_ALL_expert_public.csv
    book_id                                 title   author publication_year  \
0  18930889  Spook: Science Tackles the Afterlife     7956                    
1   1573214                     Wuthering Heights     4191             1983   
2  29794446                The Martian Chronicles     1630                    
3  17789021                       Charlotte's Web   988142             2013   
4  10969818                                Alaska  4759585                    

  average_rating ratings_count  public_sentiment  public_influence  \
0           3.57           350          2.888889          4.454347   
1           3.83            30          3.300000          2.944439   
2           4.12             1          1.000000          1.609438   
3           4.15            16          1.000000          1.609438   
4           3.00             1          0.000000          1.098612   

                                       expert_