In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import array_contains


In [4]:
# Start (or reuse) the SparkSession. The LEGACY time-parser policy makes
# to_date()/to_timestamp() parse with the pre-Spark-3.0 (SimpleDateFormat) rules.
spark = (
    SparkSession.builder
    .appName("DateParsing")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

23/10/24 17:05:55 WARN Utils: Your hostname, Liangchengs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.121.243 instead (on interface en0)
23/10/24 17:05:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/24 17:05:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
from pyspark.sql.functions import year, to_date

# Raw review records.
json_file_path = "Cell_Phones_and_Accessories_5.json"
df = spark.read.json(json_file_path)

# Derive the review year from "reviewTime" strings such as "04 12, 2018".
df = df.withColumn("reviewYear", year(to_date(df["reviewTime"], "MM dd, yyyy")))

# Keep reviews from 2017 onwards; the helper year column is not carried forward.
filtered_df = df.filter(df["reviewYear"] >= 2017).drop("reviewYear")

In [None]:
# Product metadata, joined to the reviews on "asin".
meta_data = spark.read.json("meta_Cell_Phones_and_Accessories.json")

# Inner join: a review is kept only if its asin also appears in the metadata.
merged_df = filtered_df.join(meta_data, "asin", "inner")

# Columns carried into the rest of the analysis.
selected_columns = [
    'asin', 'reviewerID', 'vote', 'reviewText', 'overall', 'summary', 'reviewTime',
    'title', 'price', 'rank', 'brand', 'category',
]
selected_df = merged_df.select(selected_columns)

In [None]:
# Write the merged review + metadata table as a single JSON part file.
# The single partition is applied only to the write. Reassigning `selected_df`
# to a coalesced frame would force every later transformation on it (category
# explode, cellphone/accessories filters) into one partition.
# repartition(1), unlike coalesce(1), keeps the upstream join stage parallel.
# mode("overwrite") keeps the cell re-runnable; the default mode fails when
# the output directory already exists.
selected_df.repartition(1).write.mode("overwrite").json("cellphone_accessories_review_meta_1718.json")

# Each row is one review joined with its product metadata.
count = selected_df.count()
print("Number of reviews:", count)

In [None]:
from pyspark.sql.functions import explode, col

# One row per (review, category label): flatten the "category" array.
df = selected_df.withColumn("category", explode(col("category")))

# Every distinct category label seen in the merged data.
distinct_categories = df.select("category").distinct()
distinct_categories.show()

In [None]:
# Save the distinct category labels as plain text, one label per line.
# Spark writes a directory at this path. distinct() is a shuffle, so its
# result typically has many partitions; coalesce to a single part file.
# mode("overwrite") keeps the cell re-runnable.
distinct_categories_txt = "distinct_categories.txt"
distinct_categories.coalesce(1).write.mode("overwrite").text(distinct_categories_txt)

In [None]:
from pyspark.sql.functions import col, array_contains

# A review belongs to the cellphone split when its category list contains the
# exact label "Cell Phones"; otherwise it goes to the accessories split.
# Rows where the check evaluates to null (e.g. a null category) land in neither.
is_cellphone = array_contains(col("category"), "Cell Phones")
cellphone_df = selected_df.filter(is_cellphone)
accessories_df = selected_df.filter(~is_cellphone)

In [None]:
# Save each split as a single JSON part file.
# The single partition is applied only to the write, so `cellphone_df` /
# `accessories_df` keep their partitioning for the .count() cells below.
# repartition(1) keeps the upstream join stage parallel, and
# mode("overwrite") keeps the cell re-runnable.
cellphone_df.repartition(1).write.mode("overwrite").json("cellphone.json")
accessories_df.repartition(1).write.mode("overwrite").json("accessories.json")

In [None]:
# Number of rows (reviews) in the cellphone split
cellphone_df.count()

In [None]:
# Number of rows (reviews) in the accessories split
accessories_df.count()

## Clean the cellphone data

In [5]:
# Reload the cellphone split written above (schema is re-inferred from JSON).
cellphone_df = spark.read.format("json").load("cellphone.json")

In [None]:
# Peek at the first row
cellphone_df.first()

In [None]:
# Inspect the schema Spark inferred when reading the JSON back
cellphone_df.printSchema() 

In [6]:
# Drop rows that are identical across every column
# (equivalent to dropDuplicates() with no subset).
cellphone_df = cellphone_df.distinct()


In [8]:
# Rows remaining in the cellphone split after removing duplicates
cellphone_df.count()

7600

In [7]:
from pyspark.sql.functions import col, regexp_replace, expr

# Price arrives as a string such as "$219.99" (or empty); show it before conversion.
cellphone_df.select('price').show(2)

# Strip "$" and "," and cast to double; uncastable strings (e.g. "") become null.
cellphone_df = cellphone_df.withColumn(
    "price",
    regexp_replace(col("price"), '[$,]', '').cast("double"),
)
cellphone_df.select('price').show(2)

+-------+
|  price|
+-------+
|       |
|$219.99|
+-------+
only showing top 2 rows

+------+
| price|
+------+
|  null|
|219.99|
+------+
only showing top 2 rows



In [9]:
from pyspark.sql.functions import col

from pyspark.sql.functions import isnan, isnull
from pyspark.sql import functions as F

# Columns to audit; only those actually present in the frame are counted,
# in schema order.
numeric_cols = [c for c in cellphone_df.columns if c in ["overall", "price"]]
string_cols = [
    c for c in cellphone_df.columns
    if c in ['asin', 'reviewerID', 'vote', 'reviewText', 'summary', 'reviewTime',
             'title', 'rank', 'brand', 'category']
]

# Count all columns in ONE aggregation pass instead of one Spark job per column.
# F.when(cond, True) is null for non-matching rows and F.count skips nulls, so
# each expression counts exactly the matching rows (0 on an empty frame).
# Numeric columns count null OR NaN; string/array columns count null only.
null_counts_row = cellphone_df.select(
    *[F.count(F.when(isnull(c) | isnan(c), True)).alias(c) for c in numeric_cols],
    *[F.count(F.when(isnull(c), True)).alias(c) for c in string_cols],
).first()

numeric_null_counts = [(c, null_counts_row[c]) for c in numeric_cols]
string_null_counts = [(c, null_counts_row[c]) for c in string_cols]

In [None]:
# Report missing-value counts: numeric columns first, then string/array columns.
for col_name, count in [*numeric_null_counts, *string_null_counts]:
    print(f"Null or NaN count in column '{col_name}': {count}")

In [None]:
# Remove rows with a missing reviewText or summary.
# Keep rows with a missing price or vote: those columns have too many missing values, so dropping those rows would discard too much data.

In [32]:
# Drop any row whose reviewText or summary is null (how="any" is the default).
cellphone_df1 = cellphone_df.dropna(subset=["reviewText", "summary"])


In [35]:
import string
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
nltk.download("stopwords")

# Initialize NLTK Porter Stemmer
stemmer = PorterStemmer()

# English stop words, used both by the Spark StopWordsRemover stage and by
# clean() below. The set form gives cheap membership tests inside the UDF.
stop_words = list(set(stopwords.words('english')))
stop_word_set = frozenset(stop_words)

# Matches every character that is not an ASCII letter: digits, punctuation,
# and also accented letters. Compiled once instead of on every token.
_non_alpha = re.compile(r'[^a-zA-Z]')


def clean(text):
    """Strip non-letters from each token, drop empty tokens and stop words, then stem.

    Parameters
    ----------
    text : list[str] or None
        Token list from the ``filtered_words`` column. Despite the parameter
        name, this is a list of tokens, not a raw string.

    Returns
    -------
    list[str]
        Stemmed tokens; an empty list when ``text`` is None (a null array).
    """
    if text is None:
        return []
    cleaned_words = []
    for word in text:
        if not word:
            continue
        letters = _non_alpha.sub('', word)
        # StopWordsRemover only matches whole tokens, so stop words glued to
        # punctuation ("it.", "this,") pass through it. Once stripped they
        # would become bare stop words, so filter them here as well.
        if letters and letters not in stop_word_set:
            cleaned_words.append(stemmer.stem(letters))
    return cleaned_words


# Create a user-defined function (UDF) to apply preprocessing
clean_udf = udf(clean, ArrayType(StringType()))

# Tokenizer lowercases and splits on whitespace; the remover drops exact stop-word tokens.
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
stopword_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=stop_words)


