# **Proyecto de NLP, Spark y Redes Neuronales con Python**

-------

## Importación de librerías, inicio de sesión en Spark y lectura de los datos.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StopWordsRemover, Tokenizer, CountVectorizer, NGram, VectorAssembler 
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from nltk.corpus import stopwords
import findspark
import pickle

In [2]:
findspark.init()
spark = SparkSession.builder.appName("ProyectoNLPSpark").getOrCreate()
data = spark.read.csv('../data/ruddit_comments_score.csv', header=True, inferSchema=True, sep = ",", multiLine=True)
data = data.withColumnRenamed("comment_id", "ID").withColumnRenamed("body", "Comentario").withColumnRenamed("score", "Puntuacion")

> ##### Estos son los datos con los que vamos a trabajar

In [3]:
data.show()

+--------------------+--------------------+----------+
|                  ID|          Comentario|Puntuacion|
+--------------------+--------------------+----------+
|             cza1q49|> The difference ...|    -0.083|
|             cza1wdh|"The myth is that...|    -0.022|
|             cza23qx|           [deleted]|     0.167|
|             cza2bw8|The assertion is ...|    -0.146|
|             cza2iji|You said in the O...|    -0.083|
|             cza2jj3|">Men and women a...|      null|
|Edit: Changed 70 ...|              -0.042|      null|
|             cza31e2|> All the wage ga...|    -0.021|
|             cza321d|           [deleted]|    -0.021|
|             cza336e|           [deleted]|     0.208|
|             cza34dq|           [deleted]|    -0.191|
|             cza3500|           [deleted]|    -0.229|
|             cza37ue|No, the point is ...|    -0.174|
|             cza3802|           [deleted]|     0.021|
|             cza392y|So women are paid...|    -0.229|
|         

In [4]:
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Comentario: string (nullable = true)
 |-- Puntuacion: string (nullable = true)



##### Limpieza y transformación y análisis de los datos

In [5]:
data = data.withColumn("Puntuacion", data["Puntuacion"].cast("float"))
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Comentario: string (nullable = true)
 |-- Puntuacion: float (nullable = true)



In [6]:
data = data.dropna()
data = data.filter(data["Comentario"] != "[deleted]")
data.show()

+-------+--------------------+----------+
|     ID|          Comentario|Puntuacion|
+-------+--------------------+----------+
|cza1q49|> The difference ...|    -0.083|
|cza1wdh|"The myth is that...|    -0.022|
|cza2bw8|The assertion is ...|    -0.146|
|cza2iji|You said in the O...|    -0.083|
|cza31e2|> All the wage ga...|    -0.021|
|cza37ue|No, the point is ...|    -0.174|
|cza392y|So women are paid...|    -0.229|
|cza3m1b|But obviously tha...|       0.0|
|cza3r5u|"I think that Hol...|     0.098|
|cza47sd|"> I don't think ...|    -0.083|
|cza47xu|I don't think the...|    -0.062|
|cza4d2a|> Women are a who...|    -0.062|
|cza4gsv|"The gist of my p...|    -0.021|
|cza4ldq|Biological differ...|    -0.083|
|cza5maz|> It's the differ...|    -0.188|
|cza6q74|>The fact of the ...|     0.083|
|cza6wrd|Well, if your wif...|    -0.104|
|cza76eq|Women have not sp...|     0.188|
|cza79u4|Doesn't it also m...|     0.175|
|cza7gpu|So you do believe...|       0.0|
+-------+--------------------+----

In [7]:
data.collect()[1][1]

'"The myth is that the ""gap"" is entirely based on the sex of the  person. "'

In [10]:
data = data.rdd.map(lambda x: (x[0],x[1][1:] if (x[1][0] == '"') else x[1],x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1][:-1] if (x[1][-1] == '"') else x[1],x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1][1:] if (x[1][0] == '>') else x[1],x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1][1:] if (x[1][0] == ' ') else x[1],x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1][:-1] if (x[1][-1] == ' ') else x[1],x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1][:-1] if (x[1][-1] == '.') else x[1],x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1].replace('""', '"'),x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1].replace('\n', ''),x[2])).toDF()
data = data.rdd.map(lambda x: (x[0],x[1].replace('\t', ''),x[2])).toDF()
data.show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19) (host.docker.internal executor driver): java.net.SocketException: Connection reset by peer
	at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:413)
	at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:433)
	at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:812)
	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1120)
	at java.base/java.io.BufferedOutputStream.implWrite(BufferedOutputStream.java:216)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:199)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:307)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:732)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: java.net.SocketException: Connection reset by peer
	at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:413)
	at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:433)
	at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:812)
	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1120)
	at java.base/java.io.BufferedOutputStream.implWrite(BufferedOutputStream.java:216)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:199)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:307)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:732)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)


