In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfTransformer, CountVectorizer
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
import string, nltk
from nltk import word_tokenize
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

In [1]:
import pandas as pd
df = pd.read_json('/Path/review-Alabama_10.json.gz', lines=True, compression='gzip')

In [2]:
alabama_df=df
alabama_df.head()

Unnamed: 0,user_id,name,time,rating,text,pics,resp,gmap_id
0,1.140438e+20,Kanisha Mixon,1597168272670,5,Very Personable staff! Beautiful and clean env...,,,0x8862134e67ff5c87:0x38b5e2ae99cd1fcf
1,1.16009e+20,Brandie Hodges,1609899039594,5,Best clothing intown,,,0x8862134e67ff5c87:0x38b5e2ae99cd1fcf
2,1.062399e+20,Sharon King,1547235290843,4,,,,0x8862134e67ff5c87:0x38b5e2ae99cd1fcf
3,1.049701e+20,Veronica Pierce,1517709403534,5,,,,0x8862134e67ff5c87:0x38b5e2ae99cd1fcf
4,1.105875e+20,Whitney Waldon Collier,1535245718492,5,,,,0x8862134e67ff5c87:0x38b5e2ae99cd1fcf


In [3]:
# alabama_df.isnull().sum() 2445616
total_rows = alabama_df.shape[0]
percent = alabama_df.isnull().sum()["text"] / total_rows
print(f"Total number of rows: {total_rows}")
print(f"Percentage of reviews whose text is null: {percent}")

Total number of rows: 5146330
Percentage of reviews whose text is null: 0.4752155419493114


In [4]:
alabama_df = alabama_df.dropna(subset=['text'])
alabama_df.head()

Unnamed: 0,user_id,name,time,rating,text,pics,resp,gmap_id
0,1.140438e+20,Kanisha Mixon,1597168272670,5,Very Personable staff! Beautiful and clean env...,,,0x8862134e67ff5c87:0x38b5e2ae99cd1fcf
1,1.16009e+20,Brandie Hodges,1609899039594,5,Best clothing intown,,,0x8862134e67ff5c87:0x38b5e2ae99cd1fcf
5,1.120426e+20,Emily Miles,1611850938780,1,"Not friendly at all, as I ask questions about ...",,,0x886268e8fdc4fd2f:0x746533eb9aa4d4df
6,1.08919e+20,Faye Ahzburjn,1516515504358,4,They have beautiful baby and children's clothi...,,,0x886268e8fdc4fd2f:0x746533eb9aa4d4df
7,1.018531e+20,Amber Winn,1562178900806,3,"Cute shop, but the lack of boy clothes is sad....",,,0x886268e8fdc4fd2f:0x746533eb9aa4d4df


In [None]:
# Data labeling using gemini AI

from abc import ABC, abstractmethod
import pandas as pd
from dataclasses import dataclass
import logging
from typing import List, Dict, Any, Optional, Union
import os
import time
import json
import random

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.StreamHandler(),  # Print to console
                        logging.FileHandler('labeling.log')  # Also save to file
                    ]
                    )
logger = logging.getLogger(__name__)
def setup_verbose_logging():
    """Call this to see all logs in Jupyter/console"""
    import sys

    # Create console handler with higher level
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)

    # Create formatter
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)

    # Add handler to logger
    logger.addHandler(console_handler)
    logger.setLevel(logging.INFO)

    # Also set root logger
    logging.getLogger().setLevel(logging.INFO)

    print("Verbose logging enabled!")
@dataclass
class LabelingConfig:
    """Configuration for the labeling task"""
    task_name: str
    labels: List[str]
    prompt_template: str
    batch_size: int = 1
    # max_retries: int = 3
    delay_between_requests: float = 1.0
    temperature: float = 0.1
    max_tokens: int = 100
    max_retries: int = 5  # Maximum number of retry attempts
    max_backoff_time: int = 60

class LLMProvider(ABC):
    """Abstract base class for LLM providers"""

    @abstractmethod
    def generate_response(self, prompt: str, **kwargs) -> str:
        pass

