In [None]:
!pip install -q spark-nlp==5.5.1 pyspark==3.5.3 Flask==3.1.0 pyngrok==7.2.1

In [None]:
import json
import sparknlp
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
from sparknlp.annotator import *
from sparknlp.base import *

In [None]:
from typing import List, Optional, Dict, Tuple
from pydantic import BaseModel, Field
from datetime import datetime

class AspectSentiment(BaseModel):
    """Sentiment attached to one aspect (a text chunk) detected inside a review."""
    aspect: str  # text of the detected aspect chunk
    sentiment: str  # sentiment label assigned to that aspect
    confidence: float  # score reported for the aspect; no range is enforced here

class Review(BaseModel):
    """A single product review plus the sentiment results attached to it later.

    The first seven fields are required and come from the raw input; the
    remaining fields default to None until an analyzer fills them in.
    """
    reviewer_name: str
    review_title: str
    rating: int
    review_text: str  # the text that the analyzers and the topic model consume
    date: str  # kept as a plain string; no date parsing is performed here
    verified_purchase: bool
    review_imgs: List[str]
    sentiment: Optional[str] = None  # overall sentiment label
    sentiment_score: Optional[float] = None  # score for the overall sentiment
    aspect_sentiments: Optional[List[AspectSentiment]] = None  # per-aspect results

class Topic(BaseModel):
    """One topic produced by the topic model, with the reviews assigned to it."""
    name: str  # display name of the topic
    topic_name_explanation: Optional[str] = None  # optional rationale for the name
    top_terms: List[Tuple[str, float]]  # (term, weight) pairs describing the topic
    reviews: List[Review]  # reviews associated with this topic
    # Maps a sentiment label to its share (in percent) among this topic's reviews.
    topic_sentiment_distribution: Dict[str, float] = Field(default_factory=dict)

class Cluster(BaseModel):
    """A named group of topics; this is the object serialized in the API response."""
    topics: List[Topic]
    cluster_name: str


In [None]:
class SentimentAnalyzer:
    """Document-level sentiment classifier for reviews, built on Spark NLP.

    Each review text is assembled into a document, embedded with the
    pretrained Universal Sentence Encoder and classified by a pretrained
    SentimentDL model.
    """

    def __init__(self, spark, model_name='sentimentdl_use_imdb'):
        """Store the Spark session and build the (unfitted) pipeline.

        Args:
            spark: Spark session used to create DataFrames.
            model_name: Name of the pretrained SentimentDL model to load.
        """
        self.model_name = model_name
        self.spark = spark
        self.pipeline = self._create_pipeline()

    def _create_pipeline(self):
        """Build the DocumentAssembler -> USE -> SentimentDL pipeline."""
        documentAssembler = DocumentAssembler()\
            .setInputCol("text")\
            .setOutputCol("document")

        use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
            .setInputCols(["document"])\
            .setOutputCol("sentence_embeddings")

        sentimentdl = SentimentDLModel.pretrained(name=self.model_name, lang="en")\
            .setInputCols(["sentence_embeddings"])\
            .setOutputCol("sentiment")

        return Pipeline(stages=[documentAssembler, use, sentimentdl])

    @staticmethod
    def _predicted_label_score(annotation):
        """Return the score stored in the metadata for the predicted label.

        NOTE(review): assumes the annotation metadata maps each class label to
        its score (as a string) -- confirm against the SentimentDLModel output.

        Returns:
            The score as a float, or None when it is missing or unparsable.
        """
        try:
            metadata = annotation["metadata"] or {}
            label = annotation["result"]
            return float(metadata[label]) if label in metadata else None
        except (KeyError, TypeError, ValueError):
            return None

    def analyze(self, reviews: "List[Review]") -> "List[Review]":
        """Analyze sentiment for a list of reviews.

        Args:
            reviews: Reviews whose ``review_text`` should be classified.

        Returns:
            New Review objects, in input order, with ``sentiment`` (and
            ``sentiment_score`` when available) filled in. A review for which
            the model produced no annotation is returned unchanged.
        """
        # Nothing to classify: skip the Spark job entirely.
        if not reviews:
            return []

        reviews_text = [rev.review_text for rev in reviews]
        sentiment_df = self.spark.createDataFrame(reviews_text, StringType()).toDF("text")
        sentiment_result = self.pipeline.fit(sentiment_df).transform(sentiment_df)

        # Rows are paired with reviews by position.
        rows = sentiment_result.select("sentiment").collect()

        updated_reviews = []
        for review, row in zip(reviews, rows):
            review_dict = review.model_dump()
            annotations = row["sentiment"]
            # The classifier may emit no annotation (e.g. for empty text).
            if annotations:
                top = annotations[0]
                review_dict["sentiment"] = top["result"]
                score = self._predicted_label_score(top)
                if score is not None:
                    review_dict["sentiment_score"] = score
            updated_reviews.append(Review.model_validate(review_dict))

        return updated_reviews

