In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('proyecto-4-news').getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1574370755158_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
'''Load previously merged data'''

bucket = 's3://tet-project-3/data-csv/join/'

df = spark.read.csv(bucket + 'articles.csv',inferSchema=True,header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
'''Check everything is OK'''

df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-----+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
|_c0|   id|               title|   publication|              author|      date|  year|month| url|             content|
+---+-----+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
|  0|17283|House Republicans...|New York Times|          Carl Hulse|2016-12-31|2016.0| 12.0|null|WASHINGTON  —   C...|
|  1|17284|Rift Between Offi...|New York Times|Benjamin Mueller ...|2017-06-19|2017.0|  6.0|null|After the bullet ...|
|  2|17285|Tyrus Wong, ‘Bamb...|New York Times|        Margalit Fox|2017-01-06|2017.0|  1.0|null|When Walt Disney’...|
|  3|17286|Among Deaths in 2...|New York Times|    William McDonald|2017-04-10|2017.0|  4.0|null|Death may be the ...|
|  4|17287|Kim Jong-un Says ...|New York Times|       Choe Sang-Hun|2017-01-02|2017.0|  1.0|null|SEOUL, South Kore...|
+---+-----+--------------------+--------------+-

In [4]:
'''Remove null values from content'''

from pyspark.sql.functions import when, col, coalesce, array

fill = array().cast("string")
null_free = coalesce(col("content"), fill)
df = df.withColumn("null_free_content", null_free)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
'''Tokenize content to prepare its data
   Here, We perform the following steps:
     1. Remove special characters [.,%()'"...]
     2. Remove Stop-Words
     3. Remove Words of length 1
'''

from pyspark.ml.feature import RegexTokenizer

tokenization = RegexTokenizer(
    minTokenLength=2,
    gaps=False,
    pattern='[A-Za-z0-9]+',
    inputCol='null_free_content',
    outputCol='tokens'
)

df = tokenization.transform(df)
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publication: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- url: string (nullable = true)
 |-- content: string (nullable = true)
 |-- null_free_content: string (nullable = false)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)

In [6]:
'''Remove stopwords'''

from pyspark.ml.feature import StopWordsRemover

stopword_removal = StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
df = stopword_removal.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
'''Show the result. End of the first phase (Data Preparation)'''

