# RDD Operations

In [None]:
from pyspark import StorageLevel
import time
import kagglehub

# Download the dataset with kagglehub; `path` is the local directory it returns
path = kagglehub.dataset_download("bittlingmayer/amazonreviews")
print("Dataset path:", path)

# Load the training split line by line (adjust the file name if the layout differs)
train_file = f"{path}/train.ft.txt"
text_rdd = sc.textFile(train_file)

# Caching only helps when an RDD is reused. Both timed jobs below run the same
# fresh reduceByKey + count. The only difference is whether the (word, 1) pairs
# are recomputed from the text file or read from an already-populated cache.


def _to_word_pairs(lines):
    """Return an RDD of (lowercased word, 1) for every whitespace-separated token."""
    # str.split() with no argument collapses runs of whitespace, so repeated
    # spaces do not produce empty-string "words" the way split(" ") does.
    return (lines
            .flatMap(lambda line: line.split())
            .map(lambda word: (word.lower(), 1)))


# Word Count without caching: the whole lineage is computed from the file
start_time = time.time()
word_counts_no_cache = _to_word_pairs(text_rdd).reduceByKey(lambda a, b: a + b)
word_counts_no_cache.count()
no_cache_time = time.time() - start_time

# Word Count with caching: populate the cache first (not timed), so the timed
# job measures reuse of cached data rather than the cost of filling the cache.
# NOTE: with MEMORY_ONLY, partitions that do not fit in memory are recomputed
# on each use, which reduces the measured benefit.
words_rdd = _to_word_pairs(text_rdd).persist(StorageLevel.MEMORY_ONLY)
words_rdd.count()  # action that materializes words_rdd into the cache

start_time = time.time()
word_counts_cache = words_rdd.reduceByKey(lambda a, b: a + b)
word_counts_cache.count()
cache_time = time.time() - start_time

# Results
print(f"Time without caching: {no_cache_time:.2f} seconds")
print(f"Time with caching: {cache_time:.2f} seconds")
if no_cache_time > 0:  # guard the percentage against division by zero
    print(f"Performance improvement: {(no_cache_time - cache_time)/no_cache_time*100:.1f}%")

# Show top 10 words
word_counts_cache.takeOrdered(10, key=lambda x: -x[1])

# DataFrame Operations

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import matplotlib.pyplot as plt

# Columns produced by the parsing below (label, review), both strings
schema = StructType([
    StructField("label", StringType(), True),
    StructField("review", StringType(), True)
])

# The file is in fastText format: each line is "__label__N <review text>".
# The label is the first whitespace-delimited token, not a tab-separated field,
# so read raw lines and split off the first token. (Reading the file as CSV would
# also treat quote characters inside reviews as field quoting.)
raw_lines = spark.read.text(f"{path}/train.ft.txt")
df = raw_lines.select(
    F.regexp_extract("value", r"^(\S+)", 1).alias("label"),
    F.regexp_extract("value", r"^\S+\s+(.*)$", 1).alias("review"),
)

# Add length column
df = df.withColumn("review_length", F.length("review"))

# RDD operation - calculate average review length by label.
# Transformations are lazy, so an action (collect) must run inside the timed
# region; otherwise only plan construction is measured.
start_time = time.time()
rdd_result = (df.rdd
              .map(lambda row: (row.label, (row.review_length, 1)))
              .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
              .mapValues(lambda x: x[0] / x[1]))
rdd_averages = rdd_result.collect()
rdd_time = time.time() - start_time

# DataFrame operation - same calculation, also forced with an action
start_time = time.time()
df_result = (df.groupBy("label")
             .agg(F.avg("review_length").alias("avg_length")))
df_averages = df_result.collect()
df_time = time.time() - start_time

# Visualization
plt.figure(figsize=(8, 5))
plt.bar(['RDD', 'DataFrame'], [rdd_time, df_time], color=['blue', 'green'])
plt.title('Execution Time Comparison')
plt.ylabel('Time (seconds)')
plt.show()

