# Pipeline for fetch 4 stars reviews



Functions

In [5]:
from huggingface_hub import snapshot_download, upload_file, HfApi
import pathlib
import time
import json
import logging
import pandas as pd

def download_reviews_data(category_idx:str):
  logger = logging.getLogger(f'fetch reviews data for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  try:
    path_to_file = pathlib.Path(f"/content/dataset/raw/review_categories/{category_idx}.jsonl")
    if not path_to_file.exists():
      snapshot_download(repo_id="McAuley-Lab/Amazon-Reviews-2023",
                  local_dir="/content/dataset",
                  repo_type="dataset",
                  allow_patterns=[f"raw/review_categories/{category_idx}.jsonl*"])
    logger.info(f"Successfully downloaded reviews data for category {category_idx}")
  except Exception as e:
    logger.error(f"Failed to download reviews data for category {category_idx} - exception: {e}")

def download_items_data(category_idx:str):
  logger = logging.getLogger(f'fetch items data for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  try:
    path_to_file = pathlib.Path(f"/content/dataset/raw/meta_categories/meta_{category_idx}.jsonl")
    if not path_to_file.exists():
      snapshot_download(repo_id="McAuley-Lab/Amazon-Reviews-2023",
                    local_dir="/content/dataset",
                    repo_type="dataset",
                    allow_patterns=[f"raw/meta_categories/meta_{category_idx}.jsonl*"])
      logger.info(f"Successfully downloaded items data for category {category_idx}")
  except:
    print(f"Failed to download items data for category {category_idx}")

def fetch_reviews_data(target_idxs,filepath_review_category):
  filtered_reviews = []
  logger = logging.getLogger(f'Filter 4 star reviews for {filepath_review_category} ')
  logger.setLevel(logging.DEBUG)
  count = 0
  reviews_per_item_count = {}
  with open(filepath_review_category, 'r') as file:
    for line in file:
        item = json.loads(line)
        asin = item.get("parent_asin")
        rating = item.get("rating")
        text = item.get("text", "")

        ## Only search if is in target_idxs
        if asin in target_idxs:
          # Apply filters
          if 4.0 <= rating <= 4.9 and len(text) > 100:
              item_dict = {}
              item_dict["ori_review"] = text
              item_dict["asin"] = asin
              item_dict["rating"] = rating
              if asin not in reviews_per_item_count:
                reviews_per_item_count[asin] = 1
                filtered_reviews.append(item_dict)
                count += 1
              else:
                reviews_per_item_count[asin] += 1
                if reviews_per_item_count[asin] < 4:
                  filtered_reviews.append(item_dict)
                  count += 1
                  if count % 1000 == 0:
                    logger.debug(f"Processed {count} reviews")

    logger.info(f"Successfully fetched {len(filtered_reviews)} reviews data for category {filepath_review_category}")
    return filtered_reviews

def fetch_items_data(target_idxs,filepath_meta_category):
  filtered_items_meta =  []
  logger = logging.getLogger(f'fetch reviews data for {filepath_meta_category} ')
  logger.setLevel(logging.DEBUG)
  with open(filepath_meta_category, 'rb') as file:
    for line in file:
      line = json.loads(line)
      curr_asin = line.get("parent_asin")
      if curr_asin in target_idxs:
        meta_dict = {}
        meta_dict["main_category"] = line.get("main_category")
        meta_dict["title"] = line.get("title")
        meta_dict["parent_asin"] = line.get("parent_asin")
        meta_dict["description"] = line.get("description")
        meta_dict["categories"] = line.get("categories")
        meta_dict["features"] = line.get("features")
        filtered_items_meta.append(meta_dict)
  logger.info(f"Successfully fetched {len(filtered_items_meta)} items data for category {filepath_meta_category}")
  return filtered_items_meta


def build_final_dataframe(df_items,df_reviews,category_idx=None,save_parquet=False):
  logger = logging.getLogger(f'build final dataframe for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  df_final = df_reviews.join(df_items.set_index("parent_asin")[["title","features","description"]],on=["asin"],how="left")
  if save_parquet and category_idx:
    path = pathlib.Path(f"/content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet")
    if not path.parent.exists():
      path.parent.mkdir(parents=True, exist_ok=True)
    df_final.to_parquet(f"/content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet")
    logger.info(f"Save file in /content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet")
  return df_final

def clear_reviews_data(category_idx:str):
  logger = logging.getLogger(f'clear reviews data for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  try:
    path = pathlib.Path(f"/content/dataset/raw/review_categories/{category_idx}.jsonl")
    path.unlink(missing_ok=True)
    logger.info(f'Removed /content/dataset/raw/review_categories/{category_idx}.jsonl')
  except:
    logger.info(f'Could not remove /content/dataset/raw/review_categories/{category_idx}.jsonl')


def clear_items_data(category_idx:str):
  logger = logging.getLogger(f'clear items data for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  try:
    path = pathlib.Path(f"/content/dataset/raw/meta_categories/meta_{category_idx}.jsonl")
    path.unlink(missing_ok=True)
    logger.info(f'Removed /content/dataset/raw/meta_categories/meta_{category_idx}.jsonl')
  except:
    logger.info(f'Could not remove /content/datset/raw/meta_categories/meta_{category_idx}.jsonl')

def clear_filtered_reviews(category_idx:str):
  logger = logging.getLogger(f'clear filtered reviews data for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  try:
    logger.info(f"Removed filtered 4 starts review data for category {category_idx}")
    path = pathlib.Path(f"/content/dataset/raw/review_categories/filtered_4_star_reviews_{category_idx}.parquet")
    path.unlink(missing_ok=True)
    logger.info(f'Removed /content/dataset/raw/review_categories/filtered_4_star_reviews_{category_idx}.jsonl')
  except:
    logger.info(f'Could not remove /content/dataset/raw/review_categories/filtered_4_star_reviews{category_idx}.jsonl')

def clear_filtered_items(category_idx:str):
  logger = logging.getLogger(f'clear filtered items data for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  try:
    logger.info(f"Removed filtered 4 starts items data for category {category_idx}")
    path = pathlib.Path(f"/content/dataset/raw/meta_categories/filtered_items_{category_idx}.parquet")
    path.unlink(missing_ok=True)
    logger.info(f'Removed /content/dataset/raw/meta_categories/filtered_items_{category_idx}.jsonl')
  except:
    logger.info(f'Could not remove /content/dataset/raw/meta_categories/filtered_items_{category_idx}.jsonl')

def clear_filtered_meta_and_reviews_data(category_idx:str):
  logger = logging.getLogger(f'clear filtered reviews data for {category_idx} ')
  logger.setLevel(logging.DEBUG)
  try:
    logger.info(f"Removed filtered 4 starts review and meta data for category {category_idx}")
    path = pathlib.Path(f"/content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet")
    path.unlink(missing_ok=True)
    logger.info(f'Removed /content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.jsonl')
  except:
    logger.info(f'Could not remove /content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.jsonl')

def pipeline_for_fetch_reviews(category_idx,target_idxs):
  try:
    logger = logging.getLogger(f'Pipeline for fetch reviews data from category {category_idx} ')
    logger.setLevel(logging.DEBUG)
    download_reviews_data(category_idx)
    filepath_review_category = f'dataset/raw/review_categories/{category_idx}.jsonl'
    filtered_reviews = fetch_reviews_data(target_idxs,filepath_review_category)
    df_final = pd.DataFrame.from_records(filtered_reviews)
    df_final.to_parquet(f'/content/dataset/raw/review_categories/filtered_4_star_reviews_{category_idx}.parquet')
    logger.info(f"Successfully saved filtered 4 star reviews in raw/review_categories/filtered_4_star_reviews_{category_idx}.parquet")
    clear_reviews_data(category_idx)
  except Exception as e:
    logger.error(f'Failed to pipeline for fetch reviews data from category {category_idx} == Exception: {e}')

def pipeline_for_fetch_items(category_idx,target_idxs):
  try:
    logger = logging.getLogger(f'Pipeline for fetch items data ')
    logger.setLevel(logging.DEBUG)
    download_items_data(category_idx)
    filepath_meta_category = f'dataset/raw/meta_categories/meta_{category_idx}.jsonl'
    filtered_items_meta = fetch_items_data(target_idxs,filepath_meta_category)
    df_final = pd.DataFrame.from_records(filtered_items_meta)
    df_final.to_parquet(f'/content/dataset/raw/meta_categories/filtered_items_{category_idx}.parquet')
    logger.info(f"Successfully saved filtered items in dataset/raw/meta_categories/filtered_items_{category_idx}.parquet")
    clear_items_data(category_idx)
  except Exception as e:
    logger.error(f'Failed to pipeline for fetch items data == Exception: {e}')

In [6]:
target_idxs = set()
category_idxs = set()
with open('/content/dataset/asin2categoryfiltered.json') as f:
  raw_data = json.load(f)
  for k,v in raw_data.items():
    target_idxs.add(k)
    category_idxs.add(v)

import random
rand_idx = random.randint(0,len(category_idxs)-1)
category_idx = list(category_idxs)[rand_idx]

print(f'select category_idx: {category_idx}')



select category_idx: Sports and Outdoors


Setup login in HF

In [7]:
from huggingface_hub import login
from google.colab import userdata
import os
HF_TOKEN = userdata.get("HF_TOKEN")
login(HF_TOKEN)
hf_api = HfApi(token=HF_TOKEN)

Running pipeline

In [8]:
list_category_idxs = list(category_idxs)
print(f"Running pipeline for {len(list_category_idxs)} categories")

for i in range(len(list_category_idxs)):
  logger = logging.getLogger(f'Pipeline for fetch reviews data for batch number {i+1} ')
  logger.setLevel(logging.DEBUG)
  category_idx = list_category_idxs[i] if list_category_idxs[i].find(" ") == -1 else list_category_idxs[i].replace(" ","_")
  print(f'select category for batch: {category_idx}')
  try:
    repo_id = "Talissa/AmazonC4Augmented"
    filename = f"raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet"
    if not hf_api.file_exists(repo_id=repo_id,filename=filename,repo_type="dataset"):
      raise FileNotFoundError
  except FileNotFoundError:
    logger.info("Start pipeline")
    pipeline_for_fetch_reviews(category_idx,target_idxs)
    pipeline_for_fetch_items(category_idx,target_idxs)
    df_items = pd.read_parquet(f'/content/dataset/raw/meta_categories/filtered_items_{category_idx}.parquet')
    df_reviews = pd.read_parquet(f'/content/dataset/raw/review_categories/filtered_4_star_reviews_{category_idx}.parquet')
    df_final = build_final_dataframe(df_items,df_reviews,save_parquet=True,category_idx=category_idx)
    logger.info(f"Build final dataframe for {category_idx} with review and item metadata")
    upload_file(
        path_or_fileobj=f"/content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet",
        path_in_repo=f"/raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet",
        repo_id="Talissa/AmazonC4Augmented",
        repo_type="dataset",
        commit_message="Add files in repo"
    )
    logger.info(f"Added parquet file /raw/review_and_meta_categories/filtered_4_star_reviews_{category_idx}.parquet")
    clear_filtered_reviews(category_idx)
    clear_filtered_items(category_idx)
    clear_filtered_meta_and_reviews_data(category_idx)
    logger.info("End pipeline")

Running pipeline for 31 categories
select category for batch: Office_Products


INFO:Pipeline for fetch reviews data for batch number 1 :Start pipeline


Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

raw/review_categories/Office_Products.js(…):   0%|          | 0.00/5.78G [00:00<?, ?B/s]

INFO:fetch reviews data for Office_Products :Successfully downloaded reviews data for category Office_Products
DEBUG:Filter 4 star reviews for dataset/raw/review_categories/Office_Products.jsonl :Processed 1000 reviews
INFO:Filter 4 star reviews for dataset/raw/review_categories/Office_Products.jsonl :Successfully fetched 1183 reviews data for category dataset/raw/review_categories/Office_Products.jsonl
INFO:Pipeline for fetch reviews data from category Office_Products :Successfully saved filtered 4 star reviews in raw/review_categories/filtered_4_star_reviews_Office_Products.parquet
INFO:clear reviews data for Office_Products :Removed /content/dataset/raw/review_categories/Office_Products.jsonl


Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

raw/meta_categories/meta_Office_Products(…):   0%|          | 0.00/2.15G [00:00<?, ?B/s]

INFO:fetch items data for Office_Products :Successfully downloaded items data for category Office_Products
INFO:fetch reviews data for dataset/raw/meta_categories/meta_Office_Products.jsonl :Successfully fetched 567 items data for category dataset/raw/meta_categories/meta_Office_Products.jsonl
INFO:Pipeline for fetch items data :Successfully saved filtered items in dataset/raw/meta_categories/filtered_items_Office_Products.parquet
INFO:clear items data for Office_Products :Removed /content/dataset/raw/meta_categories/meta_Office_Products.jsonl
INFO:build final dataframe for Office_Products :Save file in /content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_Office_Products.parquet
INFO:Pipeline for fetch reviews data for batch number 1 :Build final dataframe for Office_Products with review and item metadata


Processing Files (0 / 0)      : |          |  0.00B /  0.00B            

New Data Upload               : |          |  0.00B /  0.00B            

  ...s_Office_Products.parquet: 100%|##########|  746kB /  746kB            

INFO:Pipeline for fetch reviews data for batch number 1 :Added parquet file /raw/review_and_meta_categories/filtered_4_star_reviews_Office_Products.parquet
INFO:clear filtered reviews data for Office_Products :Removed filtered 4 starts review data for category Office_Products
INFO:clear filtered reviews data for Office_Products :Removed /content/dataset/raw/review_categories/filtered_4_star_reviews_Office_Products.jsonl
INFO:clear filtered items data for Office_Products :Removed filtered 4 starts items data for category Office_Products
INFO:clear filtered items data for Office_Products :Removed /content/dataset/raw/meta_categories/filtered_items_Office_Products.jsonl
INFO:clear filtered reviews data for Office_Products :Removed filtered 4 starts review and meta data for category Office_Products
INFO:clear filtered reviews data for Office_Products :Removed /content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_Office_Products.jsonl
INFO:Pipeline for fetch reviews data f

select category for batch: Gift_Cards


INFO:Pipeline for fetch reviews data for batch number 2 :Start pipeline


Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

raw/review_categories/Gift_Cards.jsonl:   0%|          | 0.00/50.2M [00:00<?, ?B/s]

INFO:fetch reviews data for Gift_Cards :Successfully downloaded reviews data for category Gift_Cards
INFO:Filter 4 star reviews for dataset/raw/review_categories/Gift_Cards.jsonl :Successfully fetched 15 reviews data for category dataset/raw/review_categories/Gift_Cards.jsonl
INFO:Pipeline for fetch reviews data from category Gift_Cards :Successfully saved filtered 4 star reviews in raw/review_categories/filtered_4_star_reviews_Gift_Cards.parquet
INFO:clear reviews data for Gift_Cards :Removed /content/dataset/raw/review_categories/Gift_Cards.jsonl


Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

raw/meta_categories/meta_Gift_Cards.json(…):   0%|          | 0.00/2.04M [00:00<?, ?B/s]

INFO:fetch items data for Gift_Cards :Successfully downloaded items data for category Gift_Cards
INFO:fetch reviews data for dataset/raw/meta_categories/meta_Gift_Cards.jsonl :Successfully fetched 6 items data for category dataset/raw/meta_categories/meta_Gift_Cards.jsonl
INFO:Pipeline for fetch items data :Successfully saved filtered items in dataset/raw/meta_categories/filtered_items_Gift_Cards.parquet
INFO:clear items data for Gift_Cards :Removed /content/dataset/raw/meta_categories/meta_Gift_Cards.jsonl
INFO:build final dataframe for Gift_Cards :Save file in /content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_Gift_Cards.parquet
INFO:Pipeline for fetch reviews data for batch number 2 :Build final dataframe for Gift_Cards with review and item metadata


Processing Files (0 / 0)      : |          |  0.00B /  0.00B            

New Data Upload               : |          |  0.00B /  0.00B            

  ...eviews_Gift_Cards.parquet: 100%|##########| 11.3kB / 11.3kB            

INFO:Pipeline for fetch reviews data for batch number 2 :Added parquet file /raw/review_and_meta_categories/filtered_4_star_reviews_Gift_Cards.parquet
INFO:clear filtered reviews data for Gift_Cards :Removed filtered 4 starts review data for category Gift_Cards
INFO:clear filtered reviews data for Gift_Cards :Removed /content/dataset/raw/review_categories/filtered_4_star_reviews_Gift_Cards.jsonl
INFO:clear filtered items data for Gift_Cards :Removed filtered 4 starts items data for category Gift_Cards
INFO:clear filtered items data for Gift_Cards :Removed /content/dataset/raw/meta_categories/filtered_items_Gift_Cards.jsonl
INFO:clear filtered reviews data for Gift_Cards :Removed filtered 4 starts review and meta data for category Gift_Cards
INFO:clear filtered reviews data for Gift_Cards :Removed /content/dataset/raw/review_and_meta_categories/filtered_4_star_reviews_Gift_Cards.jsonl
INFO:Pipeline for fetch reviews data for batch number 2 :End pipeline
