In [3]:
from tensorflow.keras.layers import Input, Lambda, Dense, Flatten,Conv2D
from tensorflow.keras.models import Model
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator,load_img
from tensorflow.keras.models import Sequential
from keras.datasets import mnist
import numpy as np
import pathlib
from glob import glob
import tensorflow as tf
import os
import matplotlib.pyplot as plt
from tensorflow.keras.layers import MaxPooling2D, Embedding, LSTM, Dense
from pyspark.sql import SparkSession
import imghdr
from pyspark.sql.functions import when, col
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

  import imghdr


In [4]:
spark = SparkSession.builder.appName("YelpReviews").getOrCreate()

In [5]:
spark._jsc.sc().getExecutorMemoryStatus().size()

1

In [6]:
df = spark.read.json("yelp_academic_dataset_review.json")

In [8]:
#df= spark.createDataFrame(df)

In [9]:
df.printSchema()

# Display some data
df.show(5)

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPp

In [10]:
df = df.select("text", "stars")

In [11]:
df.show(5)

+--------------------+-----+
|                text|stars|
+--------------------+-----+
|If you decide to ...|  3.0|
|I've taken a lot ...|  5.0|
|Family diner. Had...|  3.0|
|Wow!  Yummy, diff...|  5.0|
|Cute interior and...|  4.0|
+--------------------+-----+
only showing top 5 rows



In [12]:
df = df.withColumn("label", when(col("stars") <= 2, 0)    # Negative
                             .when(col("stars") == 3, 1)  # Neutral
                             .otherwise(2))              # Positive

In [13]:
df.show(5)

+--------------------+-----+-----+
|                text|stars|label|
+--------------------+-----+-----+
|If you decide to ...|  3.0|    1|
|I've taken a lot ...|  5.0|    2|
|Family diner. Had...|  3.0|    1|
|Wow!  Yummy, diff...|  5.0|    2|
|Cute interior and...|  4.0|    2|
+--------------------+-----+-----+
only showing top 5 rows



Down-Sampling attempt

In [14]:
# Step 1: Check the distribution of classes
class_distribution = df.groupBy("label").count().collect()
total_count = sum([row['count'] for row in class_distribution])

# Calculate the fraction of each class in the original dataset
fractions = {row['label']: row['count'] / total_count for row in class_distribution}

In [15]:
# Step 2: Calculate how many samples to take from each class
target_samples = 500000
samples_per_class = {label: int(fractions[label] * target_samples) for label in fractions}

In [16]:
# Step 3: Downsample the dataset
# Filter each class separately and sample the required number of entries
downsampled_df = df.sampleBy("label", fractions={label: samples_per_class[label] / df.filter(col("label") == label).count() for label in samples_per_class})

In [17]:
# Step 4: Show the downsampled data
downsampled_df.show(5)

+--------------------+-----+-----+
|                text|stars|label|
+--------------------+-----+-----+
|I am a long term ...|  1.0|    0|
|This easter inste...|  3.0|    1|
|I go to blow bar ...|  5.0|    2|
|I recently had di...|  5.0|    2|
|I've been to this...|  3.0|    1|
+--------------------+-----+-----+
only showing top 5 rows



In [18]:
# Verify the size of the downsampled DataFrame
print("Total count in downsampled DataFrame:", downsampled_df.count())

Total count in downsampled DataFrame: 500234


In [19]:
downsampled_df.createOrReplaceTempView("down_df")
positive_counts = spark.sql("SELECT COUNT(*) FROM down_df WHERE label=2")

In [20]:
positive_counts.show()

+--------+
|count(1)|
+--------+
|  335288|
+--------+



In [21]:
neutral_counts = spark.sql("SELECT COUNT(*) FROM down_df WHERE label=1")

In [22]:
neutral_counts.show()

+--------+
|count(1)|
+--------+
|   49301|
+--------+



In [23]:
negative_counts = spark.sql("SELECT COUNT(*) FROM down_df WHERE label=0")

In [24]:
negative_counts.show()

+--------+
|count(1)|
+--------+
|  115645|
+--------+



In [25]:
# null_counts = downsampled_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in downsampled_df.columns])

# # Show the count of nulls in each column
# null_counts.show()
null_count_label = downsampled_df.where(col("label").isNull())

In [26]:
null_count_label.show()

+----+-----+-----+
|text|stars|label|
+----+-----+-----+
+----+-----+-----+



In [27]:
null_count_label = downsampled_df.where(col("text").isNull())

In [28]:
null_count_label.show()

+----+-----+-----+
|text|stars|label|
+----+-----+-----+
+----+-----+-----+



End Down Sampling

In [30]:
# texts_rdd = downsampled_df.select("text").rdd.flatmap(lambda row: row.text)
# labels_rdd = downsampled_df.select("label").rdd.flatmap(lambda row: row.label)

