In [None]:
from google.colab import drive

# Mount Google Drive inside the Colab VM so the dataset paths under /content/drive resolve.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

In [1]:
!pip install pyspark
!pip install dask[dataframe]

zsh:1: no matches found: dask[dataframe]


In [None]:
from pyspark.sql import SparkSession

# Build the notebook's Spark session (getOrCreate reuses one if it is already running).
spark = (
    SparkSession.builder
    .appName("Yelp Data Merge")
    .getOrCreate()
)

In [None]:


# Input files on the mounted Drive. Only the business table is loaded for now; the review
# table and the business/review join further down are left disabled.
business_csv = '/content/drive/MyDrive/yelp_dataset_csv/yelp_academic_dataset_business.csv'
#review_csv = '/content/drive/MyDrive/yelp_dataset_csv/yelp_academic_dataset_review.csv'

# Read the CSV (first row is the header, column types are inferred) and rename the
# business-level "stars" column to "overall_star" in the same chain.
business_df = (
    spark.read
    .csv(business_csv, header=True, inferSchema=True)
    .withColumnRenamed("stars", "overall_star")
)

# Disabled: give the review-level rating its own name, then join on business_id.
#review_df = review_df.withColumnRenamed("stars", "personal_star")
#merged_df = business_df.join(review_df, on='business_id', how='inner')

In [None]:
# Preview the first five business rows.
business_df.show(n=5)

In [None]:
#merged_df.write.csv('/content/drive/MyDrive/yelp_dataset_csv/merged_yelp_data.csv', header=True)

In [None]:
# Summary statistics for the business table, printed as a Spark table.
business_summary = business_df.describe()
business_summary.show()

In [None]:
import matplotlib.pyplot as plt
# Bring the single review_count column to the driver as a flat Python list.
review_counts = [row[0] for row in business_df.select("review_count").rdd.collect()]

# Histogram of how many reviews each business has received.
fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(review_counts, bins=50, color='skyblue', edgecolor='black')
ax.set_xlabel('Review Count')
ax.set_ylabel('Frequency')
ax.set_title('Distribution of Review Counts')
ax.grid(axis='y', alpha=0.75)
plt.show()

In [None]:
# Number of businesses per state, largest first.
business_count_by_state = (
    business_df
    .groupBy("state")
    .count()
    .orderBy("count", ascending=False)
)

# The aggregate is small, so it is converted to pandas for plotting.
state_counts = business_count_by_state.toPandas()

# Bar chart of the per-state counts.
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(state_counts['state'], state_counts['count'], color='orange')
ax.set_xlabel('State')
ax.set_ylabel('Number of Businesses')
ax.set_title('Number of Businesses by State')
ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
plt.show()

In [None]:
# Average rating (and business count) split by whether the business is open or closed.
# The result is explicitly ordered by is_open: a Spark groupBy gives no row-order guarantee,
# and the bar order / colour assignment below depends on the row order.
open_closed_rating = (
    business_df
    .groupBy("is_open")
    .agg({"overall_star": "avg", "business_id": "count"})
    .withColumnRenamed("count(business_id)", "business_count")
    .orderBy("is_open")
    .toPandas()
)

# Plotting average rating based on business status; with the ordering above the first
# colour goes to the lowest is_open value and the second to the next one.
plt.figure(figsize=(8, 5))
plt.bar(
    open_closed_rating['is_open'].astype(str),
    open_closed_rating['avg(overall_star)'],
    color=['lightblue', 'salmon'],
)
plt.xlabel('Business Status (0=Closed, 1=Open)')
plt.ylabel('Average Rating')
plt.title('Average Rating of Open vs. Closed Businesses')
plt.grid(axis='y', alpha=0.75)
plt.show()

In [None]:
# Mean star rating per state, best-rated first.
average_rating_by_state = (
    business_df
    .groupBy("state")
    .agg({"overall_star": "avg"})
    .withColumnRenamed("avg(overall_star)", "average_rating")
    .orderBy("average_rating", ascending=False)
)

