In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark-3.0.0-bin-hadoop2.7"

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local[*]").getOrCreate()
sc=spark.sparkContext

In [4]:
import numpy as np
#np.set_printoptions(threshold=sys.maxsize)

In [11]:
datos=(
    sc.textFile('../Base/BD_300.csv')
    .map(lambda s: s.split(','))
    .map(lambda col: col[0])
    )

In [12]:
datos.count()

301

In [13]:
datos.take(10)

['COMENTARIO',
 'EXCELENTE ESPACIO DE DISCUSION GRACIAS POR LA INVITACION',
 'EL PRESIDENTE SENALA QUE ES MUY IMPORTANTE QUE EXISTAN CAMPANAS DE ORIENTACION A LOS JOVENES PARA QUE NO SE CONSUMA DROGA PARA QUE NO AUMENTE EL CONSUMO',
 'POS CLARO',
 'MEXICO ARDE EL GOBIERNO ES RESPONSABLE TE INVITO A LEER MI COLUMNA',
 'EL PRIMER RECURSO DEBE SER SIEMPRE LA CONTENCION DE QUIEN ESTA ATRAVESANDO LA CURVA DE LA VIOLENCIA DESDE LO TECNICO DEPENDE EL CASO PUEDE SER GESTIONAR LA BAJA DEL CONTENIDO LESIVO DOCUMENTAR LA AGRESION TEJER REDES DE APOYO FRENTE A ATAQUES TROLLS',
 'ARMONIA',
 'LES QUERIA CONTAR QUE EN EL POR PRIMERA VEZ DESDE SU EXISTENCIA EL TALLER DE MUJERES CIENCIA Y TECNOLOGIA TUVO QUE DIVIDIRSE EN COMISIONES DEBIDO A LA CANTIDAD DE ASISTENTES ESTA IMAGEN ES TODO',
 'NOS RE ENCANTO LEAN LEAN LEAN TODA LA CUENTA DE ESTA GENIAL ESPANOLA ES DIGNA DE SER SEGUIDA GRACIAS Y SALUDOS DESDE BUENOS AIRES ARGENTINA',
 'MI PRESIDENTE ACABA DE DESTINAR MILLONES DE PESOS PARA EL BEISBOL Y DEJA

In [14]:
TotalMuestras = datos.count()# Son las filas que tiene con datos
print("Total de registros original: ", TotalMuestras)#imprimir en pantalla

Total de registros original:  301


In [15]:
#Para el proceso de etiquetado solo utilizaremos la columna del encabezado y borraremos el encabezado: 
rdd_datos = (
    datos
    .zipWithIndex()
    .filter(lambda col: col[1] > 0)
    .map(lambda col: col[0])
    )

In [16]:
#RDD que almacena las palabras separadas de la base.
rdd_separadas = rdd_datos.flatMap(lambda s:s.split(' '))

In [17]:
rdd_separadas.take(10)

['EXCELENTE',
 'ESPACIO',
 'DE',
 'DISCUSION',
 'GRACIAS',
 'POR',
 'LA',
 'INVITACION',
 'EL',
 'PRESIDENTE']

In [18]:
#Se agrega un 1 para contar cada palabra.
rdd_cont = rdd_separadas.map(lambda s: (s,1))

In [19]:
rdd_cont.take(10)

[('EXCELENTE', 1),
 ('ESPACIO', 1),
 ('DE', 1),
 ('DISCUSION', 1),
 ('GRACIAS', 1),
 ('POR', 1),
 ('LA', 1),
 ('INVITACION', 1),
 ('EL', 1),
 ('PRESIDENTE', 1)]

In [20]:
#Se guarda un archivo con las palabras separadas. 
rdd_cont.coalesce(1).saveAsTextFile('../resultados/300_com/palabras_diferentes')

In [21]:
rdd_cont.count()

6043

In [22]:
#Se suma cada palabra que aparece y se agrupan.
rdd_agrupadas = rdd_cont.reduceByKey(lambda a, b: a + b)

In [23]:
rdd_agrupadas.take(15)

[('ESPACIO', 3),
 ('DE', 352),
 ('GRACIAS', 13),
 ('POR', 84),
 ('LA', 233),
 ('INVITACION', 1),
 ('EL', 172),
 ('SENALA', 1),
 ('QUE', 170),
 ('ES', 68),
 ('MUY', 11),
 ('IMPORTANTE', 3),
 ('CAMPANAS', 1),
 ('ORIENTACION', 2),
 ('JOVENES', 2)]

In [24]:
#Total de Palabras diferentes.
rdd_agrupadas.count()

2236

In [25]:
#Se ordenan las palabras según las veces que aparecen de mayor a menor. 
rdd_ordenadas = rdd_agrupadas.sortBy(lambda s:s[1], False)

In [26]:
rdd_ordenadas.take(10)

[('DE', 352),
 ('LA', 233),
 ('EL', 172),
 ('QUE', 170),
 ('EN', 165),
 ('A', 151),
 ('Y', 148),
 ('NO', 96),
 ('POR', 84),
 ('PARA', 75)]

In [27]:
#Numero total de palabras que hay en el archivo
rdd_ordenadas.count()

2236

In [28]:
rdd_ordenadas.coalesce(1).saveAsTextFile('../resultados/300_com/no_palabras')

In [29]:
s = rdd_ordenadas.map(lambda x: x[0])

In [30]:
#Éste es el RDD que contiene solamente las palabras, ya órdenadas por cantidad de repetición.
s.take(5)

['DE', 'LA', 'EL', 'QUE', 'EN']

In [31]:
#Éste nuevo RDD sólo agrega el índice de las palabras pero, dicho índice, comienza en 0.
rdd_indice = s.zipWithIndex()

In [33]:
#Nuevo RDD con la palabra y el índice comenzando en 1.
rdd_palabras = rdd_indice.map(lambda x: (x[0], x[1] + 1))

In [34]:
rdd_palabras.take(10)

[('DE', 1),
 ('LA', 2),
 ('EL', 3),
 ('QUE', 4),
 ('EN', 5),
 ('A', 6),
 ('Y', 7),
 ('NO', 8),
 ('POR', 9),
 ('PARA', 10)]

In [35]:
rdd_palabras.coalesce(1).saveAsTextFile('../resultados/300_com/indice_300')