df.select('refined_tokens').show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|      refined_tokens|
+--------------------+
|[washington, cong...|
|[bullet, shells, ...|
|[walt, disney, ba...|
|[death, may, grea...|
|[seoul, south, ko...|
+--------------------+
only showing top 5 rows

In [8]:
'''Start of the Second Phase (NLP analytics).
   We will perform two actions with our data.
     1. Text grouping with k-means algorithm
     2. Topic modeling with the LDA algorithm

   We'll begin with the 2.
   We start importing some libraries
'''

'''Libraries for text processing'''
from pyspark.ml.feature import IDF, HashingTF

'''Libraries to build the model'''
from pyspark.ml.clustering import LDA

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
'''Build a TF-IDF matrix.
   TF
'''

hashingTF = HashingTF(inputCol="refined_tokens", outputCol="raw_features")
tf = hashingTF.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
'''IDF'''

tf.cache()
idf = IDF(inputCol="raw_features", outputCol="features").fit(tf)
tfidf = idf.transform(tf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
'''With all the data the model takes longer than the session to finish
   So I'm trying to do it with a small subset to see if it works that
   way. Since there won't be a time problem in the DCA, there we can
   use the full thing
'''

corpus = tfidf.filter(df['publication']=='New York Times').select('id', 'features')
corpus.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+--------------------+
|   id|            features|
+-----+--------------------+
|17283|(262144,[619,2326...|
|17284|(262144,[14,343,1...|
|17285|(262144,[14,535,5...|
|17286|(262144,[75,535,7...|
|17287|(262144,[2472,328...|
+-----+--------------------+
only showing top 5 rows

In [12]:
'''Train and make the LDA model'''

corpus = tfidf.select('id', 'features')
lda = LDA(k=10, maxIter=10)
model = lda.fit(corpus)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
'''The topics described by their top-weighted terms, the results are not conclusive'''

topics = model.describeTopics(5)
print("The topics described by their top-weighted terms:")
topics.show(10, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The topics described by their top-weighted terms:
+-----+----------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                             |termWeights                                                                                                        |
+-----+----------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|0    |[257347, 258162, 211756, 187621, 168976]|[0.002081512866048314, 0.0019332381799013387, 0.0016092524191497761, 0.0015469962032630825, 0.0012836316062428271] |
|1    |[114686, 7612, 250458, 161826, 90757]   |[0.0021301640296976835, 0.0018663553464767237, 0.0016027427234093798, 0.0015518331442499855, 0.0015438155983825623]|
|2    |[7612, 168976, 177029, 81379, 63045]    |[0.002976934225921496, 0.0029183591549375434, 0.0027380238160873032, 0.002292

In [14]:
vocabulary = tfidf.filter(tfidf['publication']=='New York Times').select('refined_tokens')
vocabulary.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|      refined_tokens|
+--------------------+
|[washington, cong...|
|[bullet, shells, ...|
|[walt, disney, ba...|
|[death, may, grea...|
|[seoul, south, ko...|
|[london, queen, e...|
|[beijing, preside...|
|[danny, cahill, s...|
|[hillary, kerr, f...|
|[angels, everywhe...|
+--------------------+
only showing top 10 rows

In [15]:
vocabulary = vocabulary.rdd.map(lambda row : row.refined_tokens)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
vocabulary.take(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[['washington', 'congressional', 'republicans', 'new', 'fear', 'comes', 'health', 'care', 'lawsuit', 'obama', 'administration', 'might', 'win', 'incoming', 'trump', 'administration', 'choose', 'longer', 'defend', 'executive', 'branch', 'suit', 'challenges', 'administration', 'authority', 'spend', 'billions', 'dollars', 'health', 'insurance', 'subsidies', 'americans', 'handing', 'house', 'republicans', 'big', 'victory', 'issues', 'sudden', 'loss', 'disputed', 'subsidies', 'conceivably', 'cause', 'health', 'care', 'program', 'implode', 'leaving', 'millions', 'people', 'without', 'access', 'health', 'insurance', 'republicans', 'prepared', 'replacement', 'lead', 'chaos', 'insurance', 'market', 'spur', 'political', 'backlash', 'republicans', 'gain', 'full', 'control', 'government', 'stave', 'outcome', 'republicans', 'find', 'awkward', 'position', 'appropriating', 'huge', 'sums', 'temporarily', 'prop', 'obama', 'health', 'care', 'law', 'angering', 'conservative', 'voters', 'demanding', 'end'

In [17]:
terms = vocabulary.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
'''Function to map terms to indices.'''

def find_terms(indices):
    words = []
    for i in indices:
        count = 0
        for t in terms:
            distance = len(t)
            if count + distance > i:
                idx = i - count
                words.append(t[idx])
                break
            else:
                count += distance
    return words

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
'''Top five terms for topics 0-9, with this we confirm the results were not conclusive,
   since the terms don't really tell much'''

top0 = find_terms([257347, 258162, 211756, 187621, 168976])
top1 = find_terms([114686, 7612, 250458, 161826, 90757])
top2 = find_terms([7612, 168976, 177029, 81379, 63045])
top3 = find_terms([168976, 230634, 138895, 28951, 235700])
top4 = find_terms([7612, 69384, 36974, 168976, 226147])
top5 = find_terms([95592, 33182, 105411, 168976, 170698])
top6 = find_terms([116024, 87288, 7612, 36974, 234972])
top7 = find_terms([52871, 161826, 146227, 250458, 185155])
top8 = find_terms([21715, 205460, 182367, 168976, 257853])
top9 = find_terms([7612, 168976, 1769, 33182, 124361])

print(top0,top1,top2,top3,top4,top5,top6,top7,top8,top9)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['percent', '73', 'cer', 'shelter', 'inside'] ['newark', 'show', 'half', 'races', 'belmonte'] ['show', 'inside', 'hannity', 'exercises', 'news'] ['inside', 'malpractice', 'called', 'sunday', 'beyond'] ['show', '000', 'dropped', 'inside', 'like'] ['wild', 'people', 'much', 'inside', 'scientists'] ['editors', 'russia', 'show', 'dropped', 'scientist'] ['new', 'races', 'secure', 'half', 'needs'] ['kurtulmus', 'relations', 'associated', 'inside', 'passages'] ['show', 'inside', 'ago', 'people', 'list']