In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, expr, when, concat, lit, isnan
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pyspark

In [2]:
#Al ejecutar esta celda se demora un poco, así que un poco de paciencia
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local', "Articles_topic_model") 
spark = SparkSession(sc)

In [3]:
#dfArticles = spark.read.format("csv").option("delimiter", ",").option("header",True).option("encoding", "UTF-8").load("file:///home/ubuntu/3ra_entrega_pyto_integrador/datasets/articles.csv")
#dfArticles.show(10)

In [4]:
dfArticles=spark.read.csv("file:///home/ubuntu/3ra_entrega_pyto_integrador/datasets/articles.csv", inferSchema=True, header=True, encoding="UTF-8")
dfArticles.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|          identifier|               title|         description|             subject|             creator|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|Quantum Physics ;...|Schoenmakers, Ber...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|Computer Science ...|Brust, Matthias R...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|Computer Science ...|Kay, David A ; To...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|Computer Science ...|Borade, Shashi ; ...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|Quantum Physics ;...|Magniez, Frederic...|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|Mathematics - Com...|   Huber, Michael ; |
|http://arxiv.org/...|Generating Rand

In [21]:
def countWords(record):
    textTitle  = record[1]
    textDescription = record[2]
    textCombined = textTitle + " " + textDescription
    words = textCombined.split()
    longitudTexto=len(words)
    print("Longitud: {}".format(str(longitudTexto)))
    return str(longitudTexto)

udf_countWords = udf(countWords, ArrayType(StringType()))
dfcountWords = dfArticles.withColumn("countWords", udf_countWords(struct([dfArticles[x] for x in dfArticles.columns])))

#Pendiente
#Verificar por qué no está realizando el conteo de las palabras de título y descripción

In [22]:
dfcountWords.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|          identifier|               title|         description|             subject|             creator|countWords|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|Quantum Physics ;...|Schoenmakers, Ber...|      null|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|Computer Science ...|Brust, Matthias R...|      null|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|Computer Science ...|Kay, David A ; To...|      null|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|Computer Science ...|Borade, Shashi ; ...|      null|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|Quantum Physics ;...|Magniez, Frederic...|      null|
|http://arxiv.org/...|Coding Theory and...|  This chapte

In [5]:
dfMainCols= dfArticles.select('identifier','title','description')
dfMainCols.show(10)

+--------------------+--------------------+--------------------+
|          identifier|               title|         description|
+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|
|http://arxiv.org/...|Generating Random...|  Random graph ge...|
|http://arxiv.org/...|Variations on a t...|  Schalkwijk and ...|
|http://arxiv.org/...|Rotation Distance...|  Rotation distan...|
|http://arxiv.org/...|A Linear-Time App...|  Rotation distan...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [6]:
#dfArticles.unpersist(True)
#dfArticles.show(10)

In [7]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
#from pyspark.sql.functions import monotonically_increasing_id
import re

In [8]:
import nltk
import pandas as pd
import numpy as np
import codecs
import matplotlib.pyplot as plt

In [23]:
# Default list of Stopwords
stopwords_core = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', 
                  u'are', u'arent', u'as', u'at', u'be', u'because', u'been', u'before', u'being', u'below', 
                  u'between', u'both', u'but', u'by',     u'can', 'cant', 'come', u'could', 'couldnt',u'd', u'did', 
                  u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during', u'each', u'few', u'finally', 
                  u'for', u'from', u'further', u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', 
                  u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how', u'i', u'if', u'in', u'into', 
                  u'is', u'isnt', u'it', u'its', u'itself', u'just', u'll',u'm', u'me', u'might', u'more', u'most', u'must',
                  u'my', u'myself', u'no', u'nor', u'not', u'now', u'o', u'of', u'off', u'on', u'once', u'only', u'or', 
                  u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own', u'r', u're', u's', 'said', u'same', 
                  u'she', u'should', u'shouldnt', u'so', u'some', u'such', u't', u'than', u'that', 'thats', u'the', u'their',
                  u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', 
                  u'to', u'too', u'under', u'until', u'up',u'very', u'was', u'wasnt', u'we', u'were', u'werent', u'what', 
                  u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would', u'y', 
                  u'you', u'your', u'yours', u'yourself', u'yourselves']
    
# Custom List of Stopwords - Add your own here
stopwords_custom = ['']
stopwords = stopwords_core + stopwords_custom
stopwords = [word.lower() for word in stopwords] 

def cleanup_text(record):
    textTitle  = record[1]
    textDescription = record[2]
    textCombined = textTitle + " " + textDescription
    words = textCombined.split()
    # Remove special characters
    text_out = [re.sub('[^a-zA-Z]',' ',word) for word in words]                                   
    text_out = [re.sub(' +',' ',word) for word in words] 
    # Remove stopwords and words under X length
    text_out = [word.lower() for word in text_out if len(word)>2 and word.lower() not in stopwords] 
    return text_out

#def cleanup_description(record):
#    text  = record[2]
#    words = text.split()
#    # Remove special characters
#    text_out = [re.sub('[^a-zA-Z]','',word) for word in words]                                   
#    # Remove stopwords and words under X length
#    text_out = [word.lower() for word in text_out if len(word)>2 and word.lower() not in stopwords] 
#    return text_out