In [None]:
data.collect()[1][1]

'The myth is that the "gap" is entirely based on the sex of the  person'

In [None]:
data = data.withColumnRenamed("_1", "ID").withColumnRenamed("_2", "Comentario").withColumnRenamed("_3", "Puntuacion")

In [None]:
data.count()

5422

In [None]:
data.show()

In [14]:
data = data.withColumn("LongitudLetras", length(data["Comentario"]))

In [15]:
stopwords.words('english')[:10]

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're"]

In [16]:
data = Tokenizer(inputCol="Comentario", outputCol="ComenToken").transform(data).drop("Comentario")

In [17]:
data = StopWordsRemover(stopWords=stopwords.words('english'), inputCol="ComenToken", outputCol="ComenTokenLimpio").transform(data).drop("ComenToken")

In [18]:
data = CountVectorizer(inputCol="ComenTokenLimpio", outputCol="ConteoPalabras").fit(data).transform(data)

In [19]:
data.show()

+-------+--------------------+--------------+--------------------+--------------------+
|     ID|          Puntuacion|LongitudLetras|    ComenTokenLimpio|      ConteoPalabras|
+-------+--------------------+--------------+--------------------+--------------------+
|cza1q49|-0.08299999684095383|           171|[difference, aver...|(23209,[36,50,68,...|
|cza1wdh|-0.02199999988079071|            70|[myth, "gap", ent...|(23209,[0,32,59,1...|
|cza2bw8| -0.1459999978542328|           115|[assertion, women...|(23209,[4,36,56,2...|
|cza2iji|-0.08299999684095383|           160|[said, op, that's...|(23209,[10,25,36,...|
|cza31e2|-0.02099999971687...|           476|[wage, gap, is,, ...|(23209,[1,5,12,25...|
|cza37ue|-0.17399999499320984|            62|[no,, point, talk...|(23209,[61,167,31...|
|cza392y| -0.2290000021457672|            40|[women, paid, les...|(23209,[36,56,274...|
|cza3m1b|                 0.0|           377|[obviously, make,...|(23209,[1,8,10,11...|
|cza3r5u| 0.09799999743700027|  

In [20]:
data.select([max(data["Puntuacion"]), min(data["Puntuacion"])]).show()

+------------------+-------------------+
|   max(Puntuacion)|    min(Puntuacion)|
+------------------+-------------------+
|0.9789999723434448|-0.8889999985694885|
+------------------+-------------------+



In [21]:
data.select([max(data["LongitudLetras"]), min(data["LongitudLetras"])]).show()

+-------------------+-------------------+
|max(LongitudLetras)|min(LongitudLetras)|
+-------------------+-------------------+
|                913|                  9|
+-------------------+-------------------+



In [22]:
data.select("ConteoPalabras").show()

+--------------------+
|      ConteoPalabras|
+--------------------+
|(23209,[36,50,68,...|
|(23209,[0,32,59,1...|
|(23209,[4,36,56,2...|
|(23209,[10,25,36,...|
|(23209,[1,5,12,25...|
|(23209,[61,167,31...|
|(23209,[36,56,274...|
|(23209,[1,8,10,11...|
|(23209,[5,38,40,6...|
|(23209,[1,5,12,26...|
|(23209,[3,5,12,14...|
|(23209,[12,20,36,...|
|(23209,[10,132,14...|
|(23209,[13,22,93,...|
|(23209,[54,56,66,...|
|(23209,[11,12,13,...|
|(23209,[10,12,18,...|
|(23209,[3,20,36,3...|
|(23209,[11,16,26,...|
|(23209,[0,2,36,64...|
+--------------------+
only showing top 20 rows



In [23]:
data.show()

+-------+--------------------+--------------+--------------------+--------------------+
|     ID|          Puntuacion|LongitudLetras|    ComenTokenLimpio|      ConteoPalabras|
+-------+--------------------+--------------+--------------------+--------------------+
|cza1q49|-0.08299999684095383|           171|[difference, aver...|(23209,[36,50,68,...|
|cza1wdh|-0.02199999988079071|            70|[myth, "gap", ent...|(23209,[0,32,59,1...|
|cza2bw8| -0.1459999978542328|           115|[assertion, women...|(23209,[4,36,56,2...|
|cza2iji|-0.08299999684095383|           160|[said, op, that's...|(23209,[10,25,36,...|
|cza31e2|-0.02099999971687...|           476|[wage, gap, is,, ...|(23209,[1,5,12,25...|
|cza37ue|-0.17399999499320984|            62|[no,, point, talk...|(23209,[61,167,31...|
|cza392y| -0.2290000021457672|            40|[women, paid, les...|(23209,[36,56,274...|
|cza3m1b|                 0.0|           377|[obviously, make,...|(23209,[1,8,10,11...|
|cza3r5u| 0.09799999743700027|  

In [24]:
data = NGram(n = 2, inputCol="ComenTokenLimpio", outputCol="NGram").transform(data)

In [25]:
data = CountVectorizer(inputCol="NGram", outputCol="ConteoNGram").fit(data).transform(data)

In [26]:
data.show()

+-------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|     ID|          Puntuacion|LongitudLetras|    ComenTokenLimpio|      ConteoPalabras|               NGram|         ConteoNGram|
+-------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|cza1q49|-0.08299999684095383|           171|[difference, aver...|(23209,[36,50,68,...|[difference avera...|(82420,[98,3321,7...|
|cza1wdh|-0.02199999988079071|            70|[myth, "gap", ent...|(23209,[0,32,59,1...|[myth "gap", "gap...|(82420,[1006,1118...|
|cza2bw8| -0.1459999978542328|           115|[assertion, women...|(23209,[4,36,56,2...|[assertion women,...|(82420,[126,721,4...|
|cza2iji|-0.08299999684095383|           160|[said, op, that's...|(23209,[10,25,36,...|[said op, op that...|(82420,[3084,3856...|
|cza31e2|-0.02099999971687...|           476|[wage, gap, is,, ...|(23209,[1,5,12,25...|[wa

In [27]:
feature_assembler = VectorAssembler(inputCols=["LongitudLetras", "ConteoPalabras"], outputCol="CaracteristicasIndependientes")

In [28]:
output = feature_assembler.transform(data)

In [29]:
FinalData = output.select("Puntuacion","CaracteristicasIndependientes")

In [30]:
FinalData.show()

+--------------------+-----------------------------+
|          Puntuacion|CaracteristicasIndependientes|
+--------------------+-----------------------------+
|-0.08299999684095383|         (23210,[0,37,51,6...|
|-0.02199999988079071|         (23210,[0,1,33,60...|
| -0.1459999978542328|         (23210,[0,5,37,57...|
|-0.08299999684095383|         (23210,[0,11,26,3...|
|-0.02099999971687...|         (23210,[0,2,6,13,...|
|-0.17399999499320984|         (23210,[0,62,168,...|
| -0.2290000021457672|         (23210,[0,37,57,2...|
|                 0.0|         (23210,[0,2,9,11,...|
| 0.09799999743700027|         (23210,[0,6,39,41...|
|-0.08299999684095383|         (23210,[0,2,6,13,...|
|-0.06199999898672104|         (23210,[0,4,6,13,...|
|-0.06199999898672104|         (23210,[0,13,21,3...|
|-0.02099999971687...|         (23210,[0,11,133,...|
|-0.08299999684095383|         (23210,[0,14,23,9...|
|-0.18799999356269836|         (23210,[0,55,57,6...|
| 0.08299999684095383|         (23210,[0,12,13

In [31]:
train, test = FinalData.randomSplit([0.75, 0.25])

## Creación del modelo y predicción de datos

In [32]:
Model1 = RandomForestRegressor(featuresCol = "CaracteristicasIndependientes", labelCol="Puntuacion")

In [33]:
Model1 = Model1.fit(train)

In [34]:
predicciones = Model1.transform(test)

In [35]:
print("RMSE: %f" % RegressionEvaluator(labelCol="Puntuacion", predictionCol="prediction", metricName="rmse").evaluate(predicciones))
print("MSE: %f" % RegressionEvaluator(labelCol="Puntuacion", predictionCol="prediction", metricName="mse").evaluate(predicciones))
print("MAE: %f" % RegressionEvaluator(labelCol="Puntuacion", predictionCol="prediction", metricName="mae").evaluate(predicciones))
print("r2: %f" % RegressionEvaluator(labelCol="Puntuacion", predictionCol="prediction", metricName="r2").evaluate(predicciones))

RMSE: 0.263799
MSE: 0.069590
MAE: 0.208927
r2: 0.370163


In [36]:
data.select("Puntuacion").describe().show()

+-------+--------------------+
|summary|          Puntuacion|
+-------+--------------------+
|  count|                5422|
|   mean|-0.03671855389644512|
| stddev|  0.3350070193314698|
|    min| -0.8889999985694885|
|    max|  0.9789999723434448|
+-------+--------------------+