# Show results
df_result.show()

# Spark SQL

In [None]:
# Create temporary view
df.createOrReplaceTempView("amazon_reviews")

# Label convention of this dataset (per its Kaggle description): __label__1 marks
# 1- and 2-star reviews, __label__2 marks 4- and 5-star reviews.
POSITIVE_LABEL = "__label__2"
NEGATIVE_LABEL = "__label__1"

# Any other label yields a NULL sentiment instead of being silently counted as
# negative, so a parsing problem shows up as a NULL group in the output.
SENTIMENT_SQL = f"""
        CASE
            WHEN label = '{POSITIVE_LABEL}' THEN 'positive'
            WHEN label = '{NEGATIVE_LABEL}' THEN 'negative'
        END"""

sql_result = spark.sql(f"""
    SELECT
        {SENTIMENT_SQL} AS sentiment,
        AVG(review_length) AS avg_length,
        COUNT(*) AS review_count
    FROM amazon_reviews
    GROUP BY sentiment
""")

# Show results
sql_result.show()

# Combined approach using DataFrame API (same mapping as the SQL CASE above)
from pyspark.sql.functions import when

sentiment_col = (when(df.label == POSITIVE_LABEL, "positive")
                 .when(df.label == NEGATIVE_LABEL, "negative"))
df_with_sentiment = df.withColumn("sentiment", sentiment_col)

analysis_result = (df_with_sentiment
                   .groupBy("sentiment")
                   .agg(
                       F.avg("review_length").alias("avg_length"),
                       F.count("*").alias("review_count")
                   ))
analysis_result.show()

# Compare performance: both queries compute the same per-sentiment average and
# are forced with collect() so execution, not just planning, is timed.
start_time = time.time()
spark.sql(f"""
    SELECT {SENTIMENT_SQL} AS sentiment, AVG(review_length) AS avg_length
    FROM amazon_reviews
    GROUP BY sentiment
""").collect()
sql_time = time.time() - start_time

start_time = time.time()
(df_with_sentiment
 .groupBy("sentiment")
 .agg(F.avg("review_length"))
 .collect())
df_api_time = time.time() - start_time

print(f"SQL execution time: {sql_time:.4f} seconds")
print(f"DataFrame API execution time: {df_api_time:.4f} seconds")

# Optimization and Monitoring

In [None]:
from pyspark.sql import SparkSession

# Configure Spark for optimization.
# NOTE: earlier cells already use `spark`, so getOrCreate() returns that existing
# session. Core settings such as spark.executor.memory only take effect when the
# SparkContext is first created; set them when launching Spark instead.
# The runtime SQL setting is applied explicitly so it is guaranteed to hold.
spark = (SparkSession.builder
         .appName("AmazonReviewsAnalysis")
         .config("spark.sql.shuffle.partitions", "8")
         .config("spark.executor.memory", "2g")
         .getOrCreate())
spark.conf.set("spark.sql.shuffle.partitions", "8")

# 1. Selective persistence: only persist what is reused. This filtered projection
#    is consumed exactly once (by the write below), so it is not cached.
optimized_df = (df.filter(F.length("review") > 0)  # Remove empty reviews
                .select("label", "review", "review_length"))

# 2. Strategic partitioning
# NOTE(review): hash-partitioning on `label` puts all rows of one label value in
# one partition. If `label` only holds __label__1/__label__2, at most two of these
# four partitions carry data.
(optimized_df.repartition(4, "label")
             .write
             .mode("overwrite")
             .parquet("data/amazon_reviews_optimized"))

# 3. Reading optimized data
optimized_reviews = spark.read.parquet("data/amazon_reviews_optimized")

# Show partitioning information
print(f"Number of partitions: {optimized_reviews.rdd.getNumPartitions()}")
optimized_reviews.groupBy("label").count().show()

# Example optimized analysis
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Tokenize reviews
tokenizer = Tokenizer(inputCol="review", outputCol="words")
words_df = tokenizer.transform(optimized_reviews)