# Keep the five best-rated states and pull their state values into a Python list.
top_states = average_rating_by_state.limit(5)
top_states_pd = top_states.toPandas()
top_states_list = top_states_pd['state'].to_list()


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, row_number
import matplotlib.pyplot as plt
import seaborn as sns

# Order each state's businesses best-first. review_count is the tie-breaker between
# businesses with the same star rating (assumes review_count was inferred as numeric -
# TODO confirm against the loaded schema).
window_spec = Window.partitionBy("state").orderBy(
    business_df.overall_star.desc(),
    business_df.review_count.desc(),
)

# Number the businesses within each of the top states. row_number() is used instead of
# rank(): rank() assigns the same value to every tied row, so with many businesses sharing
# the top rating a "rank <= 5" filter would keep all of them rather than five.
ranked_businesses_top_states = (
    business_df
    .filter(business_df.state.isin(top_states_list))
    .withColumn("rank", row_number().over(window_spec))
)

# Keep at most five businesses per top state and bring them to pandas for plotting.
top_businesses_final = (
    ranked_businesses_top_states
    .filter(ranked_businesses_top_states["rank"] <= 5)
    .select("state", "name", "overall_star")
)
top_businesses_final_pd = top_businesses_final.toPandas()

# Set the aesthetic style of the plots
sns.set(style="whitegrid")

# One distinct colour per state
palette = sns.color_palette("husl", len(top_states_list))

# Create a figure
plt.figure(figsize=(16, 8))

# One group of bars per state, drawn in that state's palette colour
for state, state_color in zip(top_states_list, palette):
    state_data = top_businesses_final_pd[top_businesses_final_pd['state'] == state]
    plt.bar(state_data['name'], state_data['overall_star'], label=state, color=state_color, alpha=0.7)

plt.xlabel('Business Name')
plt.ylabel('Overall Star Rating')
plt.title('Top 5 Businesses in the Top 5 States')
plt.xticks(rotation=45, ha='right')
plt.legend(title='State')
plt.grid(axis='y', alpha=0.75)
plt.tight_layout()
plt.show()


In [None]:
import dask.dataframe as dd
# Collect the Spark business table onto the driver as a pandas DataFrame.
# NOTE(review): toPandas() loads every row into local memory - confirm the table fits.
df = business_df.toPandas()  # business_df is the Spark DataFrame loaded earlier in the notebook
# Wrap the pandas frame as a Dask DataFrame split into four partitions.
df_dask = dd.from_pandas(df, npartitions=4)

In [None]:
# Mean rating per city, computed through Dask; only the five highest means are returned.
average_rating_by_city = df_dask.groupby('city')['overall_star'].mean().nlargest(5).compute()
average_rating_by_city_pd = (
    average_rating_by_city
    .rename_axis('city')
    .reset_index(name='average_rating')
)

# Visualize using Matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

sns.set(style="whitegrid")
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(average_rating_by_city_pd['city'], average_rating_by_city_pd['average_rating'], color='skyblue')
ax.set_xlabel('City')
ax.set_ylabel('Average Rating')
ax.set_title('Top 5 Cities with Highest Average Ratings')
ax.tick_params(axis='x', labelrotation=45)
ax.set_ylim(0, 5)  # Assuming ratings are out of 5
ax.grid(axis='y', alpha=0.75)
fig.tight_layout()
plt.show()

In [None]:
!pip install datasets

In [None]:
pip install "dask[dataframe]"

In [None]:
import sys

# Diagnostic: show the directories this kernel searches when importing modules.
module_search_path = sys.path
print(module_search_path)

In [None]:
import sys

# Make the project virtualenv's packages importable from this kernel.
# NOTE(review): this is a machine-specific absolute path; the notebook will not find it on
# another machine. Running the kernel from the virtualenv itself would make this unnecessary.
venv_site_packages = '/Users/nushrat/Library/CloudStorage/OneDrive-LouisianaStateUniversity/LSU Class/Fall 24/Big Data/project/venv/lib/python3.12/site-packages'