In [None]:
class AspectBasedSentimentAnalyzer:
    """Overall and aspect-level sentiment analysis for reviews using Spark NLP.

    One pipeline produces both results: an aspect NER model tags aspect
    chunks, while a USE + SentimentDL branch classifies the whole document.
    """

    def __init__(self, spark):
        """Store the Spark session and build the (unfitted) pipeline."""
        self.spark = spark
        self.pipeline = self._create_pipeline()

    def _create_pipeline(self):
        """Build the combined aspect-NER and document-sentiment pipeline."""
        document_assembler = DocumentAssembler() \
            .setInputCol('text')\
            .setOutputCol('document')

        sentence_detector = SentenceDetector() \
            .setInputCols(['document'])\
            .setOutputCol('sentence')

        tokenizer = Tokenizer()\
            .setInputCols(['sentence']) \
            .setOutputCol('token')

        word_embeddings = WordEmbeddingsModel.pretrained("glove_6B_300", "xx")\
            .setInputCols(["document", "token"])\
            .setOutputCol("embeddings")

        ner_model = NerDLModel.pretrained("ner_aspect_based_sentiment")\
            .setInputCols(["document", "token", "embeddings"])\
            .setOutputCol("ner")

        ner_converter = NerConverter()\
            .setInputCols(['sentence', 'token', 'ner']) \
            .setOutputCol('ner_chunk')

        # Document-level branch used for the overall sentiment.
        use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
            .setInputCols(["document"])\
            .setOutputCol("sentence_embeddings")

        sentimentdl = SentimentDLModel.pretrained(name="sentimentdl_use_imdb", lang="en")\
            .setInputCols(["sentence_embeddings"])\
            .setOutputCol("sentiment")

        return Pipeline(stages=[
            document_assembler,
            sentence_detector,
            tokenizer,
            word_embeddings,
            ner_model,
            ner_converter,
            use,
            sentimentdl
        ])

    def _extract_aspects(self, row):
        """Convert the ``ner_chunk`` annotations of a row into AspectSentiment objects.

        The sentiment label is read from the ``sentiment`` metadata key when
        present, otherwise from ``entity``; the score from ``probability``,
        otherwise ``confidence``. Missing values fall back to 'neutral' / 0.0.

        NOTE(review): ``entity`` / ``confidence`` are the keys NerConverter is
        expected to emit for chunks -- confirm against the Spark NLP version in use.
        """
        aspects = []
        for chunk in row.ner_chunk or []:
            metadata = chunk.metadata or {}
            sentiment = metadata.get('sentiment') or metadata.get('entity', 'neutral')
            raw_confidence = metadata.get('probability', metadata.get('confidence', 0.0))
            try:
                confidence = float(raw_confidence)
            except (TypeError, ValueError):
                confidence = 0.0

            aspects.append(AspectSentiment(
                aspect=chunk.result,
                sentiment=sentiment,
                confidence=confidence
            ))
        return aspects

    def _extract_sentiment_score(self, sentiment_result):
        """Extract a score for the overall sentiment annotation.

        Lookup order in the annotation metadata:
        1. ``probability``;
        2. the largest value of a '|'-separated ``class_probability``;
        3. the entry keyed by the predicted label itself.

        Returns:
            The score as a float, or 1.0 when none of the above is available
            or parsable.
        """
        try:
            metadata = sentiment_result['metadata'] or {}
            if 'probability' in metadata:
                return float(metadata['probability'])
            if 'class_probability' in metadata:
                return max(float(p) for p in metadata['class_probability'].split('|'))
            # NOTE(review): assumes per-label scores are stored under the label name.
            label = sentiment_result['result']
            if label in metadata:
                return float(metadata[label])
            return 1.0
        except (KeyError, ValueError, TypeError, AttributeError):
            return 1.0

    def analyze(self, reviews: "List[Review]") -> "List[Review]":
        """Analyze overall and aspect-based sentiment for a list of reviews.

        Args:
            reviews: Reviews whose ``review_text`` should be analyzed.

        Returns:
            New Review objects, in input order, with ``sentiment``,
            ``sentiment_score`` and ``aspect_sentiments`` filled in.
        """
        # Nothing to analyze: skip the Spark job entirely.
        if not reviews:
            return []

        reviews_text = [rev.review_text for rev in reviews]
        sentiment_df = self.spark.createDataFrame(reviews_text, StringType()).toDF("text")

        result = self.pipeline.fit(sentiment_df).transform(sentiment_df)
        rows = result.select("sentiment", "ner_chunk").collect()

        updated_reviews = []
        for review, row in zip(reviews, rows):
            review_dict = review.model_dump()

            # Write to the fields Review actually declares; unknown keys would
            # be dropped silently during validation.
            annotations = row["sentiment"]
            if annotations:
                sentiment_result = annotations[0]
                review_dict["sentiment"] = sentiment_result["result"]
                review_dict["sentiment_score"] = self._extract_sentiment_score(sentiment_result)

            review_dict["aspect_sentiments"] = self._extract_aspects(row)

            updated_reviews.append(Review.model_validate(review_dict))

        return updated_reviews