In [31]:
text =downsampled_df.select("text")
labels = downsampled_df.select("label")

In [32]:
text.show(5)

+--------------------+
|                text|
+--------------------+
|I am a long term ...|
|This easter inste...|
|I go to blow bar ...|
|I recently had di...|
|I've been to this...|
+--------------------+
only showing top 5 rows



In [33]:
#text_rdd = downsampled_df.select("text").rdd.map(lambda row: row[0]).collect()

In [34]:
df.columns

['text', 'stars', 'label']

Pandas Pre-Processing

In [35]:
import pandas as pd

In [36]:
pand_df = downsampled_df.toPandas()

In [37]:
pand_df.shape

(500234, 3)

In [38]:
X = pand_df["text"]
y = pand_df["label"]

In [41]:
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
stop = stopwords.words('english')

pos_tweets = [('I love this car', 'positive'),
    ('This view is amazing', 'positive'),
    ('I feel great this morning', 'positive'),
    ('I am so excited about the concert', 'positive'),
    ('He is my best friend', 'positive')]


# Exclude stopwords with Python's list comprehension and pandas.DataFrame.apply.
X['reviews_without_stopwords'] = X.apply(lambda x: ' '.join([word for word in x.split() if word not in (stop)]))
print(X)

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\atrav\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\stopwords.zip.
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X['reviews_without_stopwords'] = X.apply(lambda x: ' '.join([word for word in x.split() if word not in (stop)]))


0                            I am a long term frequent customer of this est...
1                            This easter instead of going to Lopez Lake we ...
2                            I go to blow bar to get my brows done by natal...
3                            I recently had dinner here with my wife over t...
4                            I've been to this location many times when I l...
                                                   ...                        
