In [0]:
%pip install torch==2.1.0

In [0]:
pip install s3fs

In [0]:
import os
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import sklearn
import numpy
import scipy
import plotly
import bs4 as bs
import urllib.request
import boto3

In [0]:
import io
import matplotlib.pyplot as plt

In [0]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import *
import numpy as np

In [0]:
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import FeatureHasher
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF

In [0]:
# Location of the raw tab-separated reviews file.
file_path = 's3a://my-bigdata-project-sa/landing/amazon_reviews_us_Apparel_v1_00.tsv'
# Read it with a header row and let Spark infer the column types.
sdf = (
    spark.read
    .options(sep='\t', header=True, inferSchema=True)
    .csv(file_path)
)

In [0]:
pdf = sdf.toPandas()

In [0]:
sdf = sdf.drop('clean_review_tf', 'clean_review_features')
pdf = sdf.toPandas()

In [0]:
pdf.head()

In [0]:
# The Spark read of this same file uses header=True, and later cells index this
# frame by column name (e.g. df['product_title']), so parse the first line as the
# header instead of leaving integer column labels.
# NOTE(review): a later cell slices df.iloc[1:]; with the header parsed, that slice
# drops a real data row — revisit it.
df = pd.read_csv(file_path, sep='\t', header=0, on_bad_lines='skip')

In [0]:
sdf.select(sdf.customer_id, sdf.review_body).show()

In [0]:
sdf.filter(sdf.star_rating > 3 ).show()

In [0]:
# Work on a 25% sample drawn without replacement. The fixed seed keeps the sample,
# and everything derived from it, reproducible across Restart & Run All.
sdf = sdf.sample(withReplacement=False, fraction=0.25, seed=42)

In [0]:
# Build a copy of sdf without rows where star_rating or review_body is null.
# NOTE(review): the cells that follow keep working on `sdf`, not `sdf_drop` —
# confirm whether the filtered frame was meant to replace `sdf`.
sdf_drop = sdf.na.drop(subset=["star_rating", "review_body"])

In [0]:
# Count the number of nulls in each specified column of the cleaned DataFrame
sdf_drop.select([count(when(isnull(col(c)), c)).alias(c) for c in ["star_rating", "review_body"]]).show()


In [0]:
def ascii_only(mystring):
    """Return `mystring` with every non-ASCII character removed.

    Falsy input (None or an empty string) yields an empty string.
    """
    if not mystring:
        return ""
    # Characters that cannot be encoded as ASCII are silently dropped.
    ascii_bytes = mystring.encode('ascii', 'ignore')
    return ascii_bytes.decode('ascii')

# Wrap the cleaner as a Spark UDF and derive ASCII-only copies of the headline and body.
ascii_udf = udf(ascii_only)
sdf = (
    sdf
    .withColumn("clean_review_headline", ascii_udf('review_headline'))
    .withColumn("clean_review_body", ascii_udf('review_body'))
)
# Quick sanity summary of the two new columns.
(
    sdf
    .select("clean_review_headline", "clean_review_body")
    .summary("count", "min", "max")
    .show()
)

In [0]:
sdf.select("clean_review_headline", "clean_review_body").show(truncate=False)

In [0]:
sdf_clean = sdf.na.drop(subset=["clean_review_headline", "clean_review_body"])

In [0]:
sdf.select([count(when(isnull(c), c)).alias(c) for c in ["clean_review_headline", "clean_review_body"]]).show()


In [0]:
sdf.select("clean_review_headline", "clean_review_body").summary("count", "min", "max").show()

In [0]:
# Create a count of the review words
# The count is the number of pieces obtained by splitting the raw `review_body`
# (not the ASCII-cleaned column) on single spaces.
sdf = sdf.withColumn('review_body_wordcount', size(split(col('review_body'), ' ')))
sdf.select(sdf.review_body, sdf.review_body_wordcount).show()

In [0]:
# Filter out short review body texts
from pyspark.sql.functions import length

