In [1]:
SparkContext

pyspark.context.SparkContext

In [2]:
sc.master

'local[*]'

In [7]:
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from sentiment_analysis.config import logger
from sentiment_analysis.utils import run_command, delete_local_file
from sentiment_analysis.load import load_amazon_reviews, load_model
from sentiment_analysis.process import batch_sentiment_analysis
import sentiment_analysis.process as process



In [4]:
def write_df_to_hdfs_csv(df, hdfs_path, csv_file_name):
    logger.info(f"WRITING ANALYSIS SUMMARY OUTPUT {csv_file_name} TO HDFS...")
    write_path = hdfs_path + csv_file_name
    df.write.option("header", "true").mode("overwrite").csv(write_path)
    hdfs_mv_cmd = [
        "/home/almalinux/hadoop-3.4.0/bin/hdfs",
        "dfs",
        "-mv",
        write_path + "/part-00000-*.csv",
        write_path + ".csv",
    ]
    run_command(hdfs_mv_cmd)
    hdfs_rm_cmd = [
        "/home/almalinux/hadoop-3.4.0/bin/hdfs",
        "dfs",
        "-rm",
        "-r",
        write_path,
    ]
    run_command(hdfs_rm_cmd)


In [5]:
def merge_results_csv_in_hdfs(read_hdfs_path, write_hdfs_path, csv_file_name):
    logger.info(f"WRITING ANALYSIS SUMMARY OUTPUT {csv_file_name} TO HDFS...")
    final_path = f"{write_hdfs_path}/{csv_file_name}.csv"
    temp_csv_path = "/tmp/merged_results.csv"
    # Merge csv files from hdfs and save them locally
    merge_command = [
        "/home/almalinux/hadoop-3.4.0/bin/hdfs",
        "dfs",
        "-getmerge",
        f"{read_hdfs_path}/part-*.csv",
        temp_csv_path,
    ]
    run_command(merge_command)

    # Upload the merged csv to hdfs
    upload_command = [
        "/home/almalinux/hadoop-3.4.0/bin/hdfs",
        "dfs",
        "-put",
        "-f",
        temp_csv_path,
        final_path,
    ]
    run_command(upload_command)
    # Remove the local merged file
    delete_local_file(temp_csv_path)


In [8]:
read_hdfs_path, write_hdfs_path = "/analysis_outputs", "/summary_outputs"
merge_results_csv_in_hdfs(read_hdfs_path, write_hdfs_path, "sentiment_analysis_results")

2025-04-09 12:14:09,498 - INFO - WRITING ANALYSIS SUMMARY OUTPUT sentiment_analysis_results TO HDFS...
2025-04-09 12:14:11,646 - INFO - Command /home/almalinux/hadoop-3.4.0/bin/hdfs dfs -getmerge /analysis_outputs/part-*.csv /tmp/merged_results.csv Output: 
2025-04-09 12:14:11,650 - ERROR - Command /home/almalinux/hadoop-3.4.0/bin/hdfs dfs -getmerge /analysis_outputs/part-*.csv /tmp/merged_results.csv Error: getmerge: `/analysis_outputs/part-*.csv': No such file or directory

2025-04-09 12:14:13,906 - INFO - Command /home/almalinux/hadoop-3.4.0/bin/hdfs dfs -put -f /tmp/merged_results.csv /summary_outputs/sentiment_analysis_results.csv Output: 
2025-04-09 12:14:13,910 - INFO - /tmp/merged_results.csv local file has been deleted.


In [11]:
input_path, output_path = "/Subscription_Boxes.jsonl", "/analysis_outputs"
# Initialize Spark session
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()
# Load the dataset
reviews_df = load_amazon_reviews(spark, input_path)
# Count total reviews
total_reviews = reviews_df.count()
logger.info(f"Processing {total_reviews:,} reviews")
# Load model and tokenizer
tokenizer, model = load_model()
# Broadcast model and tokenizer to all workers
process.bc_tokenizer = spark.sparkContext.broadcast(tokenizer)
process.bc_model = spark.sparkContext.broadcast(model)
sentiment_results_df = reviews_df.withColumn(
    "result",
    process.batch_sentiment_analysis(reviews_df["text"]),
)
# Flatten the result column
sentiment_results_df = sentiment_results_df.select(
    col("asin"),
    col("user_id"),
    col("result.review_text"),
    col("result.sentiment"),
    col("result.score"),
)
start_time = time.time()
sentiment_results_df.write.option("header", "true").mode("overwrite").csv(output_path)
end_time = time.time()
logger.info(f"Done processing all reviews in {end_time - start_time:.2f} seconds")
# Combine results into a single csv file
merge_results_csv_in_hdfs(
    output_path, "/summary_outputs", "sentiment_analysis_results"
)
# # Read the Parquet files
# df = spark.read.parquet(output_path).coalesce(1)
# # Write to CSV
# write_df_to_hdfs_csv(
#     df,
#     "/summary_outputs/",
#     "sentiment_analysis_results",
# )
# Stop Spark session
spark.stop()


25/04/09 12:16:44 WARN CacheManager: Asked to cache already cached data.        
2025-04-09 12:16:44,953 - INFO - Processing 16,216 reviews
2025-04-09 12:16:44,954 - INFO - Loading BERTweet model and tokenizer...
25/04/09 12:16:49 WARN Utils: Suppressing exception in finally: Java heap space]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1(TorrentBroadcast.scala:360)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1$adapted(TorrentBroadcast.scala:360)
	at org.apache.spark.broadcast.TorrentBroadcast$$$Lambda$2171/0x000000084101f040.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutput

Py4JJavaError: An error occurred while calling o22.broadcast.
: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1(TorrentBroadcast.scala:360)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1$adapted(TorrentBroadcast.scala:360)
	at org.apache.spark.broadcast.TorrentBroadcast$$$Lambda$2171/0x000000084101f040.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1849)
	at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
	at org.apache.spark.util.Utils$.$anonfun$copyStream$1(Utils.scala:272)
	at org.apache.spark.util.Utils$$$Lambda$2275/0x000000084108d840.apply$mcJ$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.util.Utils$.copyStream(Utils.scala:279)
	at org.apache.spark.api.python.PythonBroadcast.$anonfun$writeObject$1(PythonRDD.scala:772)
	at org.apache.spark.api.python.PythonBroadcast$$Lambda$3615/0x00000008415a0040.apply$mcJ$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:35)
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:94)
	at org.apache.spark.api.python.PythonBroadcast.writeObject(PythonRDD.scala:768)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
	at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)


[Stage 5:>                                                          (0 + 3) / 3]