class GeminiProvider(LLMProvider):
    """Google Gemini API provider (Free tier available)"""

    def __init__(self, api_key: str, model: str = "gemini-2.5-flash-lite"):
        self.api_key = api_key
        self.model = model
        try:
            import google.generativeai as genai
            genai.configure(api_key=api_key)
            self.client = genai.GenerativeModel(model)
        except ImportError:
            raise ImportError("Please install google-generativeai: pip install google-generativeai")

    def generate_response(self, prompt: str, **kwargs) -> str:
        try:
            import google.generativeai as genai

            # Configure generation settings
            generation_config = genai.types.GenerationConfig(
                temperature=kwargs.get('temperature', 0.1),
                max_output_tokens=kwargs.get('max_tokens', 100),
                top_p=0.95,
                top_k=64
            )

            response = self.client.generate_content(
                prompt,
                generation_config=generation_config
            )

            return response.text.strip()
        except Exception as e:
            logger.error(f"Gemini API error: {e}")
            raise

class LabelingTemplates:
    """Prompt templates for labeling spam, irrelevant, Rant Without Visit"""

    REVIEWS_CLASSIFICATION = """
You are a strict content moderator. Classify the following review into one or more categories.

Review text: "{text}"

Classify this review and respond with ONLY valid JSON in this exact format:
[
  {{
    "Label": "spam",
    "Confidence": <One of "high", "medium", "low">,
    "Abstain": <true/false>
  }},
  {{
    "Label": "irrelevant",
    "Confidence": <One of "high", "medium", "low">,
    "Abstain": <true/false>
  }},
  {{
    "Label": "rant_without_visit",
    "Confidence": <One of "high", "medium", "low">,
    "Abstain": <true/false>
  }},
  {{
    "Label": "genuine_review",
    "Confidence": <One of "high", "medium", "low">,
    "Abstain": <true/false>
  }}
]

Categories:
- spam: advertisements, promotions, links, coupons
- irrelevant: not about the place/business being reviewed
- rant_without_visit: admits or clearly implies they never visited
- genuine_review: authentic experience at the location

Confidence levels: high, medium, low
Set Abstain to true if you're unsure about that specific category.

Respond with ONLY the JSON array, no explanations.
"""



class DataLabeler:
    """Main class for labeling data with LLMs"""
    def __init__(self, llm_provider: LLMProvider, config: LabelingConfig):
        self.llm_provider = llm_provider
        self.config = config
        self.results = []

    def create_prompt(self, text: str) -> str:
        """Create prompt with the text to classify"""
        return self.config.prompt_template.format(text=text)

    def parse_label(self, response: str) -> List[str]:
        """Parse the label from LLM response"""
        try:
            response = response.strip()
            if response.startswith('```'):
                response = response.split('\n', 1)[1]
            if response.endswith('```'):
                response = response.rsplit('\n', 1)[0]

            # Parse JSON
            data = json.loads(response)
            result = []

            # Check each category
            for item in data:
                if not item.get("Abstain", True):  # Only include if not abstaining
                    confidence = item.get("Confidence", "low").lower()
                    if confidence in ["high", "medium"]:  # Only high/medium confidence
                        result.append(item["Label"])

            if len(result) == 0:
                logger.warning(f"No confident labels found in response: {response[:100]}...")
                return ["uncertain"]

            return result

        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing error: {e}. Response: {response[:200]}...")
            return ["parse_error"]
        except Exception as e:
            logger.error(f"Error parsing label: {e}. Response: {response[:200]}...")
            return ["error"]


    def label_single_item(self, text: str, retries: int = 0) -> Dict[str, Any]:
        """Label a single data item"""
        retries = 0
        while retries < self.config.max_retries:
            try:
                prompt = self.create_prompt(text)
                raw_response = self.llm_provider.generate_response(
                    prompt,
                    temperature=self.config.temperature,
                    max_tokens=self.config.max_tokens
                )
                # If the request succeeds, process the response and return
                label = self.parse_label(raw_response)
                return {
                    'text': text,
                    'label': label,
                    'raw_response': raw_response,
                    'success': True,
                    'error': None
                }
            except Exception as e:
                # Check for specific rate-limit error (429)
                if '429' in str(e):
                    retries += 1
                    # Calculate exponential backoff with jitter
                    delay = (2 ** retries) + (random.uniform(0, 1))
                    delay = min(delay, self.config.max_backoff_time) # Truncated
                    logger.warning(f"Rate limit exceeded. Retrying in {delay:.2f} seconds... (Attempt {retries}/{self.config.max_retries})")
                    time.sleep(delay)
                else:
                    # For any other error, mark as a failure and break the loop
                    logger.error(f"Error labeling item '{text[:50]}...': {e}")
                    return {
                        'text': text,
                        'label': None,
                        'raw_response': None,
                        'success': False,
                        'error': str(e)
                    }
    
        # If max retries are reached without success
        logger.error(f"Max retries reached for item '{text[:50]}...'. Giving up.")
        return {
            'text': text,
            'raw_response': None,
            'success': False,
            'error': f"Max retries ({self.config.max_retries}) reached."
        }

    def get_label_distribution(self) -> Dict[str, int]:
        """Get distribution of labels"""
        if not self.results:
            return {}

        distribution = {}
        for result in self.results:
            if result['success'] and result['label']:
                label = result['label']
                distribution[label] = distribution.get(label, 0) + 1

        return distribution

    def get_success_rate(self) -> float:
        """Get success rate of labeling"""
        if not self.results:
            return 0.0

        successful = sum(1 for result in self.results if result['success'])
        return successful / len(self.results)





