In [1]:
from pyspark.sql import SparkSession

# Initialize Spark with HDFS configuration
spark = SparkSession.builder \
    .appName("Amazon Reviews Analysis") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()
# Now you can read data from HDFS
# Replace the path with your actual path where JSON files are stored
df = spark.read.json("hdfs://namenode:9000/user/hadoop/amazon_reviews/data/filtered_data/Watches.filtered.json")

# Verify that data was loaded correctly
print(f"Number of records: {df.count()}")
print("Schema:")
#df.printSchema()

# Show a few sample records
df.show(5, truncate=False)

Number of records: 22713
Schema:
+-------------+-----------------+--------------------------------------------------------------------+------------------+-----------------------+------------+-----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Initialize Spark with connection to your cluster and HDFS
spark = SparkSession.builder \
    .appName("Amazon Reviews Topic Modeling") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

print("Connected to Spark!")

Connected to Spark!


In [3]:
# Adjust path to match your actual location in HDFS
base_path = "hdfs://namenode:9000/user/hadoop/amazon_reviews/data/filtered_data/"

# Get a count of reviews per category to understand data distribution
category_counts = []

# List all categories you have
categories = [
    "Amazon_Instant_Video",
    "Arts",
    "Automotive",
    "Baby",
    "Beauty",
    "Books",
    "Cell_Phones_&_Accessories",
    "Clothing_&_Accessories",
    "Electronics",
    "Gourmet_Foods",
    "Health",
    "Home_&_Kitchen",
    "Industrial_&_Scientific",
    "Jewelry",
    "Kindle_Store",
    "Movies_&_TV",
    "Music",
    "Musical_Instruments",
    "Office_Products",
    "Patio",
    "Pet_Supplies",
    "Shoes",
    "Software",
    "Sports_&_Outdoors",
    "Tools_&_Home_Improvement",
    "Toys_&_Games",
    "Video_Games",
    "Watches"
]

# Count records per category
for category in categories:
    try:
        df = spark.read.json(f"{base_path}{category}.filtered.json")
        count = df.count()
        category_counts.append((category, count))
        print(f"Category: {category}, Records: {count}")
    except:
        print(f"Error loading category: {category}")

# Check total number of reviews
print(f"Total categories loaded: {len(category_counts)}")
print(f"Total reviews: {sum([count for _, count in category_counts])}")

Category: Amazon_Instant_Video, Records: 102103
Category: Arts, Records: 18313
Category: Automotive, Records: 122187
Category: Baby, Records: 60774
Category: Beauty, Records: 119483
Category: Books, Records: 1126634
Category: Cell_Phones_&_Accessories, Records: 23723
Category: Clothing_&_Accessories, Records: 10758
Category: Electronics, Records: 340186
Category: Gourmet_Foods, Records: 97891
Category: Health, Records: 211388
Category: Home_&_Kitchen, Records: 409426
Category: Industrial_&_Scientific, Records: 25506
Category: Jewelry, Records: 22862
Category: Kindle_Store, Records: 11
Category: Movies_&_TV, Records: 732723
Category: Music, Records: 689520
Category: Musical_Instruments, Records: 54373
Category: Office_Products, Records: 65053
Category: Patio, Records: 115932
Category: Pet_Supplies, Records: 117584
Category: Shoes, Records: 3581
Category: Software, Records: 25999
Category: Sports_&_Outdoors, Records: 196970
Category: Tools_&_Home_Improvement, Records: 231206
Category: To

In [4]:
# Function to load all categories
def load_categories(categories, base_path):
    # Start with the first category
    print(f"Loading {categories[0]}...")
    all_df = spark.read.json(f"{base_path}{categories[0]}.filtered.json")
    all_df = all_df.withColumn("category", F.lit(categories[0]))
    
    # Add all other categories with a union
    for category in categories[1:]:
        try:
            print(f"Loading {category}...")
            df = spark.read.json(f"{base_path}{category}.filtered.json")
            df = df.withColumn("category", F.lit(category))
            all_df = all_df.union(df)
        except Exception as e:
            print(f"Error with {category}: {str(e)}")
    
    return all_df

# Load data from all categories (this might take a while)
all_reviews = load_categories(categories, base_path)
print(f"Total reviews loaded: {all_reviews.count()}")

Loading Amazon_Instant_Video...
Loading Arts...
Loading Automotive...
Loading Baby...
Loading Beauty...
Loading Books...


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


Loading Cell_Phones_&_Accessories...


KeyboardInterrupt: 

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.types import ArrayType, StringType

# First, combine the review summary and text
all_reviews = all_reviews.withColumn(
    "full_text", 
    F.concat_ws(" ", 
                F.col("Review/Summary"), 
                F.col("Review/Text"))
)

# Step 1: Tokenize the text (split into words)
tokenizer = RegexTokenizer(
    inputCol="full_text", 
    outputCol="words", 
    pattern="\\W+"  # Split on non-word characters
)
reviews_tokenized = tokenizer.transform(all_reviews)

# Step 2: Remove stopwords (common words like "the", "and", etc.)
remover = StopWordsRemover(
    inputCol="words", 
    outputCol="filtered_words"
)
reviews_no_stopwords = remover.transform(reviews_tokenized)