500230                       I decided to try this place out after Christma...
500231                       Food was great (lots of gluten-free options!!!...
500232                       Took a colleague here for dinner as we were tr...
500233                       It is very rare for a restaurant to be this go...
reviews_without_stopwords    0         I long term frequent customer establ...
Name: text, Length: 500235, dtype: object


In [42]:
pand_df = pand_df.applymap(lambda x: x.lower() if isinstance(x, str) else x)

In [43]:
# REMOVING URL's
import re

# Define a regex pattern to match URLs
url_pattern = re.compile(r'https?://\S+')

# Define a function to remove URLs from text
def remove_urls(text):
    return url_pattern.sub('', text)

# Apply the function to the 'text' column and create a new column 'clean_text'
pand_df['text'] = pand_df['text'].apply(remove_urls)


In [44]:
# Removing non-whitespace characters
pand_df = pand_df.replace(to_replace=r'[^\w\s]', value='', regex=True)

In [45]:
# Removing Digits
pand_df = pand_df.replace(to_replace=r'\d', value='', regex=True)

In [48]:
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')

pand_df['text'] = pand_df['text'].apply(word_tokenize)

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\atrav\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt.zip.


In [49]:
import nltk
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
import pandas as pd

# Initialize the Porter Stemmer
stemmer = PorterStemmer()

# Define a function to perform stemming on the 'text' column
def stem_words(words):
    return [stemmer.stem(word) for word in words]

# Define a function to perform stemming on the 'text' column
def stem_words(words):
    return [stemmer.stem(word) for word in words]

# Apply the function to the 'text' column and create a new column 'stemmed_text'
pand_df['stemmed_messages'] = pand_df['text'].apply(stem_words)

In [50]:
texts = pand_df["stemmed_messages"].values
labels = df['label'].values

In [51]:
texts = pand_df['stemmed_messages'].values
labels = pand_df['label'].values

In [52]:
print("Sample texts:", texts[:5])
print("Sample labels:", labels[:5])
print("Type of texts:", type(texts))
print("Type of labels:", type(labels))

Sample texts: [list(['i', 'am', 'a', 'long', 'term', 'frequent', 'custom', 'of', 'thi', 'establish', 'i', 'just', 'went', 'in', 'to', 'order', 'take', 'out', 'app', 'and', 'wa', 'told', 'theyr', 'too', 'busi', 'to', 'do', 'it', 'realli', 'the', 'place', 'is', 'mayb', 'half', 'full', 'at', 'best', 'doe', 'your', 'dick', 'reach', 'your', 'ass', 'ye', 'go', 'fuck', 'yourself', 'im', 'a', 'frequent', 'custom', 'and', 'great', 'tipper', 'glad', 'that', 'kanella', 'just', 'open', 'never', 'go', 'back', 'to', 'dmitri'])
 list(['thi', 'easter', 'instead', 'of', 'go', 'to', 'lopez', 'lake', 'we', 'went', 'to', 'lo', 'padr', 'nation', 'forest', 'which', 'is', 'realli', 'pretti', 'but', 'if', 'you', 'go', 'to', 'white', 'rock', 'the', 'staff', 'need', 'to', 'cut', 'down', 'all', 'the', 'dead', 'grass', 'that', 'invad', 'the', 'rock', 'and', 'the', 'water', 'i', 'would', 'wish', 'the', 'staff', 'would', 'also', 'clean', 'or', 'get', 'rid', 'of', 'the', 'dead', 'grass', 'that', 'also', 'live', 'by'

In [53]:
print("Shape of texts array:", texts.shape)
print("Shape of labels array:", labels.shape)

Shape of texts array: (500234,)
Shape of labels array: (500234,)


In [54]:
print("Number of empty texts:", sum(pd.isnull(pand_df['stemmed_messages'])))
print("Number of empty labels:", sum(pd.isnull(pand_df['label'])))

Number of empty texts: 0
Number of empty labels: 0


In [55]:
tokenizer = Tokenizer(num_words=5000)  # Limit to top 5000 words
tokenizer.fit_on_texts(texts)

# Convert texts to sequences of integers
sequences = tokenizer.texts_to_sequences(texts)

In [56]:
print("Sample sequences:", sequences[:5])

Sample sequences: [[3, 152, 4, 228, 1598, 964, 146, 8, 17, 731, 3, 48, 115, 11, 5, 40, 122, 41, 854, 2, 6, 180, 502, 100, 170, 5, 76, 7, 66, 1, 28, 9, 354, 353, 304, 30, 89, 444, 58, 4981, 1453, 58, 1910, 545, 42, 2597, 744, 96, 4, 964, 146, 2, 32, 585, 13, 48, 233, 110, 42, 52, 5], [17, 4109, 454, 8, 42, 5, 2825, 14, 115, 5, 1720, 2322, 4227, 63, 9, 66, 160, 18, 44, 20, 42, 5, 587, 1040, 1, 95, 120, 5, 434, 184, 43, 1, 1757, 2735, 13, 1, 1040, 2, 1, 331, 3, 54, 431, 1, 95, 54, 70, 198, 57, 39, 3111, 8, 1, 1757, 2735, 13, 70, 276, 81, 1, 331, 1, 331, 9, 66, 470, 2, 809, 1720, 2322, 4227, 95, 120, 5, 118, 380, 5, 1814, 17, 4227, 86, 160, 2, 23, 45, 4, 78, 2825, 86, 45, 16, 278, 4, 222, 80, 1556], [3, 42, 5, 1551, 141, 5, 39, 12, 2912, 277, 81, 2912, 3895, 63, 3, 283, 104, 62, 9, 32, 444, 4, 32, 325, 19, 12, 2133, 18, 137, 3, 77, 4, 1551, 81, 690, 3, 6, 427, 3, 21, 930, 1195, 1757, 429, 2, 62, 263, 37, 15, 1, 1520, 2415, 98, 163, 22, 61, 203, 538, 18, 108, 45, 7, 33, 133, 25, 59, 26, 12,

In [57]:
# Pad sequences to ensure uniform input size
max_sequence_length = 100  # Adjust this based on your data
padded_sequences = pad_sequences(sequences, maxlen=max_sequence_length)
print("Padded sequences shape:", padded_sequences.shape)

# Convert labels to categorical (one-hot encoding)
num_classes = len(np.unique(labels))  # Ensure num_classes is set correctly
categorical_labels = to_categorical(labels, num_classes=num_classes)
print("Categorical labels shape:", categorical_labels.shape)

Padded sequences shape: (500234, 100)
Categorical labels shape: (500234, 3)


In [58]:
from sklearn.model_selection import train_test_split

# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(
    padded_sequences,
    categorical_labels,
    test_size=0.2,  # 20% of data for validation
    random_state=42  # For reproducibility
)

In [59]:
# Define the LSTM model
model = Sequential()
model.add(Embedding(input_dim=5000, output_dim=128, input_length=max_sequence_length))
model.add(LSTM(128))
model.add(Dense(num_classes, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()

# Train the model
history = model.fit(
    X_train, y_train,
    epochs=5,
    batch_size=32,
    validation_data=(X_val, y_val)
)
model.save('lstm_model.h5')



Epoch 1/5
[1m12506/12506[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1285s[0m 102ms/step - accuracy: 0.8445 - loss: 0.4104 - val_accuracy: 0.8815 - val_loss: 0.3090
Epoch 2/5
[1m 3080/12506[0m [32m━━━━[0m[37m━━━━━━━━━━━━━━━━[0m [1m12:27[0m 79ms/step - accuracy: 0.8869 - loss: 0.2900


KeyboardInterrupt