In [None]:
class TopicModeler:
    """LDA topic model over review texts (Spark NLP preprocessing + Spark ML LDA)."""

    def __init__(self, spark, num_topics=5, vocab_size=1000, min_topic_weight=0.1, seed=None):
        """Store the configuration and build the (unfitted) pipeline.

        Args:
            spark: Spark session used to create DataFrames.
            num_topics: Number of LDA topics (``k``).
            vocab_size: Maximum vocabulary size for the CountVectorizer.
            min_topic_weight: A review is attached to a topic when its weight
                for that topic is strictly greater than this value.
            seed: Optional random seed for LDA; None leaves the Spark default.
        """
        self.num_topics = num_topics
        self.vocab_size = vocab_size
        self.min_topic_weight = min_topic_weight
        self.seed = seed
        self.spark = spark
        self.pipeline = self._create_pipeline()
        self.model = None
        self.vocabulary = None

    def _create_pipeline(self):
        """Build the text-cleaning, vectorizing and LDA pipeline.

        Also records the position of the CountVectorizer stage so that the
        fitted vocabulary can be located without a hard-coded index.
        """
        document = DocumentAssembler().setInputCol("text").setOutputCol("document")
        sentence = SentenceDetector().setInputCols(["document"]).setOutputCol("sentences")
        tokenizer = Tokenizer().setInputCols(["sentences"]).setOutputCol("tokens")
        normalizer = Normalizer().setInputCols(["tokens"]).setOutputCol("normalized").setLowercase(True)
        stopwords_cleaner = StopWordsCleaner().setInputCols(["normalized"]).setOutputCol("cleanTokens").setCaseSensitive(False)
        lemmatizer = LemmatizerModel.pretrained().setInputCols(["cleanTokens"]).setOutputCol("lemmas")
        finisher = Finisher().setInputCols(["lemmas"]).setOutputCols(["finished_lemmas"]).setCleanAnnotations(False)
        cv = CountVectorizer(inputCol="finished_lemmas", outputCol="raw_features", vocabSize=self.vocab_size, minDF=2.0)
        idf = IDF(inputCol="raw_features", outputCol="features")
        lda = LDA(k=self.num_topics, maxIter=10, featuresCol="features", optimizer="online")
        if self.seed is not None:
            lda = lda.setSeed(self.seed)

        nlp_stages = [document, sentence, tokenizer, normalizer, stopwords_cleaner, lemmatizer, finisher]
        # The CountVectorizer comes directly after the NLP stages.
        self._cv_stage_index = len(nlp_stages)

        return Pipeline(stages=nlp_stages + [cv, idf, lda])

    def fit_transform(self, reviews: "List[Review]") -> "Cluster":
        """Fit the topic model on the reviews and group them into topics.

        Args:
            reviews: Reviews to model; only ``review_text`` is fed to Spark.

        Returns:
            A Cluster holding one Topic per LDA topic. An empty input yields a
            cluster without topics and leaves the model unfitted.
        """
        if not reviews:
            return Cluster(topics=[], cluster_name='first')

        # Carry the list position along so rows can be mapped back to reviews
        # even when several reviews share the same text.
        indexed_texts = [(idx, review.review_text) for idx, review in enumerate(reviews)]
        df = self.spark.createDataFrame(indexed_texts, ["review_idx", "text"])

        self.model = self.pipeline.fit(df)
        transformed = self.model.transform(df)

        self.vocabulary = self.model.stages[self._cv_stage_index].vocabulary

        return self._create_cluster(transformed, reviews)

    def _calculate_sentiment_distribution(self, reviews: "List[Review]") -> Dict[str, float]:
        """Return the percentage of reviews carrying each sentiment label.

        Reviews without a sentiment are not listed but still count towards
        the total, so the percentages may sum to less than 100.
        """
        total = len(reviews)
        counts = {}
        for review in reviews:
            if review.sentiment:
                counts[review.sentiment] = counts.get(review.sentiment, 0) + 1

        # ``counts`` is empty whenever ``total`` is 0, so the division is safe.
        return {label: count / total * 100 for label, count in counts.items()}

    def _assign_reviews_to_topics(self, distributions, reviews, num_topics):
        """Bucket reviews by topic.

        Args:
            distributions: Iterable of ``(review_idx, topic_weights)`` pairs.
            reviews: The reviews that ``review_idx`` refers to.
            num_topics: Number of buckets to create.

        Returns:
            A list with one list of reviews per topic; a review lands in every
            topic whose weight exceeds ``self.min_topic_weight``.
        """
        buckets = [[] for _ in range(num_topics)]
        for review_idx, weights in distributions:
            for topic_id in range(num_topics):
                if weights[topic_id] > self.min_topic_weight:
                    buckets[topic_id].append(reviews[review_idx])
        return buckets

    def _create_cluster(self, transformed_df, reviews: "List[Review]") -> "Cluster":
        """Create a Cluster from the LDA output.

        Args:
            transformed_df: Output of the fitted pipeline; must contain
                ``topicDistribution`` and, ideally, ``review_idx``. Without
                ``review_idx`` rows are matched to reviews by position.
            reviews: The reviews the DataFrame was built from.
        """
        topic_rows = self.model.stages[-1].describeTopics().collect()

        # Collect the per-document distributions once instead of once per topic.
        if "review_idx" in transformed_df.columns:
            doc_rows = transformed_df.select("review_idx", "topicDistribution").collect()
            distributions = [(row["review_idx"], row["topicDistribution"]) for row in doc_rows]
        else:
            doc_rows = transformed_df.select("topicDistribution").collect()
            distributions = [(position, row["topicDistribution"]) for position, row in enumerate(doc_rows)]

        reviews_by_topic = self._assign_reviews_to_topics(distributions, reviews, len(topic_rows))

        topics_list = []
        for topic_id, terms in enumerate(topic_rows):
            # Weights are converted to plain Python floats.
            top_terms = [(self.vocabulary[term_id], float(weight))
                         for term_id, weight in zip(terms['termIndices'], terms['termWeights'])]
            reviews_for_topic = reviews_by_topic[topic_id]

            topics_list.append(Topic(
                name=f"Topic {topic_id}",
                top_terms=top_terms,
                reviews=reviews_for_topic,
                topic_sentiment_distribution=self._calculate_sentiment_distribution(reviews_for_topic)
            ))

        return Cluster(topics=topics_list, cluster_name='first')