# Keep only rows whose cleaned body is longer than 10 characters.
sdf = sdf.where(length(sdf.clean_review_body) > 10)
sdf.show(truncate=False)

In [0]:
# Report how many rows survived the length filter above.
row_count = sdf.count()
print(f"Number of rows with 'clean_review_body' length > 10: {row_count}")

In [0]:
# Filter out review body texts with 5 or fewer words
sdf = sdf.where(sdf.review_body_wordcount > 5)

In [0]:
# Set up the tokenizer TF and IDF
# gaps=False makes the regex match the tokens themselves (runs of word characters)
# rather than the separators between them.
tokenizer = RegexTokenizer(inputCol="clean_review_body", outputCol="clean_review_words", pattern="\\w+", gaps=False)
sdf = tokenizer.transform(sdf)

In [0]:
# Run the hash function over the tokens
# Term frequencies are hashed into a vector of 2**16 buckets.
hashtf = HashingTF(numFeatures=2**16, inputCol="clean_review_words", outputCol='clean_review_tf')
sdf = hashtf.transform(sdf)

In [0]:
# Create the inverse document frequency vectors
# minDocFreq=5 sets the minimum number of documents a term must appear in.
idf = IDF(inputCol='clean_review_tf', outputCol="clean_review_features", minDocFreq=5)
sdf = idf.fit(sdf).transform(sdf)

In [0]:
from pyspark.ml.feature import OneHotEncoder

In [0]:
# Remove the three *_index columns if present (presumably so the StringIndexer
# cells below can be re-run without an output-column clash).
sdf = sdf.drop(
    'product_category_index',
    'product_title_index',
    'product_id_index',
)

In [0]:
import pyspark
print(pyspark.__version__)


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Map each product string column to a numeric index column. Each indexer is
# fitted on the current frame and applied to it straight away.
# NOTE(review): StringIndexer's default handleInvalid is "error"; if any of these
# columns can hold nulls, consider handleInvalid="keep" — confirm against the data.

# For 'product_category'
category_indexer = StringIndexer(inputCol="product_category", outputCol="product_category_index")
sdf = category_indexer.fit(sdf).transform(sdf)

# For 'product_title'
title_indexer = StringIndexer(inputCol="product_title", outputCol="product_title_index")
sdf = title_indexer.fit(sdf).transform(sdf)

# For 'product_id'
id_indexer = StringIndexer(inputCol="product_id", outputCol="product_id_index")
sdf = id_indexer.fit(sdf).transform(sdf)


In [0]:
# One-hot encode each indexed product column.
# The OneHotEncoder variant that takes inputCols/outputCols is an Estimator: it has
# no `transform` of its own and must be fitted first to obtain a model that does.
category_encoder = OneHotEncoder(inputCols=["product_category_index"], outputCols=["product_category_vec"])
title_encoder = OneHotEncoder(inputCols=["product_title_index"], outputCols=["product_title_vec"])
id_encoder = OneHotEncoder(inputCols=["product_id_index"], outputCols=["product_id_vec"])

sdf = category_encoder.fit(sdf).transform(sdf)
sdf = title_encoder.fit(sdf).transform(sdf)
sdf = id_encoder.fit(sdf).transform(sdf)


In [0]:
from pyspark.ml.feature import VectorAssembler

# Create a VectorAssembler to combine all feature columns into one vector column
# NOTE(review): both the raw TF vector and the TF-IDF vector derived from it are
# included — confirm that feeding both is intended.
assembler = VectorAssembler(inputCols=["product_category_vec", 
                                       "product_title_vec", 
                                       "product_id_vec",
                                       "clean_review_tf",  # TF vector from previous steps
                                       "clean_review_features"  # TF-IDF vector from previous steps
                                      ],
                            outputCol="features")

# Transform the DataFrame
# After this line sdf carries a "features" column.
sdf = assembler.transform(sdf)


