In [4]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.classification import LogisticRegression
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Build the local Spark session, or reuse the one already running in this kernel.
# getOrCreate() keeps this cell safe to re-run: constructing SparkContext('local')
# a second time in the same process raises
# "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').getOrCreate()
# Keep the `sc` name bound to the session's underlying SparkContext.
sc = spark.sparkContext

In [5]:
# Load the training reviews: tab-delimited file whose first row is the header;
# column types are inferred by Spark.
df = (
    spark.read.format("csv")
    .option("inferschema", "true")
    .option("header", "true")
    .option("delimiter", "\t")
    .load("trainReviews.tsv")
)

In [37]:
# Turn the `text` column into a `words` array column and preview a few rows.
# NOTE(review): the preview below shows empty-string tokens where the text
# contains consecutive spaces; they are passed on as ordinary tokens.
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="words",
)
wordsData = tokenizer.transform(df)
wordsData.show(n=5)

+----+--------+--------------------+--------------------+
|  id|category|                text|               words|
+----+--------+--------------------+--------------------+
| 858|       0|burnt money is th...|[burnt, money, is...|
|1762|       1|the   italian hit...|[the, , , italian...|
| 235|       0|at times   you d ...|[at, times, , , y...|
| 712|       0|after a marketing...|[after, a, market...|
|1319|       1|john cusack is th...|[john, cusack, is...|
+----+--------+--------------------+--------------------+
only showing top 5 rows



In [7]:
# Hash every token list into a sparse term-frequency vector stored in
# `rawFeatures` (the preview shows vectors of size 262144).
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
)
tf = hashingTF.transform(wordsData)
tf.show(n=10)

+----+--------+--------------------+--------------------+--------------------+
|  id|category|                text|               words|         rawFeatures|
+----+--------+--------------------+--------------------+--------------------+
| 858|       0|burnt money is th...|[burnt, money, is...|(262144,[14,2437,...|
|1762|       1|the   italian hit...|[the, , , italian...|(262144,[5090,963...|
| 235|       0|at times   you d ...|[at, times, , , y...|(262144,[1846,357...|
| 712|       0|after a marketing...|[after, a, market...|(262144,[3760,392...|
|1319|       1|john cusack is th...|[john, cusack, is...|(262144,[14,1998,...|
|1488|       1|every once in a w...|[every, once, in,...|(262144,[2366,329...|
|  76|       0|for better or wor...|[for, better, or,...|(262144,[14,991,1...|
|  69|       0|   first rule of ...|[, , , first, rul...|(262144,[14,571,1...|
|1144|       1|it is hard to ima...|[it, is, hard, to...|(262144,[2329,243...|
|  75|       0|late in down to y...|[late, in, down,

In [36]:
# Display the complete term-frequency vector of the first row.
first_row = tf.head()
first_row.rawFeatures

SparseVector(262144, {14: 1.0, 2437: 1.0, 3234: 1.0, 6061: 1.0, 7156: 2.0, 9255: 1.0, 9639: 5.0, 9781: 1.0, 13981: 1.0, 15664: 3.0, 15889: 6.0, 16332: 2.0, 17291: 7.0, 18448: 1.0, 18483: 1.0, 19153: 1.0, 19370: 1.0, 20832: 1.0, 21300: 1.0, 22264: 1.0, 22323: 1.0, 23326: 1.0, 24417: 1.0, 24661: 1.0, 24980: 1.0, 25570: 2.0, 28818: 2.0, 29066: 1.0, 30569: 2.0, 30700: 1.0, 34116: 3.0, 35383: 1.0, 35661: 1.0, 36564: 1.0, 40108: 2.0, 40343: 1.0, 40732: 1.0, 41213: 1.0, 41508: 1.0, 42239: 1.0, 45441: 1.0, 45818: 1.0, 46762: 2.0, 48448: 3.0, 50940: 6.0, 52617: 1.0, 54383: 1.0, 55242: 1.0, 56063: 2.0, 56715: 1.0, 57304: 1.0, 59733: 1.0, 60268: 1.0, 60483: 1.0, 63422: 1.0, 66314: 1.0, 66980: 2.0, 69529: 1.0, 70869: 1.0, 76764: 1.0, 77142: 1.0, 77372: 2.0, 79323: 1.0, 79364: 2.0, 79737: 1.0, 79876: 1.0, 85125: 2.0, 86175: 6.0, 86436: 5.0, 86832: 1.0, 87052: 1.0, 87927: 1.0, 89356: 3.0, 89457: 1.0, 91677: 14.0, 94518: 1.0, 94533: 8.0, 95457: 1.0, 95543: 1.0, 96257: 1.0, 96638: 2.0, 96717: 2.0, 984

In [12]:
# Fit the IDF weights on the training term frequencies; `idf` is the fitted
# model and is reused later for the test set.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
    minDocFreq=2,
).fit(tf)

# Apply the fitted weights to obtain the `features` column.
tfidf = idf.transform(tf)

In [13]:
# Preview the training frame with the new `features` column.
tfidf.show(n=5)

+----+--------+--------------------+--------------------+--------------------+--------------------+
|  id|category|                text|               words|         rawFeatures|            features|
+----+--------+--------------------+--------------------+--------------------+--------------------+
| 858|       0|burnt money is th...|[burnt, money, is...|(262144,[14,2437,...|(262144,[14,2437,...|
|1762|       1|the   italian hit...|[the, , , italian...|(262144,[5090,963...|(262144,[5090,963...|
| 235|       0|at times   you d ...|[at, times, , , y...|(262144,[1846,357...|(262144,[1846,357...|
| 712|       0|after a marketing...|[after, a, market...|(262144,[3760,392...|(262144,[3760,392...|
|1319|       1|john cusack is th...|[john, cusack, is...|(262144,[14,1998,...|(262144,[14,1998,...|
+----+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [14]:
# Logistic-regression estimator: reads the TF-IDF `features` column, uses
# `category` as the label, with regularisation parameter 0.01.
ml = LogisticRegression(
    featuresCol="features",
    labelCol="category",
    regParam=0.01,
)

In [15]:
# Fit the classifier on at most 5000 rows of the training TF-IDF frame.
train_rows = tfidf.limit(5000)
mlModel = ml.fit(train_rows)

In [16]:
# Score the training frame with the fitted model; the `probability` and
# `prediction` columns it adds are read by later cells.
res_train = mlModel.transform(dataset=tfidf)

In [17]:
def _second_component(vector):
    """Return element 1 of ``vector`` converted to a Python float."""
    return float(vector[1])


# Column-level UDF wrapping the helper above; the result column is FloatType.
extract_prob = F.udf(_second_component, T.FloatType())

In [18]:
# Add element 1 of the `probability` vector as a `proba` column, then show it
# next to the predicted label for the training rows.
train_scored = res_train.withColumn("proba", extract_prob("probability"))
train_scored.select("id", "proba", "prediction").show()

+----+------------+----------+
|  id|       proba|prediction|
+----+------------+----------+
| 858| 0.004850338|       0.0|
|1762|   0.9935536|       1.0|
| 235|0.0043423125|       0.0|
| 712|0.0018025053|       0.0|
|1319|  0.99910474|       1.0|
|1488|   0.9971708|       1.0|
|  76|0.0020437534|       0.0|
|  69|0.0027964748|       0.0|
|1144|   0.9984451|       1.0|
|  75|0.0014211949|       0.0|
|1816|  0.99190885|       1.0|
|1542|  0.99767244|       1.0|
|1153|  0.99755925|       1.0|
|1904|  0.99952346|       1.0|
|  93|0.0041584536|       0.0|
|1669|   0.9992438|       1.0|
|1348|  0.99620974|       1.0|
|1610|  0.99875736|       1.0|
| 684|0.0033833312|       0.0|
| 918|0.0029977756|       0.0|
+----+------------+----------+
only showing top 20 rows



In [19]:
# Load the test reviews with the same reader settings as the training file:
# tab-delimited, header row, inferred column types.
test_df = (
    spark.read.format("csv")
    .option("inferschema", "true")
    .option("header", "true")
    .option("delimiter", "\t")
    .load("testReviews.tsv")
)

In [20]:
# Tokenize the test reviews with an identically configured Tokenizer.
# NOTE: this rebinds `tokenizer` and `wordsData`; from here on `wordsData`
# holds the tokenized TEST set, not the training set.
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="words",
)
wordsData = tokenizer.transform(test_df)
wordsData.show(n=5)

+----+--------+--------------------+--------------------+
|  id|category|                text|               words|
+----+--------+--------------------+--------------------+
|1790|       1|you leave little ...|[you, leave, litt...|
| 243|       0|the most popular ...|[the, most, popul...|
|1501|       1|towards the middl...|[towards, the, mi...|
|1229|       1|the   submarine  ...|[the, , , submari...|
|1618|       1|a welcome cinemat...|[a, welcome, cine...|
+----+--------+--------------------+--------------------+
only showing top 5 rows



In [21]:
# Reuse the HashingTF transformer created for the training data on the test
# tokens (`wordsData` was rebound to the test set in the preceding cell).
test_tf = hashingTF.transform(wordsData)
test_tf.show(n=10)

+----+--------+--------------------+--------------------+--------------------+
|  id|category|                text|               words|         rawFeatures|
+----+--------+--------------------+--------------------+--------------------+
|1790|       1|you leave little ...|[you, leave, litt...|(262144,[15,4200,...|
| 243|       0|the most popular ...|[the, most, popul...|(262144,[2192,243...|
|1501|       1|towards the middl...|[towards, the, mi...|(262144,[925,2026...|
|1229|       1|the   submarine  ...|[the, , , submari...|(262144,[14,3023,...|
|1618|       1|a welcome cinemat...|[a, welcome, cine...|(262144,[2325,261...|
|1582|       1|this movie was on...|[this, movie, was...|(262144,[14,1889,...|
| 488|       0|where do i begin ...|[where, do, i, be...|(262144,[14,2410,...|
|1093|       1|accepting his osc...|[accepting, his, ...|(262144,[1414,606...|
|1280|       1|written by john g...|[written, by, joh...|(262144,[14,2705,...|
|1655|       1|   a man is not a...|[, , , a, man, i

In [22]:
# Weight the test term frequencies with `idf`, the IDF model fitted on the
# TRAINING data. The classifier was trained on features scaled by those weights,
# so the test set must be scaled by the same ones.
# The original cell additionally fitted a second IDF on the test set
# (`test_idf`) that was never used; that fit is dropped because it cost an extra
# pass over the data and obscured which model is actually applied.
test_tfidf = idf.transform(test_tf)

In [23]:
# Preview the test frame with its `features` column.
test_tfidf.show(n=5)

+----+--------+--------------------+--------------------+--------------------+--------------------+
|  id|category|                text|               words|         rawFeatures|            features|
+----+--------+--------------------+--------------------+--------------------+--------------------+
|1790|       1|you leave little ...|[you, leave, litt...|(262144,[15,4200,...|(262144,[15,4200,...|
| 243|       0|the most popular ...|[the, most, popul...|(262144,[2192,243...|(262144,[2192,243...|
|1501|       1|towards the middl...|[towards, the, mi...|(262144,[925,2026...|(262144,[925,2026...|
|1229|       1|the   submarine  ...|[the, , , submari...|(262144,[14,3023,...|(262144,[14,3023,...|
|1618|       1|a welcome cinemat...|[a, welcome, cine...|(262144,[2325,261...|(262144,[2325,261...|
+----+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [24]:
# Score the test frame with the model fitted on the training data.
res_test = mlModel.transform(dataset=test_tfidf)

In [28]:
# Peek at the scored test frame, including the columns added by the model.
res_test.show(n=2)

+----+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  id|category|                text|               words|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+----+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|1790|       1|you leave little ...|[you, leave, litt...|(262144,[15,4200,...|(262144,[15,4200,...|[-1.6342528526597...|[0.16324859566363...|       1.0|
| 243|       0|the most popular ...|[the, most, popul...|(262144,[2192,243...|(262144,[2192,243...|[2.62731392262806...|[0.93259890352084...|       0.0|
+----+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 2 rows



In [29]:
# Add element 1 of the `probability` vector as a `proba` column, then show it
# next to the predicted label for the first ten test rows.
test_scored = res_test.withColumn("proba", extract_prob("probability"))
test_scored.select("id", "proba", "prediction").show(n=10)

+----+-----------+----------+
|  id|      proba|prediction|
+----+-----------+----------+
|1790|  0.8367514|       1.0|
| 243|0.067401096|       0.0|
|1501| 0.88819647|       1.0|
|1229|  0.7159783|       1.0|
|1618|  0.5893712|       1.0|
|1582| 0.59913754|       1.0|
| 488| 0.09497543|       0.0|
|1093|  0.4517028|       0.0|
|1280|  0.9903946|       1.0|
|1655|  0.9998224|       1.0|
+----+-----------+----------+
only showing top 10 rows

