In [None]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Large Dataset Import") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# Load the dataset
file_path = "/Users/swagatkumarpanda/Downloads/HotelCSVOutputs/output_part_1.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the schema of the dataframe
df.printSchema()

# Show the first few rows of the dataframe
df.show(5)



root
 |-- hotel_url: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)

+--------------------+------------+-------------------+------+--------------------+--------------------+
|           hotel_url|      author|               date|rating|               title|                text|
+--------------------+------------+-------------------+------+--------------------+--------------------+
|Hotel_Review-g194...|violettaf340|2019-01-01 00:00:00|   5.0|        Xmas holiday|We went here with...|
|Hotel_Review-g194...|   Lagaiuzza|2016-01-01 00:00:00|   5.0|  Baltic, what else?|We have spent in ...|
|Hotel_Review-g194...|  ashleyn763|2014-10-01 00:00:00|   5.0|Excellent in ever...|I visited Hotel B...|
|Hotel_Review-g194...| DavideMauro|2014-08-01 00:00:00|   5.0|The house of your...|I've travelled qu...|
|Hotel_Review-g194...|    Alem

                                                                                

In [7]:
# Check the total number of rows before cleaning
initial_row_count = df.count()
print(f"Initial row count: {initial_row_count}")

# Perform data cleaning
# For example, let's remove rows with any null values
df_cleaned = df.dropna()

# Check the total number of rows after cleaning
final_row_count = df_cleaned.count()
print(f"Final row count: {final_row_count}")

                                                                                

Initial row count: 6598369




Final row count: 6598291


                                                                                

In [8]:
# Drop the 'author' column from the cleaned dataframe
df_cleaned = df_cleaned.drop('author','date','title','hotel_url')

# Show the schema of the dataframe after dropping the column
df_cleaned.printSchema()

# Show the first few rows of the dataframe after dropping the column
df_cleaned.show(5)

root
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)

+------+--------------------+
|rating|                text|
+------+--------------------+
|   5.0|We went here with...|
|   5.0|We have spent in ...|
|   5.0|I visited Hotel B...|
|   5.0|I've travelled qu...|
|   4.0|We decided for th...|
+------+--------------------+
only showing top 5 rows



In [9]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.functions import rand
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Apply HashingTF to convert words to term frequency vectors
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)

# Apply IDF to rescale the term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Initialize the Logistic Regression model
lr = LogisticRegression(labelCol="rating", featuresCol="features")

# Create a pipeline with the stages
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# Fit the pipeline to the cleaned dataframe
model = pipeline.fit(df_cleaned)

# Make predictions on the cleaned dataframe
predictions = model.transform(df_cleaned)

# Show the predictions
predictions.select("text", "rating", "prediction").show(5)
# Split the data into training and test sets (80% training, 20% test)
train_data, test_data = df_cleaned.orderBy(rand()).randomSplit([0.8, 0.2], seed=1234)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model

evaluator = MulticlassClassificationEvaluator(labelCol="rating", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Show the predictions
predictions.select("text", "rating", "prediction").show(5)

                                                                                

+--------------------+------+----------+
|                text|rating|prediction|
+--------------------+------+----------+
|We went here with...|   5.0|       5.0|
|We have spent in ...|   5.0|       5.0|
|I visited Hotel B...|   5.0|       5.0|
|I've travelled qu...|   5.0|       5.0|
|We decided for th...|   4.0|       4.0|
+--------------------+------+----------+
only showing top 5 rows



                                                                                

Accuracy: 0.6737455952547019




+--------------------+------+----------+
|                text|rating|prediction|
+--------------------+------+----------+
|               dirty|   1.0|       1.0|
|"""non smoking"" ...|   1.0|       1.0|
|"A number of issu...|   1.0|       1.0|
|"Accommodation in...|   1.0|       4.0|
|"After 17 yrs of ...|   1.0|       1.0|
+--------------------+------+----------+
only showing top 5 rows



                                                                                

In [11]:
# Define the path where you want to save the model
model_save_path = "/Users/swagatkumarpanda/Downloads/HotelCSVOutputs/spark_model"

# Save the model using Spark's save method
model.save(model_save_path)

                                                                                