# Step 3: Filter out very short words
filter_short_udf = F.udf(
    lambda words: [word for word in words if len(word) > 2],
    ArrayType(StringType())
)
processed_reviews = reviews_no_stopwords.withColumn(
    "filtered_words",
    filter_short_udf(F.col("filtered_words"))
)

# Check our processing results
processed_reviews.select("category", "full_text", "filtered_words").show(2, truncate=False)

In [None]:
from pyspark.ml.feature import CountVectorizer

# Create a CountVectorizer to convert words to vectors
# This counts how many times each word appears
vectorizer = CountVectorizer(
    inputCol="filtered_words", 
    outputCol="features",
    vocabSize=10000,  # Keep the top 10,000 words
    minDF=5           # Word must appear in at least 5 documents
)

# Fit the vectorizer on our data
vectorizer_model = vectorizer.fit(processed_reviews)
vectorized_reviews = vectorizer_model.transform(processed_reviews)

# See what we get
vectorized_reviews.select("category", "filtered_words", "features").show(2)

# Get the vocabulary for later use
vocabulary = vectorizer_model.vocabulary
print(f"Vocabulary size: {len(vocabulary)}")
print(f"Some example words: {vocabulary[:10]}")

In [None]:
from pyspark.ml.clustering import LDA

# Set the number of topics (this is a key parameter!)
num_topics = 20

# Create the LDA model
lda = LDA(
    k=num_topics,           # Number of topics
    maxIter=20,             # Number of iterations
    featuresCol="features", # Column with our word vectors
    optimizer="em"          # Use expectation-maximization algorithm
)

# Train the model (this might take a while!)
print("Training LDA model...")
lda_model = lda.fit(vectorized_reviews)
print("Model training complete!")

In [None]:
from pyspark.sql.types import ArrayType, StringType

# Get topics with their top terms
topics = lda_model.describeTopics(maxTermsPerTopic=15)

# Convert term indices to actual words
def term_indices_to_words(indices):
    return [vocabulary[idx] for idx in indices]

# Create a UDF (User-Defined Function) for this
term_indices_to_words_udf = F.udf(term_indices_to_words, ArrayType(StringType()))

# Apply the UDF to get readable topics
topics_with_words = topics.withColumn(
    "terms", 
    term_indices_to_words_udf(F.col("termIndices"))
)

# Show each topic with its top terms
topics_with_words.select("topic", "terms", "termWeights").show(truncate=False)

In [None]:
from pyspark.sql.types import FloatType

# Apply the model to get topic distributions for each review
reviews_with_topics = lda_model.transform(vectorized_reviews)

# Extract the primary topic for each review
def get_primary_topic(distribution):
    return float(distribution.argmax())

# Create a UDF for this
get_primary_topic_udf = F.udf(get_primary_topic, FloatType())

# Get the primary topic for each review
reviews_with_topics = reviews_with_topics.withColumn(
    "primary_topic", 
    get_primary_topic_udf(F.col("topicDistribution"))
)

# Check distribution of topics
reviews_with_topics.groupBy("primary_topic").count().orderBy("primary_topic").show()

In [None]:
# Get topic distribution by category
topic_by_category = reviews_with_topics.groupBy("category", "primary_topic").count()

# Calculate percentages within each category
category_totals = reviews_with_topics.groupBy("category").count().withColumnRenamed("count", "total")

# Join the counts with totals
topic_percentage = topic_by_category.join(category_totals, on="category")
topic_percentage = topic_percentage.withColumn(
    "percentage", 
    F.round((F.col("count") / F.col("total") * 100), 2)
)

# Show results sorted by category and percentage
topic_percentage.orderBy("category", F.desc("percentage")).show(50)

In [None]:
# Convert to Pandas for easier manipulation
topics_df = topics_with_words.toPandas()

print("TOPIC SUMMARY:")
print("-" * 80)

for i, row in topics_df.iterrows():
    topic_id = row['topic']
    terms = row['terms']
    weights = row['termWeights']
    
    # Sort terms by weight
    term_weights = sorted(zip(terms, weights), key=lambda x: x[1], reverse=True)
    
    # Display top terms with weights
    print(f"Topic {topic_id}:")
    print(", ".join([f"{term} ({weight:.3f})" for term, weight in term_weights[:10]]))
    
    # Show what categories this topic appears in most
    top_categories = topic_percentage.filter(F.col("primary_topic") == topic_id) \
                                      .orderBy(F.desc("percentage")) \
                                      .limit(3)
    
    top_categories_pd = top_categories.toPandas()
    if not top_categories_pd.empty:
        category_info = ", ".join([f"{row['category']} ({row['percentage']}%)" 
                                  for _, row in top_categories_pd.iterrows()])
        print(f"Most common in: {category_info}")
    
    print("-" * 80)

In [None]:
# Save topics for reference
topics_with_words.write.mode("overwrite").parquet(
    "hdfs://namenode:9000/user/hadoop/amazon_reviews/results/lda_topics"
)

