## Data Loading

In [1]:
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
import time

In [2]:
twitter_path = './data/sentiment_tweets3.csv'
reddit_path = './data/depression_dataset_reddit_cleaned.csv'
mental_path = './data/mental_health.csv'

# schema = StructType([StructField('text', StringType(), True),
#                     StructField('label', IntegerType(), True)])

df_twitter = spark.read.csv(twitter_path, header=True, inferSchema=True)
df_reddit = spark.read.csv(reddit_path, header=True, inferSchema=True)
df_mental = spark.read.csv(mental_path, header=True, inferSchema=True)

In [3]:
df_twitter.printSchema()
df_reddit.printSchema()
df_mental.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- message to examine: string (nullable = true)
 |-- label (depression result): string (nullable = true)

root
 |-- clean_text: string (nullable = true)
 |-- is_depression: integer (nullable = true)

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [4]:
df_twitter = df_twitter.withColumnRenamed("message to examine", "text")
df_twitter = df_twitter.withColumnRenamed("label (depression result)", "label")
df_twitter = df_twitter.withColumn("label", df_twitter["label"].cast(IntegerType()))
df_twitter = df_twitter.drop('Index')

df_reddit = df_reddit.withColumnRenamed("clean_text", "text")
df_reddit = df_reddit.withColumnRenamed("is_depression", "label")

In [5]:
df_twitter.show()
df_reddit.show()
df_mental.show()

+--------------------+-----+
|                text|label|
+--------------------+-----+
|just had a real g...|    0|
|is reading manga ...|    0|
|@comeagainjen htt...|    0|
|@lapcat Need to s...|    0|
|ADD ME ON MYSPACE...|    0|
|so sleepy. good t...|    0|
|@SilkCharm re: #n...|    0|
|23 or 24ï¿½C poss...|    0|
|nite twitterville...|    0|
|@daNanner Night, ...|    0|
|Good morning ever...|    0|
|Finally! I just c...|    0|
|kisha they cnt ge...|    0|
|@nicolerichie Yes...|    0|
|I really love ref...|    0|
|@blueaero ooo it'...|    0|
|@rokchic28 no pro...|    0|
|@shipovalov &quot...|    0|
|Once again stayed...|    0|
|@Kal_Penn I just ...|    0|
+--------------------+-----+
only showing top 20 rows

+--------------------+-----+
|                text|label|
+--------------------+-----+
|we understand tha...|    1|
|welcome to r depr...|    1|
|anyone else inste...|    1|
|i ve kind of stuf...|    1|
|sleep is my great...|    1|
|i m year old turn...|    1|
|i live alone and

## Data Preprocessing

In [6]:
print(df_twitter.toPandas()['text'].isnull().sum())
print(df_reddit.toPandas()['text'].isnull().sum())
print(df_mental.toPandas()['text'].isnull().sum())

print(df_twitter.toPandas()['label'].isnull().sum())
print(df_reddit.toPandas()['label'].isnull().sum())
print(df_mental.toPandas()['label'].isnull().sum())

0
0
0
9
0
0


In [7]:
df_twitter = df_twitter.dropna()

In [8]:
print(df_twitter.toPandas()['text'].isnull().sum())
print(df_reddit.toPandas()['text'].isnull().sum())
print(df_mental.toPandas()['text'].isnull().sum())

print(df_twitter.toPandas()['label'].isnull().sum())
print(df_reddit.toPandas()['label'].isnull().sum())
print(df_mental.toPandas()['label'].isnull().sum())

0
0
0
0
0
0


In [9]:
df = df_twitter.union(df_reddit).union(df_mental)

df.groupby('label').count().show()
print(df.count())

+-----+-----+
|label|count|
+-----+-----+
|    1|19974|
|    0|26039|
+-----+-----+

46013


