In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


In [2]:
spark = SparkSession.builder.appName("tweets").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/17 18:41:12 WARN Utils: Your hostname, Soroushs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.101 instead (on interface en0)
25/11/17 18:41:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/17 18:41:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
schema = StructType([
    StructField('target', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('date', StringType(), True),
    StructField('flag', StringType(), True),
    StructField('user', StringType(), True),
    StructField('text', StringType(), True)
])

In [4]:
df = spark.read.csv("./database", schema=schema)
df.show(n=5)

+------+----------+--------------------+--------+---------------+--------------------+
|target|        id|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+------+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows


In [5]:
df = df.select("target", "text")
df.show(n=5)

+------+--------------------+
|target|                text|
+------+--------------------+
|     0|@switchfoot http:...|
|     0|is upset that he ...|
|     0|@Kenichan I dived...|
|     0|my whole body fee...|
|     0|@nationwideclass ...|
+------+--------------------+
only showing top 5 rows


In [6]:
df.groupBy("target").count().show()

+------+------+
|target| count|
+------+------+
|     0|800000|
|     4|800000|
+------+------+



In [7]:
from pyspark.sql.functions import when
change_target = when(df['target'] == 4, 1).otherwise(df['target'])
df = df.withColumn('target', change_target)

In [8]:
df.groupBy("target").count().show()

+------+------+
|target| count|
+------+------+
|     0|800000|
|     1|800000|
+------+------+



In [9]:
null_values = df.filter(df['text'].isNull()).count()
null_values

0

In [10]:
import re
def preprocess_tweet(tweet):
    tweet = tweet.lower()
    # Remove URLs, mentions, and hashtags
    tweet = re.sub(r'@\w+|\w+://\S+|(#\S+)', '', tweet)
    # Remove non-letters e.g punctuation, numbers
    tweet = re.sub(r'[^a-zA-Z\s]+', '', tweet) 
    return tweet  

In [11]:
from pyspark.sql.functions import udf
preprocess_udf = udf(preprocess_tweet, StringType())
new_df = df.withColumn('text', preprocess_udf(df['text']))
new_df.show(n=5)

+------+--------------------+
|target|                text|
+------+--------------------+
|     0|   awww thats a b...|
|     0|is upset that he ...|
|     0| i dived many tim...|
|     0|my whole body fee...|
|     0| no its not behav...|
+------+--------------------+
only showing top 5 rows


In [12]:
rows = df.select('text').collect()
print(rows[8]["text"])

new_rows = new_df.select('text').collect()
print(new_rows[8]["text"])

@Tatiana_K nope they didn't have it 


                                                                                

 nope they didnt have it 


In [13]:
from nltk.stem import PorterStemmer

ps = PorterStemmer()

def porterStemmerTweet(tweet):
    if tweet is None:
        return None
    new_tweet = ""
    for word in tweet.split(" "):
        new_tweet += ps.stem(word) + " "
    return new_tweet.strip()

In [14]:
new_df.show(n=5)

+------+--------------------+
|target|                text|
+------+--------------------+
|     0|   awww thats a b...|
|     0|is upset that he ...|
|     0| i dived many tim...|
|     0|my whole body fee...|
|     0| no its not behav...|
+------+--------------------+
only showing top 5 rows


In [15]:
preprocess_stem_udf = udf(porterStemmerTweet, StringType())
new_df = new_df.withColumn('text', preprocess_stem_udf(new_df['text']))
new_df.show(n=5)

+------+--------------------+
|target|                text|
+------+--------------------+
|     0|awww that a bumme...|
|     0|is upset that he ...|
|     0|i dive mani time ...|
|     0|my whole bodi fee...|
|     0|no it not behav a...|
+------+--------------------+
only showing top 5 rows


In [16]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

In [17]:
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
df_tokens = tokenizer.transform(df)
# df_tokens.show(truncate=False)

df_remover = remover.transform(df_tokens)
df_remover.show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+
|target|text                                                                                                                 |tokens                                                                                                                                       |filtered_tokens                                                                                             |
+------+---------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------

In [18]:
hashing_tf = HashingTF(
    inputCol="filtered_tokens",
    outputCol="raw_features",
    numFeatures=2**14  # you can tune this
)
hashing_df = hashing_tf.transform(df_remover)
hashing_df.show(truncate=True)

+------+--------------------+--------------------+--------------------+--------------------+
|target|                text|              tokens|     filtered_tokens|        raw_features|
+------+--------------------+--------------------+--------------------+--------------------+
|     0|@switchfoot http:...|[@switchfoot, htt...|[@switchfoot, htt...|(16384,[45,3420,3...|
|     0|is upset that he ...|[is, upset, that,...|[upset, update, f...|(16384,[3420,8433...|
|     0|@Kenichan I dived...|[@kenichan, i, di...|[@kenichan, dived...|(16384,[1219,1616...|
|     0|my whole body fee...|[my, whole, body,...|[whole, body, fee...|(16384,[1353,5607...|
|     0|@nationwideclass ...|[@nationwideclass...|[@nationwideclass...|(16384,[1968,2096...|
|     0|@Kwesidei not the...|[@kwesidei, not, ...|[@kwesidei, whole...|(16384,[1157,7174...|
|     0|         Need a hug |      [need, a, hug]|         [need, hug]|(16384,[106,1241]...|
|     0|@LOLTrish hey  lo...|[@loltrish, hey, ...|[@loltrish, hey, ...

In [19]:
from pyspark.ml.classification import LinearSVC


In [20]:
train_df, test_df = hashing_df.randomSplit([0.9, 0.1], seed=42)

In [21]:
svm = LinearSVC(
    featuresCol="raw_features",
    labelCol="target",
    maxIter=50,
    regParam=0.1
)

In [22]:
model = svm.fit(train_df)

25/11/17 18:41:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [23]:
predictions = model.transform(test_df)
predictions.select("target", "prediction", "rawPrediction").show(10, truncate=False)

[Stage 228:>                                                        (0 + 1) / 1]

+------+----------+--------------------------------------------+
|target|prediction|rawPrediction                               |
+------+----------+--------------------------------------------+
|0     |1.0       |[-0.038868793497977144,0.038868793497977144]|
|0     |0.0       |[0.5376943153814804,-0.5376943153814804]    |
|0     |0.0       |[0.4315355477161895,-0.4315355477161895]    |
|0     |0.0       |[1.1223282742957263,-1.1223282742957263]    |
|0     |0.0       |[0.12540390495921547,-0.12540390495921547]  |
|0     |1.0       |[-0.19429563959469032,0.19429563959469032]  |
|0     |0.0       |[0.5926172025433687,-0.5926172025433687]    |
|0     |0.0       |[1.4963414355729696,-1.4963414355729696]    |
|0     |0.0       |[3.3829374587804013,-3.3829374587804013]    |
|0     |1.0       |[-1.8526520991722015,1.8526520991722015]    |
+------+----------+--------------------------------------------+
only showing top 10 rows


                                                                                

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",   # default for LinearSVC & LogisticRegression
    metricName="areaUnderROC"           # or "areaUnderPR"
)

auc = evaluator.evaluate(predictions)
print("AUC (ROC):", auc)

                                                                                

AUC (ROC): 0.8208923795196732