# Guarded so that re-running the cell does not keep appending duplicate entries.
if venv_site_packages not in sys.path:
    sys.path.append(venv_site_packages)

import dask
print(dask.__version__)

In [None]:
import dask

# Diagnostic: report the Dask release this kernel imports.
dask_version = dask.__version__
print(dask_version)

In [None]:
import dask.dataframe as dd

# Diagnostic: printing the module object shows where dask.dataframe was imported from.
dask_dataframe_module = dd
print(dask_dataframe_module)

In [12]:
#import dask.dataframe as dd
import csv
# Assuming the data is in a CSV file
#dask_df = dd.read_csv('yelp_academic_dataset_review.csv', blocksize=25e6, on_bad_lines='skip', lineterminator='\n', quoting=csv.QUOTE_NONE, low_memory=False)
#dask_df = dd.read_csv('yelp_academic_dataset_review.csv', on_bad_lines='skip', lineterminator='\n', quoting=csv.QUOTE_NONE, low_memory=False) 

In [None]:
import dask.dataframe as dd
import csv

# Parser settings for the review CSV: rows the parser rejects are skipped, records end at
# '\n', and quote characters get no special treatment (csv.QUOTE_NONE).
review_csv_options = dict(
    on_bad_lines='skip',
    lineterminator='\n',
    quoting=csv.QUOTE_NONE,
    low_memory=False,
)

# Load the full dataset lazily, letting Dask handle it in chunks.
dask_df = dd.read_csv('yelp_academic_dataset_review.csv', **review_csv_options)

# Materialise only the first few rows to check that the data is loading.
review_preview = dask_df.head()
print(review_preview)


In [14]:
# Execute the lazy Dask read and hold the whole review table in memory as one pandas
# DataFrame. Despite the name, no cleaning has happened yet at this point; the modelling
# cells below add the cleaned columns to this frame.
cleaned_df = dask_df.compute()

In [None]:
import dask.dataframe as dd
import re
import numpy as np
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments

In [None]:
# Parse the star ratings as numbers; anything unparseable becomes NaN (errors='coerce').
stars_numeric = pd.to_numeric(cleaned_df['stars'], errors='coerce')
cleaned_df['stars'] = stars_numeric

# Text normalisation applied to every review before modelling
def preprocess_text(text):
    """Return a lower-cased copy of ``text`` with special characters removed.

    Every character that is not an ASCII letter, a digit or whitespace is dropped.
    Non-string input (for example the NaN pandas uses for a missing review) yields ''.
    """
    if not isinstance(text, str):
        return ''  # Missing / non-text values become an empty document
    return re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())

# Store the normalised review text next to the raw text.
cleaned_df['clean_text'] = cleaned_df['text'].map(preprocess_text)

# Star rating -> sentiment class
def map_stars_to_classes(stars):
    """Translate a numeric star rating into a sentiment class id.

    Returns 2 (Positive) for ratings of 4 or more, 1 (Neutral) for exactly 3,
    and 0 (Negative) for every other value.
    """
    if stars >= 4:
        return 2  # Positive
    return 1 if stars == 3 else 0  # Neutral, otherwise Negative

# Discard reviews whose star rating could not be parsed *before* labelling them.
# map_stars_to_classes sends NaN to its final else-branch (class 0 / Negative), so
# dropping NaN labels afterwards would remove nothing and those rows would be trained
# on as negative reviews.
cleaned_df = cleaned_df.dropna(subset=['stars'])

# Apply mapping function to create labels
cleaned_df['labels'] = cleaned_df['stars'].apply(map_stars_to_classes)

# Safety net: drop any rows that still have NaN in the 'labels' column
cleaned_df = cleaned_df.dropna(subset=['labels'])