In [10]:
trainDF, testDF = df.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""") 

There are 37021 rows in the training set, and 8992 in the test set


## Feature Extraction

In [11]:
tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
stopwords_remover = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')
vectorizer = CountVectorizer(inputCol='filtered_tokens', outputCol='vectors')
idf = IDF(inputCol='vectors', outputCol='features')

23/04/13 14:58:09 WARN StopWordsRemover: Default locale set was [en_HK]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.


## Logistic Regression

In [12]:
lr = LogisticRegression(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[tokenizer, stopwords_remover, vectorizer, idf, lr])

pipelineModel = pipeline.fit(trainDF)

                                                                                

23/04/13 14:58:11 WARN DAGScheduler: Broadcasting large task binary with size 1123.2 KiB




23/04/13 14:58:13 WARN DAGScheduler: Broadcasting large task binary with size 1124.2 KiB


                                                                                

23/04/13 14:58:13 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB




23/04/13 14:58:14 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

23/04/13 14:58:14 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB




23/04/13 14:58:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:15 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/04/13 14:58:15 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/04/13 14:58:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

23/04/13 14:58:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:16 WARN DAGScheduler: Broadcasting larg

In [13]:
predDF = pipelineModel.transform(testDF)

predDF.show(5)

23/04/13 14:58:35 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|              tokens|     filtered_tokens|             vectors|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| I'm happy and co...|    0|[, i'm, happy, an...|  [, happy, content]|(88045,[0,95,1444...|(88045,[0,95,1444...|[8.03964118940184...|[0.99967767930539...|       0.0|
| Stray Cats ~ Str...|    0|[, stray, cats, ~...|[, stray, cats, ~...|(88045,[0,899,187...|(88045,[0,899,187...|[46.5586610189792...|           [1.0,0.0]|       0.0|
| Ty - now quit re...|    0|[, ty, -, now, qu...|[, ty, -, quit, r...|(88045,[0,171,

In [14]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')

accuracy = evaluator.evaluate(predDF)
print(f'Accuracy for logistic regression: {accuracy}')

23/04/13 14:58:35 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
Accuracy for logistic regression: 0.8470862989323843


In [15]:
# Save model
model_path = "../backend/models/lr_model"
pipelineModel.save(model_path)

23/04/13 14:58:36 WARN TaskSetManager: Stage 278 contains a task of very large size (1779 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/04/13 14:58:37 WARN TaskSetManager: Stage 282 contains a task of very large size (1408 KiB). The maximum recommended task size is 1000 KiB.


In [16]:
# Load model
from pyspark.ml import PipelineModel

loaded_pipeline_model  = PipelineModel.load("../backend/models/lr_model")
df = pipelineModel.transform(df)

23/04/13 14:58:38 WARN StopWordsRemover: Default locale set was [en_HK]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
23/04/13 14:58:39 WARN StopWordsRemover: Default locale set was [en_HK]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.


## Random Forest

In [17]:
rf = RandomForestClassifier(labelCol='label', maxBins=40, seed=42)

In [18]:
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [2, 4, 6])
            .addGrid(rf.numTrees, [10, 100])
            .build())

In [19]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')

In [20]:
cv = CrossValidator(estimator=rf,
                    evaluator=evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    parallelism=4,
                    seed=42)

pipeline = Pipeline(stages=[tokenizer, stopwords_remover, vectorizer, idf, rf])

In [21]:
start_time = time.time()

pipelineModel = pipeline.fit(trainDF)
print('Time spent:', time.time() - start_time)

                                                                                

23/04/13 14:58:40 WARN DAGScheduler: Broadcasting large task binary with size 1123.2 KiB
23/04/13 14:58:41 WARN DAGScheduler: Broadcasting large task binary with size 1124.2 KiB
23/04/13 14:58:41 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:41 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/04/13 14:58:42 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB


                                                                                

23/04/13 14:58:52 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/04/13 14:58:53 WARN MemoryStore: Not enough space to cache rdd_868_4 in memory! (computed 38.0 MiB so far)
23/04/13 14:58:53 WARN MemoryStore: Not enough space to cache rdd_868_0 in memory! (computed 59.5 MiB so far)
23/04/13 14:58:53 WARN MemoryStore: Not enough space to cache rdd_868_3 in memory! (computed 59.5 MiB so far)
23/04/13 14:58:53 WARN MemoryStore: Not enough space to cache rdd_868_2 in memory! (computed 59.5 MiB so far)
23/04/13 14:58:53 WARN MemoryStore: Not enough space to cache rdd_868_5 in memory! (computed 38.0 MiB so far)
23/04/13 14:58:53 WARN BlockManager: Persisting block rdd_868_4 to disk instead.
23/04/13 14:58:53 WARN BlockManager: Persisting block rdd_868_3 to disk instead.
23/04/13 14:58:53 WARN BlockManager: Persisting block rdd_868_5 to disk instead.
23/04/13 14:58:53 WARN BlockManager: Persisting block rdd_868_2 to disk instead.
23/04/13 14:58:53 WARN BlockManager: Per

[Stage 320:>                                                        (0 + 6) / 6]

23/04/13 14:58:54 WARN MemoryStore: Not enough space to cache rdd_868_1 in memory! (computed 319.5 MiB so far)
23/04/13 14:58:54 WARN BlockManager: Persisting block rdd_868_1 to disk instead.




23/04/13 14:59:07 WARN MemoryStore: Not enough space to cache rdd_868_1 in memory! (computed 59.5 MiB so far)
23/04/13 14:59:11 WARN MemoryStore: Not enough space to cache rdd_868_0 in memory! (computed 59.5 MiB so far)
23/04/13 14:59:12 WARN MemoryStore: Not enough space to cache rdd_868_2 in memory! (computed 59.5 MiB so far)
23/04/13 14:59:12 WARN MemoryStore: Not enough space to cache rdd_868_4 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:12 WARN MemoryStore: Not enough space to cache rdd_868_3 in memory! (computed 91.7 MiB so far)


                                                                                

23/04/13 14:59:17 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/04/13 14:59:17 WARN MemoryStore: Not enough space to cache rdd_868_4 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:17 WARN MemoryStore: Not enough space to cache rdd_868_1 in memory! (computed 38.0 MiB so far)
23/04/13 14:59:17 WARN MemoryStore: Not enough space to cache rdd_868_0 in memory! (computed 11.1 MiB so far)
23/04/13 14:59:17 WARN MemoryStore: Not enough space to cache rdd_868_2 in memory! (computed 21.8 MiB so far)


[Stage 322:>                                                        (0 + 6) / 6]

23/04/13 14:59:17 WARN MemoryStore: Not enough space to cache rdd_868_3 in memory! (computed 91.7 MiB so far)


                                                                                

23/04/13 14:59:24 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
23/04/13 14:59:25 WARN MemoryStore: Not enough space to cache rdd_868_4 in memory! (computed 38.0 MiB so far)
23/04/13 14:59:25 WARN MemoryStore: Not enough space to cache rdd_868_2 in memory! (computed 11.1 MiB so far)
23/04/13 14:59:25 WARN MemoryStore: Not enough space to cache rdd_868_0 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:25 WARN MemoryStore: Not enough space to cache rdd_868_1 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:25 WARN MemoryStore: Not enough space to cache rdd_868_3 in memory! (computed 21.8 MiB so far)


                                                                                

23/04/13 14:59:33 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
23/04/13 14:59:33 WARN MemoryStore: Not enough space to cache rdd_868_3 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:33 WARN MemoryStore: Not enough space to cache rdd_868_2 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:33 WARN MemoryStore: Not enough space to cache rdd_868_4 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:33 WARN MemoryStore: Not enough space to cache rdd_868_0 in memory! (computed 21.8 MiB so far)
23/04/13 14:59:33 WARN MemoryStore: Not enough space to cache rdd_868_1 in memory! (computed 21.8 MiB so far)


                                                                                

23/04/13 14:59:39 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
23/04/13 14:59:39 WARN MemoryStore: Not enough space to cache rdd_868_4 in memory! (computed 11.1 MiB so far)
23/04/13 14:59:39 WARN MemoryStore: Not enough space to cache rdd_868_1 in memory! (computed 11.1 MiB so far)
23/04/13 14:59:39 WARN MemoryStore: Not enough space to cache rdd_868_0 in memory! (computed 11.1 MiB so far)
23/04/13 14:59:39 WARN MemoryStore: Not enough space to cache rdd_868_3 in memory! (computed 38.0 MiB so far)
23/04/13 14:59:40 WARN MemoryStore: Not enough space to cache rdd_868_2 in memory! (computed 91.7 MiB so far)


                                                                                

Time spent: 67.56023573875427


In [22]:
predDF = pipelineModel.transform(testDF)
accuracy = evaluator.evaluate(predDF)
print(f'Accuracy for random forest: {accuracy}')

23/04/13 14:59:47 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
Accuracy for random forest: 0.7068505338078291
