# Topic Model Notebook
Author: Andrew  

This notebook will outline the steps used when cleaning the raw articles from Towards Data Science. 

In [1]:
# Load Libraries
import pyspark

In [2]:
# start SparkSession
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.getActiveSession()

## Clean Data

In [3]:
# Load CSV into Spark

from pyspark.sql.types import *
import pandas as pd

df = pd.read_csv('../src/TDS_articles.csv', index_col=0)

mySchema = StructType([ StructField("title", StringType(), True)\
                       ,StructField("subtitle", StringType(), True)\
                       ,StructField("author", StringType(), True)\
                       ,StructField("date", StringType(), True)\
                       ,StructField("body", StringType(), True)\
                       ,StructField("link", StringType(), True)\
                       ,StructField("article_id", IntegerType(), True)])

articles = spark.createDataFrame(df, schema=mySchema)

In [4]:
# Delete the pandas df to save memory
del df

In [5]:
articles.show()

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+
|               title|            subtitle|              author|      date|                body|                link|article_id|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+
|Lessons Learned F...| Be prepared to code|     John Wittenauer|2014-11-25|This content orig...|https://towardsda...|         1|
|   The Next Big Wave|                 NaN|       Salman Naseer|2017-03-03|IoT, Big Data, M2...|https://towardsda...|      1580|
|Thinking about Da...|What might DSaaP ...|       Chris Dowsett|2016-05-29|The “usability of...|https://towardsda...|      1657|
|The Science / Eng...|                 NaN|          Jenny Kwan|2015-07-13|As I wrote about ...|https://towardsda...|      1710|
|So You Want to be...|                 NaN|          Jenny Kwan|2015-07-13|We could discuss ...|h

In [5]:
# register SQL table
articles.registerTempTable('articles')

In [6]:
query = """
SELECT * FROM articles;
"""
spark.sql(query).show()

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+
|               title|            subtitle|              author|      date|                body|                link|article_id|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+
|Lessons Learned F...| Be prepared to code|     John Wittenauer|2014-11-25|This content orig...|https://towardsda...|         1|
|   The Next Big Wave|                 NaN|       Salman Naseer|2017-03-03|IoT, Big Data, M2...|https://towardsda...|      1580|
|Thinking about Da...|What might DSaaP ...|       Chris Dowsett|2016-05-29|The “usability of...|https://towardsda...|      1657|
|The Science / Eng...|                 NaN|          Jenny Kwan|2015-07-13|As I wrote about ...|https://towardsda...|      1710|
|So You Want to be...|                 NaN|          Jenny Kwan|2015-07-13|We could discuss ...|h

In [8]:
query = """
SELECT COUNT(subtitle)
FROM articles
WHERE subtitle like CONCAT('%', author, '%');
"""

spark.sql(query).show()

+---------------+
|count(subtitle)|
+---------------+
|             85|
+---------------+



Some of the Authors are replicated in the subtitles. 

In [9]:
query = """
SELECT subtitle, author
FROM articles
WHERE subtitle like CONCAT('%', author, '%');
"""

spark.sql(query).show()

+--------------------+--------------------+
|            subtitle|              author|
+--------------------+--------------------+
|by Jose Marcial P...|Jose Marcial Port...|
|ashispapu (Ashis ...|         Ashis Samal|
|By Eli Bildner, E...|         Eli Bildner|
|Using Machine Lea...|          DeviceHive|
|RecSys Week 1: Th...|                   R|
|WHAT DO WE DO TO ...|                   T|
|Sukant Khurana (@...|      Sukant Khurana|
|Sukant Khurana (@...|      Sukant Khurana|
|                 NaN|                 NaN|
|                 NaN|                 NaN|
|Co-Authors: Konst...|  Konstantinos Bozas|
|                 NaN|                 NaN|
|Naveen Manwani - ...|      Naveen Manwani|
|By Zina Akrout, S...|     Samantha Bansil|
|Applying François...|                   A|
|You will soon be ...|                   Y|
|Laurent El Ghaoui...|   Laurent El Ghaoui|
|By Werlindo Mangr...|          Mia Iseman|
|Written by Vivian...| Viviane Lindenbergh|
|Rocket (Data) Sci...|         Y

In [5]:
# Register cleaning function as UDF 
from cleaning import clean_doc
from pyspark.sql.functions import udf

clean_udf = udf(lambda doc: clean_doc(doc), StringType())

In [6]:
# Clean "body" with udf
clean_df = articles.withColumn("clean_body", clean_udf("body"))

In [7]:
clean_df.show(5)

+--------------------+--------------------+---------------+----------+--------------------+--------------------+----------+--------------------+
|               title|            subtitle|         author|      date|                body|                link|article_id|          clean_body|
+--------------------+--------------------+---------------+----------+--------------------+--------------------+----------+--------------------+
|Lessons Learned F...| Be prepared to code|John Wittenauer|2014-11-25|This content orig...|https://towardsda...|         1|content originall...|
|   The Next Big Wave|                 NaN|  Salman Naseer|2017-03-03|IoT, Big Data, M2...|https://towardsda...|      1580|iot big data m m ...|
|Thinking about Da...|What might DSaaP ...|  Chris Dowsett|2016-05-29|The “usability of...|https://towardsda...|      1657|usability datum a...|
|The Science / Eng...|                 NaN|     Jenny Kwan|2015-07-13|As I wrote about ...|https://towardsda...|      1710|write p

In [10]:
clean_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- subtitle: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- body: string (nullable = true)
 |-- link: string (nullable = true)
 |-- article_id: integer (nullable = true)
 |-- clean_body: string (nullable = true)



## Preprocessing - Get Document Term Matrix (dtm) 

In [8]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA


tk = Tokenizer(inputCol='clean_body', outputCol='tokens')
vectorizer = CountVectorizer(inputCol=tk.getOutputCol(), outputCol='term_freq')
lda_model = LDA()


In [10]:
pipe = Pipeline(stages=[tk, vectorizer, lda_model])

In [9]:
clean_df = tk.transform(clean_df)

In [11]:
clean_df.select('tokens').show(5)

+--------------------+
|              tokens|
+--------------------+
|[content, origina...|
|[iot, big, data, ...|
|[usability, datum...|
|[write, previousl...|
|[discuss, suspect...|
+--------------------+
only showing top 5 rows



In [12]:
vectorizer.fit(clean_df)

Py4JJavaError: An error occurred while calling o64.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, andrews-mbp.fios-router.home, executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
	at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:233)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.OutOfMemoryError: Java heap space


In [9]:
cv_model = vectorizer.fit(clean_df)
dtm = cv_model.transform(clean_df)

nvs/spark/lib/python3.8/site-packages/py4j/java_gateway.py", line 985, in _create_connection
    connection.start()
  File "/Users/andrew/opt/anaconda3/envs/spark/lib/python3.8/site-packages/py4j/java_gateway.py", line 1127, in start
    raise Py4JNetworkError(msg, e)
py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:51833)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/andrew/opt/anaconda3/envs/spark/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 2045, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'Py4JNetworkError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/andrew/opt/anaconda3/envs/spark/lib/python3.8/site-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:51833)

In [None]:
dtm.show(5)