In [1]:
# Import explicitly so this first cell also works on a fresh, plain Jupyter
# kernel where nothing has pre-loaded SparkSession into the namespace.
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by every cell below.
# NOTE(review): with master("local") the executor-memory setting probably has
# no practical effect (driver memory would be the relevant knob) - confirm.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Natural Language Processing")
    .config("spark.executor.memory", "6gb")
    .getOrCreate()
)

In [2]:
# Load the labelled responses with Spark's built-in CSV reader; the legacy
# external 'com.databricks.spark.csv' format name is not needed on Spark 2.0+.
# The first line of the file is the header and column types are inferred.
# NOTE(review): the preview shows extra _c3.._c7 columns, which suggests some
# response texts contain unescaped commas - confirm the quoting of the file.
df = spark.read.csv(
    'TherapyBotSession.csv',
    header=True,
    inferSchema=True,
)


In [3]:
# Preview the raw frame: first 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

+-----------+-----------+--------------------+----+----+----+----+----+
|response_id|      class|       response_text| _c3| _c4| _c5| _c6| _c7|
+-----------+-----------+--------------------+----+----+----+----+----+
| response_1|not_flagged|I try and avoid t...|null|null|null|null|null|
| response_2|    flagged|Had a friend open...|null|null|null|null|null|
| response_3|    flagged|I saved a girl fr...|null|null|null|null|null|
| response_4|not_flagged|i cant think of o...|null|null|null|null|null|
| response_5|not_flagged|"Only really one ...|    |null|null|null|null|
| response_6|not_flagged|a couple of years...|null|null|null|null|null|
| response_7|    flagged|Roommate when he ...|null|null|null|null|null|
| response_8|    flagged|i've had a couple...|null|null|null|null|null|
| response_9|not_flagged|Listened to someo...|null|null|null|null|null|
|response_10|    flagged|I will always lis...|null|null|null|null|null|
|response_11|not_flagged|Took a week off w...|null|null|null|nul

In [4]:
# Keep only the label and the free-text answer for the rest of the analysis.
df = df.select(['class', 'response_text'])

In [5]:
# Preview the two retained columns (first 20 rows, truncated cells).
df.show(n=20, truncate=True)

+-----------+--------------------+
|      class|       response_text|
+-----------+--------------------+
|not_flagged|I try and avoid t...|
|    flagged|Had a friend open...|
|    flagged|I saved a girl fr...|
|not_flagged|i cant think of o...|
|not_flagged|"Only really one ...|
|not_flagged|a couple of years...|
|    flagged|Roommate when he ...|
|    flagged|i've had a couple...|
|not_flagged|Listened to someo...|
|    flagged|I will always lis...|
|not_flagged|Took a week off w...|
|    flagged|On the memorial a...|
|not_flagged|Anxious girlfrien...|
|not_flagged|               Never|
|not_flagged|        You as a mom|
|    flagged|ex gf was a cutte...|
|not_flagged|I have helped adv...|
|not_flagged|I've helped frien...|
|not_flagged|A friend that is ...|
|not_flagged|expressing concer...|
+-----------+--------------------+
only showing top 20 rows



