In [1]:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

In [2]:
# Build a spark context
hc = (SparkSession.builder
                  .appName('Toxic Comment Classification')
                  .enableHiveSupport()
                  .config("spark.executor.memory", "4G")
                  .config("spark.driver.memory","18G")
                  .config("spark.executor.cores","7")
                  .config("spark.python.worker.memory","4G")
                  .config("spark.driver.maxResultSize","0")
                  .config("spark.sql.crossJoin.enabled", "true")
                  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                  .config("spark.default.parallelism","2")
                  .getOrCreate())

In [3]:
hc.sparkContext.setLogLevel('INFO')

In [4]:
hc.version

'2.4.4'

In [5]:
def to_spark_df(fin):
    """
    Parse a filepath to a spark dataframe using the pandas api.
    
    Parameters
    ----------
    fin : str
        The path to the file on the local filesystem that contains the csv data.
        
    Returns
    -------
    df : pyspark.sql.dataframe.DataFrame
        A spark DataFrame containing the parsed csv data.
    """
    df = pd.read_csv(fin)
    df.fillna("", inplace=True)
    df = hc.createDataFrame(df)
    return(df)

# Load the train-test sets
train = to_spark_df("/Users/dayti/kaggle/Toxic Comment/train.csv")
test = to_spark_df("/Users/dayti/kaggle/Toxic Comment/test.csv")


In [6]:
out_cols = [i for i in train.columns if i not in ["id", "comment_text"]]

In [7]:
train.show(1)

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0000997932d777bf|Explanation
Why t...|    0|           0|      0|     0|     0|            0|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
only showing top 1 row



In [8]:
train.filter(F.col('toxic') == 1).show(5)

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0002bcb3da6cb337|COCKSUCKER BEFORE...|    1|           1|      1|     0|     1|            0|
|0005c987bdfc9d4b|Hey... what is it...|    1|           0|      0|     0|     0|            0|
|0007e25b2121310b|Bye! 

Don't look...|    1|           0|      0|     0|     0|            0|
|001810bf8c45bf5f|You are gay or an...|    1|           0|      1|     0|     1|            1|
|00190820581d90ce|FUCK YOUR FILTHY ...|    1|           0|      1|     0|     1|            0|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
only showing top 5 rows



In [9]:
# Basic sentence tokenizer
tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
wordsData = tokenizer.transform(train)

# Count the words in a document
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
tf = hashingTF.transform(wordsData)

In [10]:
tf.select('rawFeatures').take(2)

[Row(rawFeatures=SparseVector(262144, {19208: 1.0, 23032: 1.0, 24417: 1.0, 25000: 1.0, 29945: 1.0, 32241: 1.0, 32976: 1.0, 37852: 1.0, 46075: 1.0, 59853: 1.0, 72125: 1.0, 77971: 1.0, 81631: 1.0, 82999: 1.0, 83922: 1.0, 91677: 1.0, 97171: 1.0, 100258: 1.0, 101169: 1.0, 103838: 3.0, 110427: 1.0, 113031: 1.0, 113418: 1.0, 135568: 1.0, 139533: 1.0, 140784: 1.0, 145284: 1.0, 151536: 1.0, 164148: 1.0, 169364: 1.0, 176964: 1.0, 182267: 1.0, 192137: 1.0, 193131: 1.0, 229137: 1.0, 230921: 1.0, 231630: 1.0, 244466: 1.0, 246621: 1.0, 249835: 1.0, 253170: 1.0})),
 Row(rawFeatures=SparseVector(262144, {17429: 1.0, 38728: 1.0, 83815: 1.0, 88337: 1.0, 101527: 1.0, 101833: 1.0, 108541: 1.0, 125765: 1.0, 141219: 1.0, 151980: 1.0, 169364: 1.0, 169800: 1.0, 203235: 1.0, 208090: 1.0, 219140: 1.0, 242101: 1.0, 248135: 1.0, 249180: 1.0}))]

In [11]:
# Build the idf model and transform the original token frequencies into their tf-idf counterparts
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(tf) 
tfidf = idfModel.transform(tf)

In [12]:
tfidf.select("features").first()

Row(features=SparseVector(262144, {19208: 2.244, 23032: 5.0123, 24417: 0.7386, 25000: 5.6813, 29945: 3.0517, 32241: 8.3967, 32976: 5.0285, 37852: 1.7539, 46075: 6.9564, 59853: 3.1525, 72125: 2.2744, 77971: 7.6108, 81631: 3.4198, 82999: 7.5735, 83922: 6.4588, 91677: 0.6965, 97171: 2.0163, 100258: 1.1947, 101169: 1.734, 103838: 1.2127, 110427: 2.1173, 113031: 8.9845, 113418: 2.2023, 135568: 3.5864, 139533: 2.5136, 140784: 3.0483, 145284: 7.6628, 151536: 2.2412, 164148: 6.0064, 169364: 2.4772, 176964: 1.7656, 182267: 8.613, 192137: 3.1018, 193131: 5.6703, 229137: 4.5705, 230921: 2.0429, 231630: 8.2914, 244466: 3.351, 246621: 10.0343, 249835: 6.827, 253170: 2.7021}))