In [None]:
def process_raw_reviews(raw_reviews: List[dict]) -> List[Review]:
    """Convert raw review dictionaries into Review models.

    Args:
        raw_reviews: Iterable of dictionaries, one per review. The purchase
            flag is accepted under ``verified_purchase`` or under the
            misspelled ``verified_perchase`` key.

    Returns:
        One Review per input dictionary, in input order.

    Raises:
        KeyError: If a required key is missing from a review dictionary.
    """
    return [
        Review(
            reviewer_name=review['reviewer_name'],
            review_title=review['review_title'],
            rating=review['rating'],
            review_text=review['review_text'],
            date=review['date'],
            # Prefer the correctly spelled key; fall back to the misspelled one.
            verified_purchase=(
                review['verified_purchase']
                if 'verified_purchase' in review
                else review['verified_perchase']
            ),
            review_imgs=review['review_imgs']
        )
        for review in raw_reviews
    ]

In [None]:
# Start a Spark session through Spark NLP's helper; the analyzers and the
# topic modeler created below all share this session.
spark = sparknlp.start()

In [None]:
# Build the analysis components once at module level; the Flask handler
# defined later reuses these instances on every request.
sentiment_analyzer = SentimentAnalyzer(spark)
# Aspect-based analysis is currently disabled; uncomment to enable it.
# aspect_based_sentiment_analyser = AspectBasedSentimentAnalyzer(spark)
topic_modeler = TopicModeler(spark, num_topics=5, vocab_size=1000)