# 70 / 15 / 15 train / validation / test split, stratified on the sentiment label:
# 30% is held out first, then that hold-out is halved.
train_df, temp_df = train_test_split(
    cleaned_df, test_size=0.3, stratify=cleaned_df['labels'], random_state=42
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df['labels'], random_state=42
)

# Load the BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Every split is tokenized with the same settings: truncation and padding enabled,
# max_length of 128.
tokenizer_options = dict(truncation=True, padding=True, max_length=128)
train_encodings = tokenizer(train_df['clean_text'].tolist(), **tokenizer_options)
val_encodings = tokenizer(val_df['clean_text'].tolist(), **tokenizer_options)
test_encodings = tokenizer(test_df['clean_text'].tolist(), **tokenizer_options)

# PyTorch dataset wrapper around tokenizer output
class YelpDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs tokenizer encodings with class labels.

    ``encodings`` is a mapping from field name to a per-example indexable sequence;
    ``labels`` is an indexable sequence with one label per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of examples is defined by the label sequence.
        return len(self.labels)

    def __getitem__(self, idx):
        # Build one example: every encoding field plus its label, each as a tensor.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Wrap each split's encodings and labels for the Trainer.
train_labels_list = train_df['labels'].tolist()
val_labels_list = val_df['labels'].tolist()
test_labels_list = test_df['labels'].tolist()
train_dataset = YelpDataset(train_encodings, train_labels_list)
val_dataset = YelpDataset(val_encodings, val_labels_list)
test_dataset = YelpDataset(test_encodings, test_labels_list)

# Pretrained BERT with a three-way classification head (Negative / Neutral / Positive).
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=3)

# Training configuration.
# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy` - confirm against the installed version.
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    evaluation_strategy="epoch",
)

# Fine-tune, evaluating on the validation split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)
trainer.train()

# Validation set: predicted class = arg-max over the model outputs.
val_output = trainer.predict(val_dataset)
val_predictions, val_labels = val_output[0], val_output[1]
val_pred_labels = np.argmax(val_predictions, axis=1)
val_accuracy = accuracy_score(val_labels, val_pred_labels)

# Test set, scored the same way.
test_output = trainer.predict(test_dataset)
test_predictions, test_labels = test_output[0], test_output[1]
test_pred_labels = np.argmax(test_predictions, axis=1)
test_accuracy = accuracy_score(test_labels, test_pred_labels)

# Print results
print(f'Training complete.\nValidation Accuracy: {val_accuracy:.4f}')
print(f'Test Accuracy: {test_accuracy:.4f}')

# Per-class precision / recall / F1 on the test set
class_names = ['Negative', 'Neutral', 'Positive']
report = classification_report(test_labels, test_pred_labels, target_names=class_names)
print(report)

## TF-IDF

In [None]:
!pip install scikit-learn

In [None]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report

# Parse the star ratings as numbers; anything unparseable becomes NaN (errors='coerce').
stars_numeric = pd.to_numeric(cleaned_df['stars'], errors='coerce')
cleaned_df['stars'] = stars_numeric

# Text normalisation applied to every review before vectorising
def preprocess_text(text):
    """Return a lower-cased copy of ``text`` with special characters removed.

    Every character that is not an ASCII letter, a digit or whitespace is dropped.
    Non-string input (for example the NaN pandas uses for a missing review) yields ''.
    """
    if not isinstance(text, str):
        return ''  # Missing / non-text values become an empty document
    return re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())

# Store the normalised review text next to the raw text.
cleaned_df['clean_text'] = cleaned_df['text'].map(preprocess_text)

# Reviews without a usable numeric star rating cannot be labelled, so they are removed.
cleaned_df = cleaned_df.dropna(axis=0, subset=['stars'])

# Star rating -> sentiment class
def map_stars_to_classes(stars):
    """Translate a numeric star rating into a sentiment class id.

    Returns 2 (Positive) for ratings of 4 or more, 1 (Neutral) for exactly 3,
    and 0 (Negative) for every other value.
    """
    if stars >= 4:
        return 2  # Positive
    return 1 if stars == 3 else 0  # Neutral, otherwise Negative