In [0]:
# Summary statistics for the current Spark frame.
sdf.summary().show()

In [0]:
# Schema before dropping columns.
sdf.printSchema()

In [0]:
# Remove three columns from sdf; every frame derived from sdf after this point
# no longer has them.
sdf = sdf.drop("product_parent", "vine", "verified_purchase")

In [0]:
# Schema after the drop.
sdf.printSchema()

In [0]:
import pandas as pd
import numpy as np
from collections import Counter

In [0]:
df.isnull().sum().sum()

In [0]:
data = pd.DataFrame(df)

data = df.iloc[1:, :]


In [0]:
def len_of_str(x):
    """Return len(x), or 0 when x has no length.

    The original only special-cased values whose type is exactly `float` (how
    pandas represents missing text) and raised TypeError for any other unsized
    value such as None or an int. Every unsized value now maps to 0; sized
    values (strings, lists, ...) behave as before.
    """
    try:
        return len(x)
    except TypeError:
        # floats/NaN, None, ints and other objects without __len__
        return 0

# Histogram of review text lengths (in characters), limited to the 0-1000 range.
# NOTE(review): `data` is sliced from the pandas read of the raw TSV, while the
# only visible place that creates "clean_review_body" is the Spark frame `sdf` —
# confirm this column actually exists in `data`.
plt.hist(data["clean_review_body"].apply(len_of_str), range=[0, 1000], facecolor='gray', align='mid')

In [0]:
import re
import string

def clean_text(text):
    '''Make text lowercase, remove text in square brackets, remove punctuation and remove words containing numbers.

    Curly quotes, the ellipsis character and newlines are removed as well.

    Parameters
    ----------
    text : str
        Text to normalise. Any non-string value (e.g. the float NaN pandas uses
        for missing cells, or None) is treated as missing and yields ''.

    Returns
    -------
    str
        The cleaned text.
    '''
    # Missing cells reach this function via Series.apply; calling .lower() on them
    # would raise AttributeError.
    if not isinstance(text, str):
        return ''
    text = text.lower()
    # Raw strings keep the regex escapes intact (the non-raw forms are invalid
    # string escapes and trigger warnings on recent Python versions).
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub('[%s]' % re.escape(string.punctuation), '', text)
    text = re.sub(r'\w*\d\w*', '', text)
    text = re.sub('[‘’“”…]', '', text)
    text = re.sub('\n', '', text)
    return text

In [0]:
# Normalise the product titles in the pandas frame with clean_text.
df['product_title'] = df['product_title'].apply(clean_text)

In [0]:
# List the columns currently on the Spark frame.
sdf.columns

In [0]:
# Pull a 25% sample of the cleaned review text into pandas.
# The original lines opened the column name with ' and closed it with " (a
# SyntaxError), and collected the full column first only to overwrite it at once.
# The seed keeps the sample reproducible.
df = sdf.select('clean_review_body').sample(False, 0.25, seed=42).toPandas()

In [0]:
from pyspark.sql.functions import col, isnan, when, count, udf, to_date, year, month, date_format, size, split
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [0]:
sdf.show()

In [0]:
pdf = sdf.na.drop(subset=["star_rating"]).select('star_rating').toPandas()

In [0]:
# Eyeball the raw rating values.
print(pdf['star_rating'])

In [0]:
# Number of missing ratings.
print(pdf['star_rating'].isnull().sum())


In [0]:
# Which Python types the rating values have, and how many of each.
print(pdf['star_rating'].apply(type).value_counts())


In [0]:
# Flag ratings that are not made up purely of digits.
# - The raw string keeps `\d` a regex escape (in a plain string it is an invalid
#   string escape and warns on recent Python versions).
# - na=True marks missing values as "matching", so, as with the original
#   `== False` comparison, they are not flagged here.
mask = ~pdf['star_rating'].str.match(r'^\d+$', na=True)
print(pdf[mask])