In [6]:
# Class balance: number of responses per label, largest class first.
(
    df.groupBy("class")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+-----------+-----+
|      class|count|
+-----------+-----+
|not_flagged|   55|
|    flagged|   25|
+-----------+-----+



In [7]:
import pyspark.sql.functions as F

In [8]:
# word_count = number of pieces obtained by splitting the text on a single space.
df = df.withColumn(
    'word_count',
    F.size(F.split(df['response_text'], ' ')),
)

In [9]:
# Preview the frame with the new word_count column.
df.show(n=20, truncate=True)

+-----------+--------------------+----------+
|      class|       response_text|word_count|
+-----------+--------------------+----------+
|not_flagged|I try and avoid t...|         8|
|    flagged|Had a friend open...|        25|
|    flagged|I saved a girl fr...|        29|
|not_flagged|i cant think of o...|        11|
|not_flagged|"Only really one ...|        74|
|not_flagged|a couple of years...|        25|
|    flagged|Roommate when he ...|        21|
|    flagged|i've had a couple...|        38|
|not_flagged|Listened to someo...|        13|
|    flagged|I will always lis...|        41|
|not_flagged|Took a week off w...|        60|
|    flagged|On the memorial a...|        22|
|not_flagged|Anxious girlfrien...|         6|
|not_flagged|               Never|         1|
|not_flagged|        You as a mom|         4|
|    flagged|ex gf was a cutte...|        16|
|not_flagged|I have helped adv...|        12|
|not_flagged|I've helped frien...|         5|
|not_flagged|A friend that is ...|

In [10]:
# Mean response length per class, longest class first.
(
    df.groupBy('class')
    .agg(F.avg('word_count').alias('avg_word_count'))
    .orderBy('avg_word_count', ascending=False)
    .show()
)


+-----------+-----------------+
|      class|   avg_word_count|
+-----------+-----------------+
|    flagged|             50.6|
|not_flagged|22.69090909090909|
+-----------+-----------------+



In [11]:
from textblob import TextBlob
def sentiment_score(response_text):
    """Return the TextBlob sentiment polarity of a response as a float.

    Args:
        response_text: The response string to score; may be None.

    Returns:
        The polarity as a Python float, or None when the input is None or
        cannot be scored. None (rather than an error string) keeps the value
        compatible with the FloatType UDF this function is wrapped in.
    """
    if response_text is None:
        return None
    try:
        return float(TextBlob(response_text).sentiment.polarity)
    except Exception:
        # Best-effort scoring: one unscorable row must not fail the whole job,
        # but unlike a bare `except:` this does not swallow KeyboardInterrupt
        # or SystemExit.
        return None


In [12]:
from pyspark.sql.types import FloatType
# Expose the Python scorer as a Spark UDF that yields a float column, then
# rebuild the frame with the sentiment score appended.
sentiment_score_udf = F.udf(sentiment_score, FloatType())
df = df.select(
    'class',
    'response_text',
    'word_count',
    sentiment_score_udf('response_text').alias('sentiment_score'),
)

In [13]:
# Preview the frame with the new sentiment_score column.
df.show(n=20, truncate=True)

+-----------+--------------------+----------+---------------+
|      class|       response_text|word_count|sentiment_score|
+-----------+--------------------+----------+---------------+
|not_flagged|I try and avoid t...|         8|            0.0|
|    flagged|Had a friend open...|        25|          -0.05|
|    flagged|I saved a girl fr...|        29|          0.495|
|not_flagged|i cant think of o...|        11|            0.0|
|not_flagged|"Only really one ...|        74|    0.036363635|
|not_flagged|a couple of years...|        25|           -0.1|
|    flagged|Roommate when he ...|        21|            0.0|
|    flagged|i've had a couple...|        38|     0.16666667|
|not_flagged|Listened to someo...|        13|            0.0|
|    flagged|I will always lis...|        41|         -0.025|
|not_flagged|Took a week off w...|        60|     0.16666667|
|    flagged|On the memorial a...|        22|            0.0|
|not_flagged|Anxious girlfrien...|         6|          -0.25|
|not_fla

In [14]:
# Mean sentiment score per class, highest first.
(
    df.groupBy('class')
    .agg(F.avg('sentiment_score').alias('avg_sentiment_score'))
    .orderBy('avg_sentiment_score', ascending=False)
    .show()
)

+-----------+-------------------+
|      class|avg_sentiment_score|
+-----------+-------------------+
|    flagged|0.09282141797244549|
|not_flagged|0.03127356884493069|
+-----------+-------------------+



In [15]:
# Tokenise each response by splitting on a single space; the resulting array
# column feeds the stop-word remover further down.
df = df.withColumn(
    'words',
    F.split(df['response_text'], ' '),
)
df.show(n=20, truncate=True)

+-----------+--------------------+----------+---------------+--------------------+
|      class|       response_text|word_count|sentiment_score|               words|
+-----------+--------------------+----------+---------------+--------------------+
|not_flagged|I try and avoid t...|         8|            0.0|[I, try, and, avo...|
|    flagged|Had a friend open...|        25|          -0.05|[Had, a, friend, ...|
|    flagged|I saved a girl fr...|        29|          0.495|[I, saved, a, gir...|
|not_flagged|i cant think of o...|        11|            0.0|[i, cant, think, ...|
|not_flagged|"Only really one ...|        74|    0.036363635|["Only, really, o...|
|not_flagged|a couple of years...|        25|           -0.1|[a, couple, of, y...|
|    flagged|Roommate when he ...|        21|            0.0|[Roommate, when, ...|
|    flagged|i've had a couple...|        38|     0.16666667|[i've, had, a, co...|
|not_flagged|Listened to someo...|        13|            0.0|[Listened, to, so...|
|   

In [16]:
# English stop words to strip from the tokenised responses. Written as
# whitespace-separated text and split into a list of individual words.
stop_words = (
    "i me my myself we our ours ourselves "
    "you your yours yourself yourselves he him "
    "his himself she her hers herself it its "
    "itself they them their theirs themselves "
    "what which who whom this that these those "
    "am is are was were be been being have "
    "has had having do does did doing a an "
    "the and but if or because as until while "
    "of at by for with about against between "
    "into through during before after above below "
    "to from up down in out on off over under "
    "again further then once here there when where "
    "why how all any both each few more most "
    "other some such no nor not only own same "
    "so than too very can will just don should now"
).split()

In [17]:
from pyspark.ml.feature import StopWordsRemover 

In [18]:
# Transformer that copies "words" into "words without stop", dropping every
# token found in the custom stop_words list.
stopwordsRemovalFeature = StopWordsRemover(
    inputCol="words",
    outputCol="words without stop",
    stopWords=stop_words,
)

In [19]:
from pyspark.ml import Pipeline
# Single-stage pipeline wrapping the stop-word remover.
stopWordRemovalPipeline = Pipeline().setStages([stopwordsRemovalFeature])

In [20]:
# Fit the stop-word pipeline, apply it to add the filtered-token column,
# then preview the first five rows.
pipelineFitRemoveStopWords = stopWordRemovalPipeline.fit(df)
df = pipelineFitRemoveStopWords.transform(df)
df.show(n=5)

+-----------+--------------------+----------+---------------+--------------------+--------------------+
|      class|       response_text|word_count|sentiment_score|               words|  words without stop|
+-----------+--------------------+----------+---------------+--------------------+--------------------+
|not_flagged|I try and avoid t...|         8|            0.0|[I, try, and, avo...|[try, avoid, sort...|
|    flagged|Had a friend open...|        25|          -0.05|[Had, a, friend, ...|[friend, open, me...|
|    flagged|I saved a girl fr...|        29|          0.495|[I, saved, a, gir...|[saved, girl, sui...|
|not_flagged|i cant think of o...|        11|            0.0|[i, cant, think, ...|[cant, think, one...|
|not_flagged|"Only really one ...|        74|    0.036363635|["Only, really, o...|["Only, really, o...|
+-----------+--------------------+----------+---------------+--------------------+--------------------+
only showing top 5 rows



In [21]:
def label(column_name):
    """Build the numeric target column from a class column.

    Uses native Spark column expressions instead of a Python UDF, so rows are
    not shipped to a Python worker for a simple equality test.

    Args:
        column_name: Name of the column holding the class string.

    Returns:
        A float Column that is 1.0 where the value equals 'flagged' and 0.0
        for every other value (including null).
    """
    is_flagged = F.col(column_name) == 'flagged'
    return F.when(is_flagged, 1.0).otherwise(0.0).cast('float')


df = df.withColumn('label', label('class'))

In [22]:
# Sanity-check the class -> label mapping side by side.
df.select(['class', 'label']).show()

+-----------+-----+
|      class|label|
+-----------+-----+
|not_flagged|  0.0|
|    flagged|  1.0|
|    flagged|  1.0|
|not_flagged|  0.0|
|not_flagged|  0.0|
|not_flagged|  0.0|
|    flagged|  1.0|
|    flagged|  1.0|
|not_flagged|  0.0|
|    flagged|  1.0|
|not_flagged|  0.0|
|    flagged|  1.0|
|not_flagged|  0.0|
|not_flagged|  0.0|
|not_flagged|  0.0|
|    flagged|  1.0|
|not_flagged|  0.0|
|not_flagged|  0.0|
|not_flagged|  0.0|
|not_flagged|  0.0|
+-----------+-----+
only showing top 20 rows



In [23]:
import pyspark.ml.feature as feat
# Term-frequency stage: hash the filtered tokens into a 10000-wide vector.
# NOTE(review): Spark's HashingTF guidance recommends a power of two for
# numFeatures; 10000 is not one - confirm whether that matters here.
TF_ = feat.HashingTF(
    inputCol="words without stop",
    outputCol="rawFeatures",
    numFeatures=10000,
)
# Inverse-document-frequency stage: rescale the raw counts into "features".
IDF_ = feat.IDF(
    inputCol="rawFeatures",
    outputCol="features",
)

In [24]:
# Chain term-frequency hashing and IDF rescaling into one pipeline.
pipelineTFIDF = Pipeline().setStages([TF_, IDF_])

In [25]:
# Fit the TF-IDF pipeline and add the rawFeatures / features columns to df.
# NOTE(review): the pipeline is fitted on the full frame, before the
# randomSplit into training and test data further down, so the IDF weights
# are learned from rows that later end up in the test set - consider
# splitting first and fitting on the training rows only.
pipelineFit = pipelineTFIDF.fit(df)
df = pipelineFit.transform(df)

In [26]:
# Inspect the label next to the raw and IDF-weighted feature vectors.
df.select(['label', 'rawFeatures', 'features']).show()

+-----+--------------------+--------------------+
|label|         rawFeatures|            features|
+-----+--------------------+--------------------+
|  0.0|(10000,[4109,4159...|(10000,[4109,4159...|
|  1.0|(10000,[2087,2801...|(10000,[2087,2801...|
|  1.0|(10000,[399,828,1...|(10000,[399,828,1...|
|  0.0|(10000,[1564,2044...|(10000,[1564,2044...|
|  0.0|(10000,[60,232,52...|(10000,[60,232,52...|
|  0.0|(10000,[618,696,1...|(10000,[618,696,1...|
|  1.0|(10000,[482,903,1...|(10000,[482,903,1...|
|  1.0|(10000,[264,821,8...|(10000,[264,821,8...|
|  0.0|(10000,[3118,4260...|(10000,[3118,4260...|
|  1.0|(10000,[76,230,10...|(10000,[76,230,10...|
|  0.0|(10000,[87,230,11...|(10000,[87,230,11...|
|  1.0|(10000,[695,1120,...|(10000,[695,1120,...|
|  0.0|(10000,[1843,5666...|(10000,[1843,5666...|
|  0.0|(10000,[2945],[1.0])|(10000,[2945],[3....|
|  0.0|(10000,[5956],[1.0])|(10000,[5956],[3....|
|  1.0|(10000,[274,828,1...|(10000,[274,828,1...|
|  0.0|(10000,[1619,4255...|(10000,[1619,4255...|


In [27]:
# 75% / 25% train-test split with a fixed seed for repeatability.
trainingDF, testDF = df.randomSplit(weights=[0.75, 0.25], seed=1234)

In [28]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression with regularisation strength 0.25; the feature and
# label columns are the estimator defaults, spelled out for readability.
logreg = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    regParam=0.25,
)

In [29]:
# Train the logistic-regression model on the training split.
logregModel = logreg.fit(trainingDF)

In [30]:
# Score the held-out split; the result carries a `prediction` column that the
# cells below compare against `label`.
predictionDF = logregModel.transform(testDF)

In [31]:
# Confusion matrix: rows are true labels, columns are predicted labels.
predictionDF.stat.crosstab('label', 'prediction').show()

+----------------+---+---+
|label_prediction|0.0|1.0|
+----------------+---+---+
|             1.0|  2|  2|
|             0.0| 12|  0|
+----------------+---+---+



In [32]:
from sklearn import metrics
# Collect labels and predictions in ONE pass so the two columns are guaranteed
# to be row-aligned; two separate toPandas() calls run the Spark plan twice and
# give no guarantee that rows come back in the same order.
_predictions_pdf = predictionDF.select('label', 'prediction').toPandas()
# Keep both results as single-column pandas DataFrames, as before.
actual = _predictions_pdf[['label']]
predicted = _predictions_pdf[['prediction']]

In [33]:
# Share of test rows whose predicted label matches the true label.
# NOTE(review): the classes are imbalanced, so accuracy alone can look good
# while flagged rows are missed - read it together with the crosstab.
metrics.accuracy_score(y_true=actual, y_pred=predicted)

0.875

In [34]:
# Summary statistics of the true label in the test split; with 0/1 labels the
# mean is the share of flagged rows.
predictionDF.describe(['label']).show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                 16|
|   mean|               0.25|
| stddev|0.44721359549995804|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+