def process_dataframe(df: pd.DataFrame,
                     text_field: str,
                     labels: List[str],
                     api_key: str,
                     batch_size: int = 10,
                     max_items: Optional[int] = None,
                     save_progress: bool = True,
                     start_idx: int = 0,
                     verbose: bool = True) -> pd.DataFrame:
    """
    Process DataFrame with review classification

    Args:
        df: Pandas DataFrame containing the data
        text_field: Column name containing text to label
        labels: List of possible labels
        api_key: Gemini API key
        batch_size: Items to process at once
        max_items: Maximum items to process (None for all)
        save_progress: Whether to save intermediate progress
        progress_file: File to save progress to

    Returns:
        DataFrame with new label columns added
    """
    if verbose:
        setup_verbose_logging()
    logger.info(f"Starting to process DataFrame with {len(df)} rows")

    # Validate text field exists
    if text_field not in df.columns:
        raise ValueError(f"Text field '{text_field}' not found in DataFrame columns: {df.columns.tolist()}")

    # Create copy of DataFrame and add label columns
    new_df = df.copy()
    new_df = new_df.reset_index(drop=True)
    for label in labels:
        new_df[label] = 0

    # Add metadata columns
    new_df['labeling_success'] = False
    new_df['labeling_error'] = ""
    new_df['raw_response'] = ""

    # Setup configuration and provider
    config = LabelingConfig(
        task_name="review_classification",
        labels=labels,
        prompt_template=LabelingTemplates.REVIEWS_CLASSIFICATION,
        batch_size=batch_size,
        delay_between_requests=0.5,
        max_tokens=500
    )

    provider = GeminiProvider(api_key=api_key)
    labeler = DataLabeler(provider, config)

    # Load previous progress if exists
    start_idx = start_idx


    # Determine how many items to process
    total_items = min(len(df), max_items) if max_items else len(df)

    try:
        # Process in batches
        for i in range(start_idx, total_items, batch_size):
            end_idx = min(i + batch_size, total_items)
            batch_texts = df.iloc[i:end_idx][text_field].tolist()

            logger.info(f"Processing batch {i//batch_size + 1}: items {i} to {end_idx-1}")

            # Process each text in the batch
            for j, text in enumerate(batch_texts):
                row_idx = i + j

                # Skip empty texts
                if pd.isna(text) or str(text).strip() == "":
                    logger.warning(f"Skipping empty text at index {row_idx}")
                    continue

                # Label the text
                result = labeler.label_single_item(str(text))


                logger.info(f"Labeling result: {result}")

                logger.info(f"Label is {result['label']}")

                # Update DataFrame with results
                new_df.at[row_idx, 'labeling_success'] = result['success']
                new_df.at[row_idx, 'raw_response'] = result['raw_response']
                new_df.at[row_idx, 'labeling_error'] = result['error']

                logger.info(f"Updating df at id: {row_idx}")
                logger.info(f"The row is: {new_df.iloc[row_idx]}")

                # Set label columns
                if result['success'] and result['label']:
                    for label in result['label']:
                        if label in labels:  # Only set known labels
                            new_df.at[row_idx, label] = 1

                # Add delay between requests
                time.sleep(config.delay_between_requests)

            # Save progress
            if save_progress:
                

                # Save intermediate DataFrame
                new_df.to_csv(f"intermediate_results_{end_idx}.csv", index=False)
                logger.info(f"Saved intermediate results up to index {end_idx}")

    except KeyboardInterrupt:
        logger.info("Processing interrupted by user")
        return new_df
    except Exception as e:
        logger.error(f"Error during processing: {e}")
        raise

    # Calculate and log statistics
    success_rate = (new_df['labeling_success'].sum() / len(new_df)) * 100
    logger.info(f"Processing completed!")
    logger.info(f"Success rate: {success_rate:.2f}%")

    # Label distribution
    for label in labels:
        count = new_df[label].sum()
        percentage = (count / len(new_df)) * 100
        logger.info(f"{label}: {count} ({percentage:.2f}%)")

    return new_df

