In [1]:
import os
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import NGram
from pyspark.sql import SparkSession

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
conf = SparkConf().setAppName('Spark DL Tabular Pipeline').setMaster('local[6]')
sc = SparkContext(conf=conf)
sql_context = SparkSession(sc)

In [2]:
# Load Data to Spark Dataframe
df = sql_context.read.csv('amazon_reviews.tsv',
                          header=True,
                          sep=r'\t',
                          inferSchema=True)
df.printSchema()



root
 |-- DOC_ID: integer (nullable = true)
 |-- LABEL: string (nullable = true)
 |-- RATING: integer (nullable = true)
 |-- VERIFIED_PURCHASE: string (nullable = true)
 |-- PRODUCT_CATEGORY: string (nullable = true)
 |-- PRODUCT_ID: string (nullable = true)
 |-- PRODUCT_TITLE: string (nullable = true)
 |-- REVIEW_TITLE: string (nullable = true)
 |-- REVIEW_TEXT: string (nullable = true)



In [3]:
df.describe().show()
from pyspark.sql.functions import col, lower, regexp_replace, split

# Convert to Lower Case, remove html tags
def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, "<.*?>", "")
  # c = regexp_replace(c, "<.*?> ", "")
  return c

clean_text_df = df.select(clean_text(col("REVIEW_TEXT")).alias("REVIEW_TEXT"))

clean_text_df.printSchema()
clean_text_df.show(10)



+-------+-----------------+----------+-----------------+-----------------+----------------+--------------------+--------------------+--------------------+-----------------------------------+
|summary|           DOC_ID|     LABEL|           RATING|VERIFIED_PURCHASE|PRODUCT_CATEGORY|          PRODUCT_ID|       PRODUCT_TITLE|        REVIEW_TITLE|                        REVIEW_TEXT|
+-------+-----------------+----------+-----------------+-----------------+----------------+--------------------+--------------------+--------------------+-----------------------------------+
|  count|            21000|     21000|            21000|            21000|           21000|               21000|               21000|               21000|                              21000|
|   mean|          10500.5|      null|4.127952380952381|             null|            null|1.2152996572572742E9|                 9.0|              1421.0|                               null|
| stddev|6062.322162340104|      null|1.27833

In [4]:
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

tokenizer = Tokenizer(inputCol="REVIEW_TEXT", outputCol="words")

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(clean_text_df)
tokenized.select("REVIEW_TEXT", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)




ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(tokenized)
ngramDataFrame.select("ngrams").show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [5]:
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
w2v = Word2Vec(vectorSize=300, minCount=0, inputCol="words", outputCol="features")
doc2vec_pipeline = Pipeline(stages=[w2v])
doc2vec_model = doc2vec_pipeline.fit(tokenized)
doc2vecs_df = doc2vec_model.transform(tokenized)
doc2vecs_df.show()

Py4JJavaError: An error occurred while calling o95.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 21, charles-HP-Z420-Workstation.cable.rcn.com, executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:491)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.mllib.feature.Word2Vec.$anonfun$doFit$6(Word2Vec.scala:453)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.mllib.feature.Word2Vec.doFit(Word2Vec.scala:362)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:322)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:182)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:491)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
tokenized.head()