# Derive the sentiment label from the star rating, then drop rows without a label.
cleaned_df['labels'] = cleaned_df['stars'].map(map_stars_to_classes)
cleaned_df = cleaned_df.dropna(subset=['labels'])

# 70 / 15 / 15 train / validation / test split, stratified on the label:
# 30% is held out first, then that hold-out is halved.
train_df, temp_df = train_test_split(
    cleaned_df, test_size=0.3, stratify=cleaned_df['labels'], random_state=42
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df['labels'], random_state=42
)

# TF-IDF features capped at 5000 terms; the vocabulary is fitted on the training split
# only and then reused unchanged for validation and test.
vectorizer = TfidfVectorizer(max_features=5000)
X_train = vectorizer.fit_transform(train_df['clean_text'])
X_val, X_test = (vectorizer.transform(part['clean_text']) for part in (val_df, test_df))

# Matching label vectors
y_train, y_val, y_test = (part['labels'] for part in (train_df, val_df, test_df))

# Fit a logistic-regression classifier on the TF-IDF features.
clf = LogisticRegression(max_iter=1000, random_state=42)
clf.fit(X_train, y_train)

# Predict both held-out splits up front, then score them.
val_predictions = clf.predict(X_val)
test_predictions = clf.predict(X_test)

val_accuracy = accuracy_score(y_val, val_predictions)
print(f'Validation Accuracy: {val_accuracy:.4f}')

test_accuracy = accuracy_score(y_test, test_predictions)
print(f'Test Accuracy: {test_accuracy:.4f}')

# Per-class precision / recall / F1 on the test set
class_names = ['Negative', 'Neutral', 'Positive']
report = classification_report(y_test, test_predictions, target_names=class_names)
print(report)


Logistic Regression, SVM

In [None]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report

# Parse the star ratings as numbers; anything unparseable becomes NaN (errors='coerce').
stars_numeric = pd.to_numeric(cleaned_df['stars'], errors='coerce')
cleaned_df['stars'] = stars_numeric

# Text normalisation applied to every review before vectorising
def preprocess_text(text):
    """Return a lower-cased copy of ``text`` with special characters removed.

    Every character that is not an ASCII letter, a digit or whitespace is dropped.
    Non-string input (for example the NaN pandas uses for a missing review) yields ''.
    """
    if not isinstance(text, str):
        return ''  # Missing / non-text values become an empty document
    return re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())

# Store the normalised review text next to the raw text.
cleaned_df['clean_text'] = cleaned_df['text'].map(preprocess_text)

# Reviews without a usable numeric star rating cannot be labelled, so they are removed.
cleaned_df = cleaned_df.dropna(axis=0, subset=['stars'])

# Star rating -> sentiment class
def map_stars_to_classes(stars):
    """Translate a numeric star rating into a sentiment class id.

    Returns 2 (Positive) for ratings of 4 or more, 1 (Neutral) for exactly 3,
    and 0 (Negative) for every other value.
    """
    if stars >= 4:
        return 2  # Positive
    return 1 if stars == 3 else 0  # Neutral, otherwise Negative

# Derive the sentiment label from the star rating, then drop rows without a label.
cleaned_df['labels'] = cleaned_df['stars'].map(map_stars_to_classes)
cleaned_df = cleaned_df.dropna(subset=['labels'])

# 70 / 15 / 15 train / validation / test split, stratified on the label:
# 30% is held out first, then that hold-out is halved.
train_df, temp_df = train_test_split(
    cleaned_df, test_size=0.3, stratify=cleaned_df['labels'], random_state=42
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df['labels'], random_state=42
)

# TF-IDF features capped at 5000 terms; the vocabulary is fitted on the training split
# only and then reused unchanged for validation and test.
vectorizer = TfidfVectorizer(max_features=5000)
X_train = vectorizer.fit_transform(train_df['clean_text'])
X_val, X_test = (vectorizer.transform(part['clean_text']) for part in (val_df, test_df))

