In [1]:
import pyspark
from pyspark.sql import functions as fn
from pyspark.sql import SQLContext


conf = pyspark.SparkConf().setAppName('read_textRDD').setMaster('local[*]')
sc = pyspark.SparkContext(conf=conf)
sqlContext = SQLContext(sc)

rdd_pos = sc.wholeTextFiles("./aclImdb/train/pos") 
rdd_neg = sc.wholeTextFiles("./aclImdb/train/neg")
df_pos = rdd_pos.toDF()
df_neg = rdd_neg.toDF()
df_pos = df_pos.toDF('Path', 'Review')
df_neg = df_neg.toDF('Path', 'Review')

In [2]:
df_pos = df_pos.select(fn.regexp_extract('Path', r'(pos/\d+_\d)', 1).\
            alias('id'), 'review')
df_neg = df_neg.select(fn.regexp_extract('Path', r'(neg/\d+_\d)', 1).\
            alias('id'), 'review')

In [3]:
df_neg.count()

12500

In [4]:
df_pos.select(fn.count('*').alias('total_num')).show()

+---------+
|total_num|
+---------+
|    12500|
+---------+



In [5]:
df_pos = df_pos.withColumn('score', fn.lit(1))
df_neg = df_neg.withColumn('score', fn.lit(0))

In [6]:
df = df_pos.union(df_neg)

In [7]:
df.show(5)

+-----------+--------------------+-----+
|         id|              review|score|
+-----------+--------------------+-----+
| pos/5525_8|A lot of the prob...|    1|
| pos/2923_1|Eddie Murphy real...|    1|
|pos/10450_1|It was by acciden...|    1|
|pos/10836_1|I thought this mo...|    1|
| pos/8180_8|this is an entert...|    1|
+-----------+--------------------+-----+
only showing top 5 rows



In [8]:
df_imdb = df.orderBy(fn.rand(10))

In [9]:
df_imdb.show(10)

+-----------+--------------------+-----+
|         id|              review|score|
+-----------+--------------------+-----+
|neg/12144_2|Although nothing ...|    0|
|  neg/638_2|The Good Earth is...|    0|
| neg/4737_3|I loved the first...|    0|
|neg/10011_3|This film is abou...|    0|
| neg/5659_4|A disappointing e...|    0|
| pos/2692_8|I enjoyed a lot w...|    1|
|neg/11930_2|Times are tough f...|    0|
| pos/4070_1|I know Anime. I'v...|    1|
|pos/12109_1|Yaitate!! Japan i...|    1|
| pos/2148_1|I always look for...|    1|
+-----------+--------------------+-----+
only showing top 10 rows



# Tokenize

In [10]:
from pyspark.ml.feature import RegexTokenizer
tk = RegexTokenizer().setGaps(False).setPattern('\\p{L}+').setInputCol('review').setOutputCol('words')

In [11]:
df_words = tk.transform(df)

In [12]:
df_words.show(5)

+-----------+--------------------+-----+--------------------+
|         id|              review|score|               words|
+-----------+--------------------+-----+--------------------+
| pos/5525_8|A lot of the prob...|    1|[a, lot, of, the,...|
| pos/2923_1|Eddie Murphy real...|    1|[eddie, murphy, r...|
|pos/10450_1|It was by acciden...|    1|[it, was, by, acc...|
|pos/10836_1|I thought this mo...|    1|[i, thought, this...|
| pos/8180_8|this is an entert...|    1|[this, is, an, en...|
+-----------+--------------------+-----+--------------------+
only showing top 5 rows



# Tfidf

In [19]:
from nltk.corpus import stopwords
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline
stop_words = list(stopwords.words('english'))

sw_filter = StopWordsRemover().setStopWords(stop_words).\
            setCaseSensitive(False).setInputCol("words").setOutputCol("filtered")
cv = CountVectorizer(minTF=1., minDF=5., vocabSize=2000).setInputCol('filtered').setOutputCol('tf')
idf = IDF().setInputCol('tf').setOutputCol('tfidf')
pipeline_tfidf = Pipeline(stages=[tk, sw_filter, cv, idf]).fit(df_imdb)

In [21]:
tfidf = pipeline_tfidf.transform(df_imdb)
tfidf.show(5)

+-----------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|         id|              review|score|               words|            filtered|                  tf|               tfidf|
+-----------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|neg/12144_2|Although nothing ...|    0|[although, nothin...|[although, nothin...|(2000,[0,1,2,4,11...|(2000,[0,1,2,4,11...|
|  neg/638_2|The Good Earth is...|    0|[the, good, earth...|[good, earth, per...|(2000,[2,5,6,17,3...|(2000,[2,5,6,17,3...|
| neg/4737_3|I loved the first...|    0|[i, loved, the, f...|[loved, first, az...|(2000,[0,1,13,18,...|(2000,[0,1,13,18,...|
|neg/10011_3|This film is abou...|    0|[this, film, is, ...|[film, male, esco...|(2000,[0,2,7,8,13...|(2000,[0,2,7,8,13...|
| neg/5659_4|A disappointing e...|    0|[a, disappointing...|[disappointing, e...|(2000,[0,4,7,8,12...|(2000,[0,4,7,8,12...|


# Train Test Split

In [22]:
training_df, validation_df, testing_df = df_imdb.randomSplit([0.6, 0.3, 0.1], seed=10)
[training_df.count(), validation_df.count(), testing_df.count()]

[15123, 7377, 2500]

# Logistic regression

In [27]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression().\
    setLabelCol('score').\
    setFeaturesCol('tfidf').\
    setRegParam(0.0).\
    setMaxIter(10).\
    setElasticNetParam(0.)

lr_pipeline = Pipeline(stages=[pipeline_tfidf, lr]).fit(training_df)

In [28]:
lr_pipeline.transform(validation_df).\
    select(fn.expr('float(prediction = score)').alias('correct')).\
    select(fn.avg('correct')).show()

+-----------------+
|     avg(correct)|
+-----------------+
|0.851565677104514|
+-----------------+



In [29]:
lr_pipeline.transform(testing_df).\
    select(fn.expr('float(prediction = score)').alias('correct')).\
    select(fn.avg('correct')).show()

+------------+
|avg(correct)|
+------------+
|      0.8452|
+------------+



In [30]:
df.write.csv('imdbcsv.csv')

In [31]:
df.toPandas().to_csv('imdb.csv')