In [0]:
# Replace the flagged (non-digit) ratings with 0.
pdf.loc[mask, 'star_rating'] = 0

In [0]:
# Fill remaining missing ratings with 0 and convert the column to int.
pdf['star_rating'] = pdf['star_rating'].fillna(0).astype(int)

In [0]:
# Second conversion, via float.
# NOTE(review): the column is already int after the previous cell, so this looks
# redundant — confirm before removing.
pdf['star_rating'] = pdf['star_rating'].astype(float).astype(int)

In [0]:
# Re-check the value types after the conversion.
print(pdf['star_rating'].apply(type).value_counts())

In [0]:
# Re-check for missing ratings after the conversion.
print(pdf['star_rating'].isnull().sum())

In [0]:
import matplotlib.pyplot as plt

# Tally how often each star rating occurs, ordered by rating value.
rating_counts = pdf['star_rating'].value_counts().sort_index()

# bar chart
fig, ax = plt.subplots(figsize=(10, 6))
rating_counts.plot(kind='bar', edgecolor='black', ax=ax)
ax.set_title('Frequency of Star Ratings')
ax.set_xlabel('Star Rating')
ax.set_ylabel('Frequency')
plt.xticks(rotation=0)
plt.show()




In [0]:
import seaborn as sns
import matplotlib.pyplot as plt

# Compute the correlation matrix
# (pairwise correlation of the rating and the two vote counts)
corr = pdf[['star_rating', 'helpful_votes', 'total_votes']].corr()

# Set up the matplotlib figure
plt.figure(figsize=(8, 6))

# Draw the annotated heatmap with square cells (no mask is applied)
sns.heatmap(corr, annot=True, fmt=".2f", cmap='coolwarm', square=True, linewidths=.5, cbar_kws={"shrink": .5})

plt.title('Correlation Heatmap')
plt.show()


In [0]:
from collections import Counter
import seaborn as sns

# Concatenate every review into one string, then split it on whitespace.
corpus = ' '.join(pdf['clean_review_body'])
all_words = corpus.split()

# The 20 most frequent tokens as (word, count) pairs.
word_freq = Counter(all_words).most_common(20)

# Tabulate them for plotting.
words_df = pd.DataFrame(word_freq, columns=['word', 'frequency'])

# Bar chart of the frequencies.
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(x='word', y='frequency', data=words_df, ax=ax)
plt.xticks(rotation=45)
ax.set_title('Top 20 Words in Reviews')
plt.show()


In [0]:
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

words_of_interest = ["excellent", "good", "ok", "bad"]

# Count only the tracked words across the whitespace-separated tokens of every review.
word_count = Counter(
    token
    for review in pdf['clean_review_body']
    for token in review.split()
    if token in words_of_interest
)

# Words that never occur still get an explicit 0.
frequencies = {word: word_count[word] for word in words_of_interest}

# Tabulate and order from most to least frequent.
words_df = (
    pd.DataFrame(list(frequencies.items()), columns=['word', 'frequency'])
    .sort_values(by='frequency', ascending=False)
)

# Plot the frequencies
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x='word', y='frequency', data=words_df, palette='viridis', ax=ax)
ax.set_title('Frequency of Specific Words in Reviews')
ax.set_xlabel('Word')
ax.set_ylabel('Frequency')
plt.show()


In [0]:
# Parse review_date into datetimes.
pdf['review_date'] = pd.to_datetime(pdf['review_date'])

# Mean star rating per review date, drawn as a line.
daily_mean_rating = pdf.groupby('review_date')['star_rating'].mean()
ax = daily_mean_rating.plot(title='Average Star Rating Over Time')
ax.set_xlabel('Date')
ax.set_ylabel('Average Star Rating')
plt.show()