In [None]:
from openai import OpenAI
from typing import List, Tuple, Dict, Optional
import time
from pydantic import BaseModel
import logging

# Structured response expected from the OpenAI topic-naming call.
# Documented with comments rather than a class docstring on purpose: pydantic
# exports a model's docstring into its JSON schema, and this class is passed
# as `response_format`, so a docstring would change what is sent to the API.
class TopicNamingResponse(BaseModel):
    topic_name: str  # short name for the topic
    explanation: Optional[str] = None  # optional rationale for the name

class OpenAITopicNamer:
    """Names LDA topics by asking an OpenAI chat model for a structured answer."""

    def __init__(self, api_key: str, model: str = "gpt-4o-mini-2024-07-18"):
        """Create the OpenAI client.

        Args:
            api_key: OpenAI API key.
            model: Chat model used for naming.
        """
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.logger = logging.getLogger(__name__)

    def _create_prompt(self, top_terms: List[Tuple[str, float]]) -> str:
        """Render the user prompt listing every term with its weight (3 decimals)."""
        terms_str = ", ".join([f"{term} ({weight:.3f})" for term, weight in top_terms])

        return f"""Given these weighted terms from a topic model: {terms_str}

                  Please:
                  1. Analyze these terms and their weights
                  2. Identify the underlying theme or concept they represent
                  3. Create a short (2-4 words), clear name for this topic
                  4. Provide a brief explanation of why this name fits the terms

                  Respond with a JSON object having these fields:
                  - topic_name (string): A short, clear name for the topic
                  - explanation (optional string): A brief explanation of why this name fits"""

    def name_topic(self, top_terms: List[Tuple[str, float]], temperature: float = 0.3) -> "TopicNamingResponse":
        """Ask the model for a topic name; never raises.

        Args:
            top_terms: ``(term, weight)`` pairs describing the topic.
            temperature: Sampling temperature forwarded to the API.

        Returns:
            The parsed model answer, or a fallback built from the first three
            terms when the request fails or yields no parsed content.
        """
        try:
            prompt = self._create_prompt(top_terms)

            response = self.client.beta.chat.completions.parse(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a helpful AI that specializes in analyzing topic models and creating insightful topic names."
                    },
                    {"role": "user", "content": prompt}
                ],
                temperature=temperature,
                response_format=TopicNamingResponse,
            )

            parsed = response.choices[0].message.parsed
            # ``parsed`` can be None (e.g. when the model refuses); route that
            # case to the fallback instead of handing None to the caller.
            if parsed is None:
                raise ValueError("model returned no parsed topic name")
            return parsed

        except Exception as e:
            self.logger.error("Error naming topic: %s", e)
            return TopicNamingResponse(
                topic_name=f"Topic ({', '.join(term for term, _ in top_terms[:3])})",
                explanation="Fallback name using top terms due to naming service error"
            )

    def update_cluster_topics(self, cluster: 'Cluster') -> 'Cluster':
        """Return a copy of the cluster whose topics carry generated names.

        Args:
            cluster: Cluster whose topics should be renamed.

        Returns:
            A new Cluster with updated ``name`` / ``topic_name_explanation``
            on every topic, or the original cluster if rebuilding fails.
        """
        try:
            updated_topics = []
            last_position = len(cluster.topics) - 1

            for position, topic in enumerate(cluster.topics):
                naming_response = self.name_topic(topic.top_terms)

                topic_dict = topic.model_dump()
                topic_dict['name'] = naming_response.topic_name
                topic_dict['topic_name_explanation'] = naming_response.explanation
                updated_topics.append(Topic.model_validate(topic_dict))

                # Pause between consecutive API calls only; there is nothing
                # to wait for after the last topic.
                if position < last_position:
                    time.sleep(0.5)

            cluster_dict = cluster.model_dump()
            cluster_dict['topics'] = updated_topics
            return Cluster.model_validate(cluster_dict)

        except Exception as e:
            self.logger.error("Error updating cluster topics: %s", e)
            return cluster