# Matching label vectors
y_train, y_val, y_test = (part['labels'] for part in (train_df, val_df, test_df))

# ---- Logistic Regression Model ----
clf_lr = LogisticRegression(max_iter=1000, random_state=42)
clf_lr.fit(X_train, y_train)

# Predict both held-out splits up front, then score them.
val_predictions_lr = clf_lr.predict(X_val)
test_predictions_lr = clf_lr.predict(X_test)

val_accuracy_lr = accuracy_score(y_val, val_predictions_lr)
print(f'Logistic Regression Validation Accuracy: {val_accuracy_lr:.4f}')

test_accuracy_lr = accuracy_score(y_test, test_predictions_lr)
print(f'Logistic Regression Test Accuracy: {test_accuracy_lr:.4f}')

# Per-class precision / recall / F1 on the test set
class_names = ['Negative', 'Neutral', 'Positive']
report_lr = classification_report(y_test, test_predictions_lr, target_names=class_names)
print("Logistic Regression Classification Report:\n", report_lr)


# ---- Support Vector Machine (SVM) Model ----
# A linear SVM is trained with LinearSVC instead of SVC(kernel='linear'): scikit-learn
# documents SVC's fit time as scaling at least quadratically with the number of samples
# and recommends LinearSVC for large datasets such as this review corpus.
# NOTE(review): LinearSVC is a different solver with different defaults (loss, multi-class
# scheme), so scores may differ slightly from a kernel-SVC run.
from sklearn.svm import LinearSVC

clf_svm = LinearSVC(random_state=42)
clf_svm.fit(X_train, y_train)

# Evaluate SVM on validation set
val_predictions_svm = clf_svm.predict(X_val)
val_accuracy_svm = accuracy_score(y_val, val_predictions_svm)
print(f'SVM Validation Accuracy: {val_accuracy_svm:.4f}')

# Evaluate SVM on test set
test_predictions_svm = clf_svm.predict(X_test)
test_accuracy_svm = accuracy_score(y_test, test_predictions_svm)
print(f'SVM Test Accuracy: {test_accuracy_svm:.4f}')

# Generate classification report for SVM
report_svm = classification_report(y_test, test_predictions_svm, target_names=['Negative', 'Neutral', 'Positive'])
print("SVM Classification Report:\n", report_svm)


In [None]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report

# Parse the star ratings as numbers; anything unparseable becomes NaN (errors='coerce').
stars_numeric = pd.to_numeric(cleaned_df['stars'], errors='coerce')
cleaned_df['stars'] = stars_numeric

# Text normalisation applied to every review before vectorising
def preprocess_text(text):
    """Return a lower-cased copy of ``text`` with special characters removed.

    Every character that is not an ASCII letter, a digit or whitespace is dropped.
    Non-string input (for example the NaN pandas uses for a missing review) yields ''.
    """
    if not isinstance(text, str):
        return ''  # Missing / non-text values become an empty document
    return re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())

# Store the normalised review text next to the raw text.
cleaned_df['clean_text'] = cleaned_df['text'].map(preprocess_text)

# Reviews without a usable numeric star rating cannot be labelled, so they are removed.
cleaned_df = cleaned_df.dropna(axis=0, subset=['stars'])

# Star rating -> sentiment class
def map_stars_to_classes(stars):
    """Translate a numeric star rating into a sentiment class id.

    Returns 2 (Positive) for ratings of 4 or more, 1 (Neutral) for exactly 3,
    and 0 (Negative) for every other value.
    """
    if stars >= 4:
        return 2  # Positive
    return 1 if stars == 3 else 0  # Neutral, otherwise Negative

# Derive the sentiment label from the star rating, then drop rows without a label.
cleaned_df['labels'] = cleaned_df['stars'].map(map_stars_to_classes)
cleaned_df = cleaned_df.dropna(subset=['labels'])