In [13]:
REG = 0.1
lr = LogisticRegression(featuresCol="features", labelCol='toxic', regParam=REG)

In [14]:
tfidf.show(5)

+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|               words|         rawFeatures|            features|
+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+
|0000997932d777bf|Explanation
Why t...|    0|           0|      0|     0|     0|            0|[explanation, why...|(262144,[19208,23...|(262144,[19208,23...|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|[d'aww!, he, matc...|(262144,[17429,38...|(262144,[17429,38...|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|[hey, man,, i'm, ...|(262144,[14,9639,...|(262144,[14,9639,...|
|0001b41b1c6bb37e|"
More
I can't ma...|    0|       

In [15]:
lrModel = lr.fit(tfidf.limit(5000))

In [16]:
res_train = lrModel.transform(tfidf)

In [17]:
res_train.select("id", "toxic", "probability", "prediction").show(20)

+----------------+-----+--------------------+----------+
|              id|toxic|         probability|prediction|
+----------------+-----+--------------------+----------+
|0000997932d777bf|    0|[0.98581646633835...|       0.0|
|000103f0d9cfb60f|    0|[0.98344821303795...|       0.0|
|000113f07ec002fd|    0|[0.95500173136246...|       0.0|
|0001b41b1c6bb37e|    0|[0.99453009308165...|       0.0|
|0001d958c54c6e35|    0|[0.96269618805532...|       0.0|
|00025465d4725e87|    0|[0.95766822132553...|       0.0|
|0002bcb3da6cb337|    1|[0.27147906115805...|       1.0|
|00031b1e95af7921|    0|[0.96287942446786...|       0.0|
|00037261f536c51d|    0|[0.98502026615636...|       0.0|
|00040093b2687caa|    0|[0.96974700476352...|       0.0|
|0005300084f90edc|    0|[0.99999214813733...|       0.0|
|00054a5e18b50dd4|    0|[0.96909930941050...|       0.0|
|0005c987bdfc9d4b|    1|[0.04657011154294...|       1.0|
|0006f16e4e9f292e|    0|[0.99600536691196...|       0.0|
|00070ef96486d6f9|    0|[0.9804

In [18]:
res_train.show(5)

+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|               words|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|0000997932d777bf|Explanation
Why t...|    0|           0|      0|     0|     0|            0|[explanation, why...|(262144,[19208,23...|(262144,[19208,23...|[4.24138850720369...|[0.98581646633835...|       0.0|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|[d'aww!, he, matc...|(262144,[17429,38...|(262144,[17429,38...

In [19]:
#Select second element in each row of the column vector
extract_prob = F.udf(lambda x: float(x[1]), T.FloatType())

In [20]:
(res_train.withColumn("proba", extract_prob("probability"))
 .select("proba", "prediction")
 .show())


+-----------+----------+
|      proba|prediction|
+-----------+----------+
|0.014183533|       0.0|
|0.016551787|       0.0|
| 0.04499827|       0.0|
|0.005469907|       0.0|
|0.037303813|       0.0|
|0.042331778|       0.0|
| 0.72852093|       1.0|
|0.037120577|       0.0|
|0.014979734|       0.0|
|0.030252995|       0.0|
|7.851862E-6|       0.0|
| 0.03090069|       0.0|
|  0.9534299|       1.0|
|0.003994633|       0.0|
|0.019571071|       0.0|
|0.008423822|       0.0|
|  0.7983589|       1.0|
|0.023836304|       0.0|
|0.019899674|       0.0|
|0.011591644|       0.0|
+-----------+----------+
only showing top 20 rows



In [21]:
#Create Results Dataframe

In [22]:
#Convert test text
test_tokens = tokenizer.transform(test)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)

In [23]:
#Initialize the new DataFrame with the id column
test_res = test.select('id')
test_res.head()

Row(id='00001cee341fdb12')

In [None]:
#Make predictions for each class
test_probs = []
for col in out_cols:
    print(col)
    lr = LogisticRegression(featuresCol="features", labelCol=col, regParam=REG)
    print("...fitting")
    lrModel = lr.fit(tfidf)
    print("...predicting")
    res = lrModel.transform(test_tfidf)
    print("...appending result")
    test_res = test_res.join(res.select('id', 'probability'), on="id")
    print("...extracting probability")
    test_res = test_res.withColumn(col, extract_prob('probability')).drop("probability")
    test_res.show(5)


toxic
...fitting
...predicting
...appending result
...extracting probability
+----------------+------------+
|              id|       toxic|
+----------------+------------+
|000968ce11f5ee34|  0.04655437|
|00491682330fdd1d|3.6486778E-8|
|008eb47c4684d190|   0.6308229|
|00d251f47486b6d2|  0.06102414|
|0114ae82c53101a9|  0.43038085|
+----------------+------------+
only showing top 5 rows

severe_toxic
...fitting
...predicting
...appending result
...extracting probability


In [None]:
test_res.show(5)