def preprocess(df):
    """Tokenize ``reviewText``, remove stop words, and add a ``cleaned_review`` column.

    Adds three columns: ``words`` (Tokenizer output), ``filtered_words``
    (StopWordsRemover output) and ``cleaned_review`` (``clean`` applied to
    ``filtered_words``). Expects a ``reviewText`` column in ``df``.
    """
    df_token = tokenizer.transform(df)
    df_stopword = stopword_remover.transform(df_token)
    return df_stopword.withColumn("cleaned_review", clean_udf(df_stopword["filtered_words"]))


cellphone_df_cleaned = preprocess(cellphone_df1)



[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/liangchengzhang/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [16]:
# First three token lists after stop-word removal
cellphone_df_cleaned.select('filtered_words').show(n=3)

+--------------------+
|      filtered_words|
+--------------------+
|[great, business/...|
|[everything, fine...|
|[back, cover, slo...|
+--------------------+
only showing top 3 rows



## Apply the same cleaning to the accessories data

In [37]:
# Reload the accessories split written earlier (schema is re-inferred from JSON).
accessories_df = spark.read.format("json").load("accessories.json")

In [38]:
from pyspark.sql import functions as F

# Remove rows that are identical across every column
accessories_df = accessories_df.dropDuplicates()

# Convert price from string to numeric: strip "$" and "," and cast to double.
# Strings that cannot be cast (e.g. "") become null, assuming ANSI mode is
# off (TODO confirm for the Spark version in use).
accessories_df = accessories_df.withColumn(
    "price", F.expr("cast(regexp_replace(price, '[$,]', '') as double)")
)
accessories_df.select('price').show(2)

# Columns to audit, in schema order.
numeric_cols = [c for c in accessories_df.columns if c in ["overall", "price"]]
string_cols = [
    c for c in accessories_df.columns
    if c in ['asin', 'reviewerID', 'vote', 'reviewText', 'summary', 'reviewTime',
             'title', 'rank', 'brand', 'category']
]

# Count null (plus NaN for numeric columns) in ONE aggregation pass instead of
# one Spark job per column. F.when(cond, True) is null for non-matching rows
# and F.count skips nulls, so each expression counts exactly the matching rows.
null_counts_row = accessories_df.select(
    *[F.count(F.when(F.isnull(c) | F.isnan(c), True)).alias(c) for c in numeric_cols],
    *[F.count(F.when(F.isnull(c), True)).alias(c) for c in string_cols],
).first()
numeric_null_counts = [(c, null_counts_row[c]) for c in numeric_cols]
string_null_counts = [(c, null_counts_row[c]) for c in string_cols]

for col_name, count in numeric_null_counts + string_null_counts:
    print(f"Null or NaN count in column '{col_name}': {count}")

# Drop rows missing reviewText or summary. Rows with a missing price or vote
# are kept, because those columns have too many missing values to drop.
accessories_df1 = accessories_df.na.drop(subset=["reviewText", "summary"])
accessories_df_cleaned = preprocess(accessories_df1)

+-----+
|price|
+-----+
| null|
| null|
+-----+
only showing top 2 rows

Null or NaN count in column 'overall': 0
Null or NaN count in column 'price': 42523
Null or NaN count in column 'asin': 0
Null or NaN count in column 'brand': 0
Null or NaN count in column 'category': 0
Null or NaN count in column 'rank': 0
Null or NaN count in column 'reviewText': 187
Null or NaN count in column 'reviewTime': 0
Null or NaN count in column 'reviewerID': 0
Null or NaN count in column 'summary': 110
Null or NaN count in column 'title': 0
Null or NaN count in column 'vote': 160275


In [39]:
# One fully processed accessories row, all columns
accessories_df_cleaned.show(n=1)

[Stage 361:>                                                        (0 + 1) / 1]

+----------+---------+--------------------+-------+-----+--------------------+--------------------+-----------+--------------+--------------------+--------------------+----+--------------------+--------------------+--------------------+
|      asin|    brand|            category|overall|price|                rank|          reviewText| reviewTime|    reviewerID|             summary|               title|vote|               words|      filtered_words|      cleaned_review|
+----------+---------+--------------------+-------+-----+--------------------+--------------------+-----------+--------------+--------------------+--------------------+----+--------------------+--------------------+--------------------+
|9707716371|PowerBear|[Cell Phones & Ac...|    2.0| null|[">#14,390 in Cel...|I bought this in ...|04 12, 2018|A1DX757BJI4V4K|Battery doesn't l...|PowerBear Samsung...|null|[i, bought, this,...|[bought, may, 201...|[bought, may, fir...|
+----------+---------+--------------------+-------+-

                                                                                

In [36]:
# Write the cleaned cellphone reviews as a single JSON part file.
# repartition(1), unlike coalesce(1), keeps the upstream stages (including
# the Python clean UDF) running in parallel before the final single-partition
# write. Not reassigning the variable keeps later cells (sentiment UDF)
# parallel too. mode("overwrite") keeps the cell re-runnable.
cellphone_df_cleaned.repartition(1).write.mode("overwrite").json("cellphone_clean.json")

                                                                                

In [40]:
# Write the cleaned accessories reviews as a single JSON part file.
# repartition(1), unlike coalesce(1), keeps the upstream stages (including
# the Python clean UDF) running in parallel before the final single-partition
# write. The variable itself is not reassigned, and mode("overwrite") keeps
# the cell re-runnable.
accessories_df_cleaned.repartition(1).write.mode("overwrite").json("accessories_clean.json")

                                                                                

In [None]:
# Biased-review analysis: label each review's sentiment and inspect the strongly positive or negative ones

In [44]:
from textblob import TextBlob
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def get_sentiment(text, threshold=0.2):
    """Label text as "positive", "negative" or "neutral" from its TextBlob polarity.

    Parameters
    ----------
    text : str or None
        Review text.
    threshold : float, default 0.2
        Polarity strictly above ``threshold`` is "positive", strictly below
        ``-threshold`` is "negative"; everything in between is "neutral".

    Returns
    -------
    str or None
        The label, or None (a Spark null) when ``text`` is None. Previously a
        null review crashed the whole Spark job.
    """
    if text is None:
        return None
    # Compute polarity once and reuse it for both comparisons.
    polarity = TextBlob(text).sentiment.polarity
    if polarity > threshold:
        return "positive"
    if polarity < -threshold:
        return "negative"
    return "neutral"


# Create a user-defined function (UDF) to apply sentiment analysis
sentiment_udf = udf(get_sentiment, StringType())

# Add a new column to the DataFrame with sentiment labels
cellphone_df_cleaned = cellphone_df_cleaned.withColumn("sentiment", sentiment_udf(cellphone_df_cleaned["reviewText"]))



In [46]:
# Reviews whose sentiment label is not "neutral", i.e. polarity beyond the threshold.
is_polarized = cellphone_df_cleaned["sentiment"].isin("positive", "negative")
extreme_sentiments = cellphone_df_cleaned.filter(is_polarized)

extreme_sentiments.select('reviewText').show(truncate=False)




+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [47]:
# Stop the SparkSession and its underlying SparkContext.
# NOTE(review): the saved output of this cell is ConnectionRefusedError, which
# suggests the Py4J gateway / driver JVM was already gone when stop() ran.
# Confirm by re-running the notebook from a fresh kernel.
spark.stop()

ConnectionRefusedError: [Errno 61] Connection refused