### Question: "Can we identify communities of users whose tweets have similar sentiment polarities, and analyze the connections among them to identify key influencers within each community?"

In [25]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import regexp_replace, avg

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd
import random

In [2]:
spark = SparkSession.builder.appName("Tweets Sentiment Classification").getOrCreate()

### 1. Load the dataset

In [3]:
schema = StructType([
    StructField("target", IntegerType(), True),
    StructField("ids", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("flag", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True)
])

In [4]:
data = spark.read.csv(
    "datasets/tweets_data.csv",
    header=True,
    schema=schema,
    encoding="ISO-8859-1"
)

In [5]:
data.take(4)

[Row(target=0, ids=1467810672, date='Mon Apr 06 22:19:49 PDT 2009', flag='NO_QUERY', user='scotthamilton', text="is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!"),
 Row(target=0, ids=1467810917, date='Mon Apr 06 22:19:53 PDT 2009', flag='NO_QUERY', user='mattycus', text='@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds'),
 Row(target=0, ids=1467811184, date='Mon Apr 06 22:19:57 PDT 2009', flag='NO_QUERY', user='ElleCTF', text='my whole body feels itchy and like its on fire '),
 Row(target=0, ids=1467811193, date='Mon Apr 06 22:19:57 PDT 2009', flag='NO_QUERY', user='Karoli', text="@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. ")]

In [6]:
data.printSchema()

root
 |-- target: integer (nullable = true)
 |-- ids: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [7]:
data.select('target', 'text').show(n=3, truncate=False)

+------+---------------------------------------------------------------------------------------------------------------+
|target|text                                                                                                           |
+------+---------------------------------------------------------------------------------------------------------------+
|0     |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!|
|0     |@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                      |
|0     |my whole body feels itchy and like its on fire                                                                 |
+------+---------------------------------------------------------------------------------------------------------------+
only showing top 3 rows



### 2. Preprocess the data

In [8]:
tweets_data = data
tweets_data.show(n=10, truncate=True)

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|     0|1467812025|Mon Apr 06 22:20:...|NO_

In [9]:
tweets_data = tweets_data.dropna()

In [10]:
# Define the regex pattern for text cleaning
TEXT_CLEANING_RE = "@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"

# Use regexp_replace to clean the text column
tweets_data = tweets_data.withColumn("cleaned_text", regexp_replace(tweets_data["text"], TEXT_CLEANING_RE, ' '))

# Show the DataFrame
tweets_data.show(n=10, truncate=True)

+------+----------+--------------------+--------+---------------+--------------------+--------------------+
|target|       ids|                date|    flag|           user|                text|        cleaned_text|
+------+----------+--------------------+--------+---------------+--------------------+--------------------+
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|  I dived many ti...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|  no it s not beh...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|  not the whole c...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |         Need a hug |
|     0|1467811594|Mon Apr 0

In [11]:
target_indexer = StringIndexer(inputCol="target", outputCol="label")

indexer = target_indexer.fit(tweets_data)

tweets_data = indexer.transform(tweets_data)

In [12]:
# Tokenization
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="tokens")

# Removing Stop Words
remover = StopWordsRemover(inputCol="tokens", outputCol="words")

# Term Frequency
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2**16)

# Inverse Document Frequency
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

### 3. Build the Model

In [13]:
# Find distribution of target categories
category_counts = tweets_data.groupBy("label").count().collect()

# Calculate fractions for stratified sampling
total = tweets_data.count()
fractions = {}
for category, count in category_counts:
    fractions[category] = count / total

# Split the data into train (60%), test (20%), and validation (20%) sets with a stratified approach
train_data = tweets_data.sampleBy("label", fractions, seed=42).limit(int(total * 0.6))
remaining_data = tweets_data.subtract(train_data)
test_data = remaining_data.sampleBy("label", fractions, seed=42).limit(int(total * 0.2))
validation_data = remaining_data.subtract(test_data)

### Logistic Regressor

In [14]:
# Model
lr = LogisticRegression(maxIter=100, regParam=0.001, labelCol="label")

# Build the pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# Fit the model
lr_model = pipeline.fit(train_data)

In [15]:
# Predict the sentiments
predictions = lr_model.transform(validation_data)

In [16]:
predictions.select('label', 'prediction').show()

+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows



In [17]:
evaluator = BinaryClassificationEvaluator(labelCol="label", 
                                          rawPredictionCol="rawPrediction", 
                                          metricName="areaUnderROC")

auc_lr = evaluator.evaluate(predictions)
print("Area Under ROC: ", auc_lr)

Area Under ROC:  0.8266830451615446


In [18]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(validation_data.count())
print("Accuracy: ", accuracy)

Accuracy:  0.7199934521621764


### 4. Building the Graph

In [None]:
# Extract distinct users directly from DataFrame
users = [row['user'] for row in tweets_data.select("user").distinct().collect()]

# Generate synthetic edges
edges_data = []
for _ in range(len(users) * 2):  # Assuming we want to create twice as many edges as there are users
    src = random.choice(users)
    dst = random.choice(users)
    while src == dst:  # Avoid self-mentions for this example
        dst = random.choice(users)
    edges_data.append((src, dst))

In [27]:
# Create a pandas dataframe
pandas_df = pd.DataFrame(edges_data, columns=['src', 'dst'])

# Convert pandas dataframe to Spark dataframe
edges = spark.createDataFrame(pandas_df)

# Show the dataframe
edges.show()

Py4JError: An error occurred while calling o455.legacyInferArrayTypeFromFirstElement. Trace:
py4j.Py4JException: Method legacyInferArrayTypeFromFirstElement([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)