# Remove stop words. cleaned_df is filtered and scanned again further down in this
# notebook, so it is the DataFrame worth persisting (spilling to disk if needed).
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
cleaned_df = remover.transform(words_df).persist(StorageLevel.MEMORY_AND_DISK)

# Show results
cleaned_df.select("label", "filtered_words").show(5, truncate=False)

# Comparative Analysis

In [None]:
from prettytable import PrettyTable

# Create comparison table
comparison_table = PrettyTable()
comparison_table.field_names = ["Operation", "RDD", "DataFrame", "Spark SQL"]


def _fmt_seconds(seconds):
    """Format a duration in seconds as e.g. '1.23s'."""
    return f"{seconds:.2f}s"


comparison_table.add_row(["Word Count", _fmt_seconds(no_cache_time), "N/A", "N/A"])
comparison_table.add_row(["Word Count (cached)", _fmt_seconds(cache_time), "N/A", "N/A"])
# The per-label average was timed only with the RDD and DataFrame APIs.
# sql_time was measured on the per-sentiment query, so it appears only in the
# Sentiment Analysis row, next to the matching DataFrame API timing.
comparison_table.add_row(["Aggregation", _fmt_seconds(rdd_time), _fmt_seconds(df_time), "N/A"])
comparison_table.add_row(["Sentiment Analysis", "N/A", _fmt_seconds(df_api_time), _fmt_seconds(sql_time)])

print("Performance Comparison:")
print(comparison_table)

# Additional Analysis Example

In [None]:
# Sentiment word frequency analysis
from collections import defaultdict

# Get positive and negative reviews. Per the dataset's Kaggle description,
# __label__2 marks 4- and 5-star (positive) reviews and __label__1 marks
# 1- and 2-star (negative) reviews.
positive_reviews = cleaned_df.filter(cleaned_df.label == "__label__2")
negative_reviews = cleaned_df.filter(cleaned_df.label == "__label__1")

# Function to count words
def count_words(df, column="filtered_words"):
    """Count how often each token occurs across an array-of-strings column.

    The counting runs in Spark (explode + groupBy). Only one row per distinct word
    is collected to the driver, instead of every review's full token list.

    Args:
        df: DataFrame containing an array-of-strings column.
        column: Name of that column; defaults to "filtered_words".

    Returns:
        defaultdict(int) mapping each word to its total number of occurrences.
        Rows whose array is null or empty contribute nothing.
    """
    from pyspark.sql import functions as F

    # Alias the count column: Row objects inherit tuple.count, so row.count
    # would be a method rather than the value.
    per_word = (df
                .select(F.explode(column).alias("word"))
                .groupBy("word")
                .agg(F.count(F.lit(1)).alias("n"))
                .collect())

    word_counts = defaultdict(int)
    for row in per_word:
        word_counts[row["word"]] = row["n"]
    return word_counts

# Count words in positive and negative reviews
positive_counts = count_words(positive_reviews)
negative_counts = count_words(negative_reviews)

# Find most distinctive words as (word, smoothed pos/neg ratio, pos count, neg count)
min_occurrences = 100  # only consider words with significant occurrence
distinctive_words = []
for word in positive_counts.keys() | negative_counts.keys():
    pos = positive_counts.get(word, 0)
    neg = negative_counts.get(word, 0)
    if pos + neg > min_occurrences:
        ratio = (pos + 1) / (neg + 1)  # add 1 to avoid division by zero
        distinctive_words.append((word, ratio, pos, neg))

top_positive = sorted(distinctive_words, key=lambda x: -x[1])[:10]
top_negative = sorted(distinctive_words, key=lambda x: x[1])[:10]

# A notebook cell only echoes its last expression, so print both lists explicitly
print("Top 10 positive words (word, ratio, pos, neg):")
for entry in top_positive:
    print(entry)

print("Top 10 negative words (word, ratio, pos, neg):")
for entry in top_negative:
    print(entry)