# 70 / 15 / 15 train / validation / test split, stratified on the label:
# 30% is held out first, then that hold-out is halved.
train_df, temp_df = train_test_split(
    cleaned_df, test_size=0.3, stratify=cleaned_df['labels'], random_state=42
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df['labels'], random_state=42
)

# TF-IDF features capped at 5000 terms; the vocabulary is fitted on the training split
# only and then reused unchanged for validation and test.
vectorizer = TfidfVectorizer(max_features=5000)
X_train = vectorizer.fit_transform(train_df['clean_text'])
X_val, X_test = (vectorizer.transform(part['clean_text']) for part in (val_df, test_df))

# Matching label vectors
y_train, y_val, y_test = (part['labels'] for part in (train_df, val_df, test_df))



# ---- Support Vector Machine (SVM) Model ----
# A linear SVM is trained with LinearSVC instead of SVC(kernel='linear'): scikit-learn
# documents SVC's fit time as scaling at least quadratically with the number of samples
# and recommends LinearSVC for large datasets such as this review corpus.
# NOTE(review): LinearSVC is a different solver with different defaults (loss, multi-class
# scheme), so scores may differ slightly from a kernel-SVC run.
from sklearn.svm import LinearSVC

clf_svm = LinearSVC(random_state=42)
clf_svm.fit(X_train, y_train)

# Evaluate SVM on validation set
val_predictions_svm = clf_svm.predict(X_val)
val_accuracy_svm = accuracy_score(y_val, val_predictions_svm)
print(f'SVM Validation Accuracy: {val_accuracy_svm:.4f}')

# Evaluate SVM on test set
test_predictions_svm = clf_svm.predict(X_test)
test_accuracy_svm = accuracy_score(y_test, test_predictions_svm)
print(f'SVM Test Accuracy: {test_accuracy_svm:.4f}')

# Generate classification report for SVM
report_svm = classification_report(y_test, test_predictions_svm, target_names=['Negative', 'Neutral', 'Positive'])
print("SVM Classification Report:\n", report_svm)


## Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Function to plot confusion matrix
def plot_confusion_matrix(y_true, y_pred, model_name):
    """Draw the 3-class confusion matrix of one model's predictions.

    Parameters
    ----------
    y_true : array-like of true class ids (0 = Negative, 1 = Neutral, 2 = Positive).
    y_pred : array-like of predicted class ids, same length as ``y_true``.
    model_name : str used in the figure title.
    """
    # Fix the class order so the matrix is always 3x3 and lines up with the display
    # labels, even if one class is absent from both y_true and y_pred.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1, 2])
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Negative', 'Neutral', 'Positive'])

    # Draw onto our own axes: without ax=..., disp.plot() opens a new figure, which
    # leaves a separately created plt.figure() empty and ignores its figsize.
    fig, ax = plt.subplots(figsize=(8, 6))
    disp.plot(ax=ax, cmap=plt.cm.Blues, values_format='d')
    ax.set_title(f'Confusion Matrix for {model_name}')
    plt.show()

# ---- Logistic Regression Confusion Matrix ----
plot_confusion_matrix(y_true=y_test, y_pred=test_predictions_lr, model_name="Logistic Regression")

# ---- Support Vector Machine (SVM) Confusion Matrix ----
plot_confusion_matrix(y_true=y_test, y_pred=test_predictions_svm, model_name="SVM")

# Side-by-side comparison of the two models' test accuracy
accuracy_by_model = {
    'Logistic Regression': test_accuracy_lr,
    'SVM': test_accuracy_svm,
}
models = list(accuracy_by_model.keys())
accuracies = list(accuracy_by_model.values())

plt.figure(figsize=(8, 6))
accuracy_ax = sns.barplot(x=models, y=accuracies, palette='Blues_d')
accuracy_ax.set_title('Test Accuracy Comparison')
accuracy_ax.set_ylabel('Accuracy')
plt.show()

# Per-class precision, recall and F1 for both models, plotted side by side
from sklearn.metrics import precision_recall_fscore_support