# Save review topic assignments
reviews_with_topics.select(
    "ProductID", "category", "Review/Score", "primary_topic", "topicDistribution"
).write.mode("overwrite").parquet(
    "hdfs://namenode:9000/user/hadoop/amazon_reviews/results/review_topics"
)

print("Results saved successfully!")

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA
from pyspark.sql.types import ArrayType, StringType

def process_one_category(category):
    print(f"Starting to process: {category}")
    
    # Step 3.1: Load just this category's data
    try:
        df = spark.read.json(f"{base_path}{category}.filtered.json")
        print(f"  Loaded {df.count()} reviews for {category}")
    except Exception as e:
        print(f"  ERROR loading {category}: {str(e)}")
        return None
    
    # Step 3.2: Combine review title and text
    df = df.withColumn(
        "full_text", 
        F.concat_ws(" ", F.col("Review/Summary"), F.col("Review/Text"))
    )
    
    # Step 3.3: Tokenize (split text into words)
    tokenizer = RegexTokenizer(
        inputCol="full_text", 
        outputCol="words", 
        pattern="\\W+"  # Split on non-word characters
    )
    df_tokenized = tokenizer.transform(df)
    
    # Step 3.4: Remove stopwords
    remover = StopWordsRemover(
        inputCol="words", 
        outputCol="filtered_words"
    )
    df_filtered = remover.transform(df_tokenized)
    
    # Step 3.5: Remove short words (less than 3 chars)
    filter_short_udf = F.udf(
        lambda words: [word for word in words if len(word) > 2],
        ArrayType(StringType())
    )
    df_filtered = df_filtered.withColumn(
        "filtered_words",
        filter_short_udf(F.col("filtered_words"))
    )
    
    # Step 3.6: Convert words to vectors
    vectorizer = CountVectorizer(
        inputCol="filtered_words", 
        outputCol="features",
        vocabSize=5000,  # Smaller vocabulary to save memory
        minDF=3          # Word must appear in at least 3 documents
    )
    
    # Fit and transform
    vectorizer_model = vectorizer.fit(df_filtered)
    df_vectorized = vectorizer_model.transform(df_filtered)
    
    # Get vocabulary for later use
    vocabulary = vectorizer_model.vocabulary
    print(f"  Created vocabulary with {len(vocabulary)} words")
    
    # Step 3.7: Run LDA
    num_topics = 10  # Fewer topics to save memory
    
    lda = LDA(
        k=num_topics,
        maxIter=10,    # Fewer iterations to save memory
        featuresCol="features",
        optimizer="online"  # More memory-efficient
    )
    
    # Train the model
    print(f"  Training LDA model for {category}...")
    try:
        lda_model = lda.fit(df_vectorized)
        print(f"  LDA model for {category} complete!")
        
        # Step 3.8: Get topics with their terms
        topics = lda_model.describeTopics(maxTermsPerTopic=10)
        
        # Convert term indices to actual words
        def indices_to_words(indices):
            return [vocabulary[idx] for idx in indices]
        
        indices_to_words_udf = F.udf(indices_to_words, ArrayType(StringType()))
        
        # Apply the conversion
        topics_with_words = topics.withColumn(
            "terms", 
            indices_to_words_udf(F.col("termIndices"))
        )
        
        # Add category name
        topics_with_words = topics_with_words.withColumn("category", F.lit(category))
        
        # Step 3.9: Save the results

        # Save model
        model_path = f"hdfs://namenode:9000/user/jovyan/amazon_reviews/models/lda_model_{category}"
        lda_model.write().overwrite().save(model_path)
        print(f"  Saved LDA model to {model_path}")

        # Save vocabulary (needed for the CountVectorizer)
        vocab_rdd = spark.sparkContext.parallelize([(word,) for word in vocabulary])
        vocab_df = spark.createDataFrame(vocab_rdd, ["word"])
        vocab_df.write.mode("overwrite").parquet(f"hdfs://namenode:9000/user/jovyan/amazon_reviews/models/vocabulary_{category}")

        # 3. Save the CountVectorizer model
        vectorizer_path = f"hdfs://namenode:9000/user/jovyan/amazon_reviews/models/vectorizer_{category}"
        vectorizer_model.save(vectorizer_path)
        print(f"  Saved CountVectorizer model to {vectorizer_path}")
        
        output_path = f"hdfs://namenode:9000/user/jovyan/amazon_reviews/results/topics_{category}"
        topics_with_words.write.mode("overwrite").parquet(output_path)
        print(f"  Saved results to {output_path}")
        
        # Return as pandas for local analysis
        return topics_with_words.toPandas()
        
    except Exception as e:
        print(f"  ERROR in LDA for {category}: {str(e)}")
        return None

In [None]:
# Use a dictionary to store results
results = {}

# Process each category
for category in categories:
    # Process this category
    result = process_one_category(category)
    
    # Save result if successful
    if result is not None:
        results[category] = result
        print(f"Successfully processed {category}")
    else:
        print(f"Failed to process {category}")
    
    # IMPORTANT: Clear cache between categories to free memory
    spark.catalog.clearCache()
    print(f"Cleared cache after {category}")
    print("-" * 50)

print("All categories processed!")