def cleanup_filename(record):
    text  = record[0]
    #listFile = text.split("/")
    text_out = text
    return text_out

In [24]:
udf_tokensCombined = udf(cleanup_text, ArrayType(StringType()))
dfCombined = dfMainCols.withColumn("tokensCombined", udf_tokensCombined(struct([dfMainCols[x] for x in dfMainCols.columns])))   

#udf_tokensDescription = udf(cleanup_description, ArrayType(StringType()))
#dfDescription = dfMainCols.withColumn("tokensDescription", udf_tokensDescription(struct([dfMainCols[x] for x in dfMainCols.columns]))) 

In [25]:
# Está pendiente de extraer el nombre del archivo, que corresponde a los nueve caracteres a la derecha del campo
# identifier
#
#udf_filename = udf(cleanup_filename, ArrayType(StringType()))
#dfFilename1 = dfMainCols.withColumn("File", udf_filename(struct([dfMainCols[x] for x in dfMainCols.columns])))   

In [26]:
dfCombined.show(10)

+--------------------+--------------------+--------------------+--------------------+
|          identifier|               title|         description|      tokensCombined|
+--------------------+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|[smooth, r\'enyi,...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|[analyzing, desig...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|[colour, image, s...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|[unequal, error, ...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|[hitting, times, ...|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|[coding, theory, ...|
|http://arxiv.org/...|Generating Random...|  Random graph ge...|[generating, rand...|
|http://arxiv.org/...|Variations on a t...|  Schalkwijk and ...|[variations, them...|
|http://arxiv.org/...|Rotation Distance...|  Rotation 

In [29]:
# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
cvTokensCombined = CountVectorizer(inputCol="tokensCombined", outputCol="rawFeatures", vocabSize = 30000)
cvmodel = cvTokensCombined.fit(dfCombined)
featurizedData = cvmodel.transform(dfCombined)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [30]:
# Generate 20 Data-Driven Topics:
lda = LDA(k=20, seed=123, optimizer="em", featuresCol="features")

ldamodel = lda.fit(rescaledData)

#model.isDistributed()
#model.vocabSize()

ldatopics = ldamodel.describeTopics()
ldatopics.show(20)

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[465, 3, 7, 116, ...|[0.00565330534278...|
|    1|[1025, 210, 671, ...|[0.00556476098788...|
|    2|[10, 110, 728, 60...|[0.00424725004773...|
|    3|[142, 3, 658, 27,...|[0.00437888825581...|
|    4|[309, 254, 13, 11...|[0.00631802624221...|
|    5|[112, 388, 51, 73...|[0.00621866499421...|
|    6|[190, 145, 606, 1...|[0.00413189548241...|
|    7|[89, 81, 566, 170...|[0.00429623292511...|
|    8|[3, 1888, 845, 10...|[0.00459273344608...|
|    9|[0, 145, 536, 981...|[0.00480623722493...|
|   10|[57, 1648, 3, 506...|[0.00793019693388...|
|   11|[13, 35, 233, 626...|[0.00442047813191...|
|   12|[584, 345, 916, 5...|[0.00667728003403...|
|   13|[1043, 237, 73, 1...|[0.00392779860293...|
|   14|[18, 292, 1008, 4...|[0.00864784909908...|
|   15|[227, 174, 175, 4...|[0.00380430206751...|
|   16|[321, 286, 808, 8...|[0.00614506469934...|


In [31]:
def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    
    return words

udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termID_to_Word(ldatopics.termIndices))
ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc).show(50,False)

+-----+-------------------------------------------------------------------------------------------------------+
|topic|topic_desc                                                                                             |
+-----+-------------------------------------------------------------------------------------------------------+
|0    |[video, data, model, selection, cancer, function, problem, linear, circuits, subspace]                 |
|1    |[rmt, neural, estimator, testing, network, spectral, networks, signal, facial, change]                 |
|2    |[time, control, grid, systems, type, semidefinite, dynamic, classical, problem, optimal]               |
|3    |[matrices, data, biological, networks, parsimonious, matrix, training, bayesian, curves, mixture]      |
|4    |[stable, edges, graph, dfs, mechanism, approximation, algorithm, counter-party, polytopes, design]     |
|5    |[sparse, automata, matrix, codes, finite, words, error, noiselet, decoding, measurement]         

In [33]:
ldaResults = ldamodel.transform(rescaledData)

ldaResults.select('identifier','title','description','tokensCombined','features','topicDistribution').show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          identifier|               title|         description|      tokensCombined|            features|   topicDistribution|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|[smooth, r\'enyi,...|(18042,[18,30,40,...|[0.02302087209979...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|[analyzing, desig...|(18042,[8,12,14,1...|[0.01172652669764...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|[colour, image, s...|(18042,[1,3,6,7,1...|[0.01616681587956...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|[unequal, error, ...|(18042,[4,14,18,5...|[0.01283407763124...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|[hitting, times, ...|(18042,[0,2,4,6,8..