labelled_alabama_df = process_dataframe(
    df=alabama_df,
    text_field="text",
    labels=["spam", "irrelevant", "rant_without_visit", "genuine_review"],
    api_key="YOUR_API_KEY",
    batch_size=20,
    max_items=5000,
    verbose=False,
    start_idx=2990
)


2025-08-30 22:03:00,224 - INFO - Starting to process DataFrame with 2700714 rows
2025-08-30 22:03:01,411 - INFO - Processing batch 150: items 2990 to 3009
2025-08-30 22:03:02,334 - INFO - Labeling result: {'text': 'Love my family at Healthwest.', 'label': ['genuine_review'], 'raw_response': '```json\n[\n  {\n    "Label": "spam",\n    "Confidence": "low",\n    "Abstain": false\n  },\n  {\n    "Label": "irrelevant",\n    "Confidence": "low",\n    "Abstain": false\n  },\n  {\n    "Label": "rant_without_visit",\n    "Confidence": "low",\n    "Abstain": false\n  },\n  {\n    "Label": "genuine_review",\n    "Confidence": "high",\n    "Abstain": false\n  }\n]\n```', 'success': True, 'error': None}
2025-08-30 22:03:02,334 - INFO - Label is ['genuine_review']
2025-08-30 22:03:02,335 - INFO - Updating df at id: 2990
2025-08-30 22:03:02,337 - INFO - The row is: user_id                                         108744781792966672384.0
name                                                     Betty Co

KeyError: 'label'

In [26]:
import pandas as pd

# List all your intermediate CSV files in the correct order
file_list = [
    '/PATH/intermediate_results_70.csv',
    '/PATH/intermediate_results_530.csv',
    '/PATH/intermediate_results_1000.csv',
    '/PATH/intermediate_results_1990.csv',
    '/PATH/intermediate_results_2990.csv',
    '/PATH/intermediate_results_3990.csv'
    

    
]

# Create an empty list to store the labeled DataFrames
labeled_dfs = []
total_rows_to_process = 3990
rows_processed = 0

# Loop through the list of files and read each one
for file in file_list:
    df = pd.read_csv(file)
    
    # Calculate how many rows to take from the current file
    rows_to_take = min(len(df), total_rows_to_process - rows_processed)
    
    if rows_to_take <= 0:
        # If we have already collected all 1000 rows, stop the loop
        break
        
    # Take only the labeled rows (assuming 'labeling_success' is True for labeled rows)
    labeled_rows_in_file = df[df['labeling_success'] == True]
    
    # Take only the required number of rows from this file
    chunk_to_add = labeled_rows_in_file.head(rows_to_take)
    
    labeled_dfs.append(chunk_to_add)
    
    rows_processed += len(chunk_to_add)
    
    print(f"Read {len(chunk_to_add)} labeled rows from {file}. Total so far: {rows_processed}")

# Concatenate all DataFrames in the list into a single DataFrame
final_labeled_df = pd.concat(labeled_dfs, ignore_index=True)

# Save the combined DataFrame to a new CSV file
final_labeled_df.to_csv('final_labeled_data_v2.csv', index=False)

print("\n--------------------")
print(f"Successfully combined labeled data into 'final_labeled_data.csv'.")
print(f"The final DataFrame has {len(final_labeled_df)} rows.")

  df = pd.read_csv(file)


Read 70 labeled rows from /Users/shuangshuang/Downloads/intermediate_results_70.csv. Total so far: 70


  df = pd.read_csv(file)


Read 460 labeled rows from /Users/shuangshuang/Downloads/intermediate_results_530.csv. Total so far: 530


  df = pd.read_csv(file)


Read 470 labeled rows from /Users/shuangshuang/Downloads/intermediate_results_1000.csv. Total so far: 1000


  df = pd.read_csv(file)


Read 990 labeled rows from /Users/shuangshuang/Downloads/intermediate_results_1990.csv. Total so far: 1990


  df = pd.read_csv(file)


Read 1000 labeled rows from /Users/shuangshuang/Downloads/intermediate_results_2990.csv. Total so far: 2990


  df = pd.read_csv(file)


Read 1000 labeled rows from /Users/shuangshuang/Downloads/intermediate_results_3990.csv. Total so far: 3990

--------------------
Successfully combined labeled data into 'final_labeled_data.csv'.
The final DataFrame has 3990 rows.