In [0]:
# Histogram (20 bins) of the per-review word counts.
word_counts = pdf['review_body_wordcount'].astype(int)
ax = word_counts.plot(kind='hist', bins=20, edgecolor='black', title='Distribution of Review Word Counts')
ax.set_xlabel('Word Count')
ax.set_ylabel('Number of Reviews')
plt.show()


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Make sure 'review_date' holds datetimes.
pdf['review_date'] = pd.to_datetime(pdf['review_date'])

# Rows per date, in chronological order.
transaction_counts = pdf['review_date'].value_counts().sort_index()

# Line chart of the daily counts.
fig, ax = plt.subplots(figsize=(14, 7))
transaction_counts.plot(kind='line', color='dodgerblue', linewidth=2, ax=ax)

ax.set_title('Transaction Count by Date')
ax.set_xlabel('Date')
ax.set_ylabel('Number of Transactions')
ax.grid(True)
plt.show()


In [0]:
import matplotlib.pyplot as plt
import pandas as pd


pdf['review_date'] = pd.to_datetime(pdf['review_date'])

# Number of rows recorded on each date.
transaction_counts_by_date = pdf.groupby('review_date').size()

# Create a time series plot
plt.figure(figsize=(14, 7))
transaction_counts_by_date.plot()

plt.title('Transaction Count by Date')
plt.xlabel('Date')
# The series is a per-date row count, not a product count; the y label now agrees
# with the title (the original said 'Number of Products').
plt.ylabel('Number of Transactions')
plt.grid(True)
plt.show()


In [0]:
import seaborn as sns
import matplotlib.pyplot as plt

df = sdf.select('helpful_votes', 'total_votes').toPandas()

sns.set_style("white")

# lmplot is a figure-level function: it creates its own figure. Setting the title
# and margins through pyplot *before* the call (as the original did) decorated a
# separate, empty figure, so apply them to the figure lmplot returns instead.
grid = sns.lmplot(x='helpful_votes', y='total_votes', data=df)
grid.figure.suptitle('Relationship between Helpful Votes and Total Votes')
grid.figure.subplots_adjust(top=0.9)

# Show the plot
plt.show()


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import col

import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd


In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

numeric_columns = ['star_rating', 'helpful_votes', 'total_votes']

# Cast each metric to float; values that cannot be cast become null.
for column in numeric_columns:
    sdf = sdf.withColumn(column, col(column).cast(FloatType()))
# Replace nulls with 0 once, after all casts (the original re-ran the same fill on
# every loop iteration, with the same end result).
# NOTE(review): this fills nulls in every numeric column of sdf, not only the
# three above — confirm that is intended.
sdf = sdf.na.fill(0)

# sdf already carries a "features" column from the earlier feature assembler, and
# VectorAssembler refuses to write to an output column that already exists, so the
# correlation vector gets its own column name.
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="numeric_features")
sdf_vector = assembler.transform(sdf)

# Correlation.corr returns a one-row frame whose first field is the matrix.
correlation_matrix = Correlation.corr(sdf_vector, "numeric_features").head()[0].toArray()

correlation_matrix_df = pd.DataFrame(correlation_matrix, columns=numeric_columns, index=numeric_columns)

sns.set_style("white")
plt.figure(figsize=(16, 5))
sns.heatmap(correlation_matrix_df, annot=True, cmap="Greens")
plt.show()


In [0]:
print(pdf.columns)

In [0]:
print(pdf.info())


In [0]:
y = pdf['star_rating']

In [0]:
# 'product_parent' is dropped from sdf (together with 'vine' and 'verified_purchase')
# before pdf is collected, so it cannot be selected from pdf here.
feature_columns = ['product_id', 'product_category', 'product_title']  # Add more relevant columns here
X = pdf[feature_columns]

In [0]:
output_file_path="s3://my-bigdata-project-sa/raw/cleaned_amazon_reviews_us_Beauty_v1_00.parquet"
sdf.write.parquet(output_file_path)

In [0]:
df = spark.read.parquet('s3://my-bigdata-project-sa/raw/cleaned_amazon_reviews_us_Beauty_v1_00.parquet')
print(df.columns)