In [None]:
from flask import Flask, request, jsonify, make_response
from google.colab import userdata
from pyngrok import ngrok

# Local port the Flask app listens on and that the ngrok tunnel forwards to.
PORT = 5000

# Authenticate ngrok with the token stored in Colab's user secrets, then open
# a tunnel to PORT; `public_url` is the externally reachable address.
ngrok.set_auth_token(userdata.get('NGROK_AUTH_TOKEN'))
public_url = ngrok.connect(PORT).public_url

# Flask application that the route below is registered on.
app = Flask(__name__)

@app.route("/", methods=['POST'])
def main():
  """Analyze a JSON array of reviews and return the named topic cluster.

  The request body must be a non-empty JSON array of review objects. The
  reviews are classified for sentiment, grouped into topics, and the topics
  are named through the OpenAI API.

  Returns:
    200 with the cluster serialized as JSON, or 400 with a JSON error object
    when the body is missing, not JSON, not an array, or contains a
    malformed review.
  """
  # silent=True yields None for a missing/invalid JSON body instead of
  # raising, so such requests reach the JSON error response below.
  data = request.get_json(silent=True)

  if not data:
    return jsonify({"error": "No JSON data provided"}), 400

  if not isinstance(data, list):
    return jsonify({"error": "Expected a JSON array of review objects"}), 400

  try:
    reviews = process_raw_reviews(data)
  except (KeyError, TypeError, ValueError) as exc:
    # Missing keys, non-object items and model validation failures are
    # client errors, not server errors.
    return jsonify({"error": f"Invalid review payload: {exc}"}), 400

  reviews_with_sentiment = sentiment_analyzer.analyze(reviews)

  cluster = topic_modeler.fit_transform(reviews_with_sentiment)

  topic_namer = OpenAITopicNamer(api_key=userdata.get('OPENAIAPI_SECRET_KEY'))
  updated_cluster = topic_namer.update_cluster_topics(cluster)

  return make_response(
        updated_cluster.model_dump_json(),
        200,
        {'Content-Type': 'application/json'}
    )


# Print the public ngrok address, then start the Flask server on PORT.
# app.run() blocks this cell until the server is stopped.
if __name__ == '__main__':
  print(f"Access the app on {public_url}")
  app.run(port=PORT)