# average=None returns one value per class, in the order given by `labels`.
class_ids = [0, 1, 2]
metrics_lr = precision_recall_fscore_support(y_test, test_predictions_lr, average=None, labels=class_ids)
metrics_svm = precision_recall_fscore_support(y_test, test_predictions_svm, average=None, labels=class_ids)

# The first three entries of each result are used: precision, recall, F1.
precision_lr, recall_lr, f1_lr = metrics_lr[:3]
precision_svm, recall_svm, f1_svm = metrics_svm[:3]

# One row per class, one column per (metric, model) pair
df_metrics = pd.DataFrame({
    'Class': ['Negative', 'Neutral', 'Positive'],
    'Precision (LR)': precision_lr,
    'Recall (LR)': recall_lr,
    'F1-Score (LR)': f1_lr,
    'Precision (SVM)': precision_svm,
    'Recall (SVM)': recall_svm,
    'F1-Score (SVM)': f1_svm,
})

# Grouped bar chart of all six series per class
df_metrics.plot(
    x='Class',
    kind='bar',
    figsize=(10, 7),
    rot=0,
    title='Precision, Recall, F1-Score Comparison for LR and SVM',
)
plt.show()


Word Cloud

In [None]:
!pip install wordcloud matplotlib pandas

In [20]:
import pandas as pd
import re
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# Function to clean text
def preprocess_text(text):
    """Return a lower-cased copy of ``text`` with special characters removed.

    Every character that is not an ASCII letter, a digit or whitespace is dropped.
    Non-string input (for example the NaN pandas uses for a missing review) yields ''.
    """
    if not isinstance(text, str):
        return ''
    return re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())
file_path = 'yelp_academic_dataset_review.csv'

# Number of CSV rows read per chunk
chunk_size = 10000

# Stream the review file chunk by chunk, cleaning only the 'text' column (the other
# columns are not needed here, so they are not parsed into the chunk).
chunk_texts = []
for chunk in pd.read_csv(file_path, chunksize=chunk_size, usecols=['text']):
    chunk_texts.append(' '.join(chunk['text'].apply(preprocess_text)))

# Join the chunks with a space as well: concatenating them directly would fuse the last
# word of one chunk with the first word of the next into a single bogus token.
text_data = ' '.join(chunk_texts)

# Word cloud of the 100 most frequent words in the cleaned reviews
wordcloud = WordCloud(width=800, height=400, background_color='white', max_words=100).generate(text_data)


# Render the word cloud image without axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis('off')
ax.set_title("Word Cloud of Most Frequent Words", fontsize=16)
plt.show()


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x10649b440>>
Traceback (most recent call last):
  File "/Users/nushrat/Library/Python/3.12/lib/python/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 


: 

: 

In [None]:
import dask.dataframe as dd
import re
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# Function to clean text
def preprocess_text(text):
    """Return a lower-cased copy of ``text`` with special characters removed.

    Every character that is not an ASCII letter, a digit or whitespace is dropped.
    Non-string input (for example the NaN pandas uses for a missing review) yields ''.
    """
    if not isinstance(text, str):
        return ''
    return re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())

file_path = 'yelp_academic_dataset_review.csv'

# Lazily open the review CSV as a Dask DataFrame.
dask_df = dd.read_csv(file_path)

# Schedule the per-review cleaning; `meta` declares the name and dtype of the result
# so Dask does not have to infer them.
clean_text_meta = ('text', 'str')
dask_df['clean_text'] = dask_df['text'].apply(preprocess_text, meta=clean_text_meta)

# Run the computation, then concatenate every cleaned review into one space-separated string.
clean_text_series = dask_df['clean_text'].compute()
text_data = clean_text_series.str.cat(sep=' ')

# Word cloud of the 100 most frequent words
wordcloud = WordCloud(width=800, height=400, background_color='white', max_words=100).generate(text_data)

# Render the word cloud image without axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis('off')
ax.set_title("Word Cloud of Most Frequent Words", fontsize=16)
plt.show()
