In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
import sparknlp
sparknlp.start()

In [3]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer,
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark.sql.functions import concat, col, lit, spark_partition_id, count


In [4]:
#s3 bucket location of the csv file
input_path = 's3://largedatabucket/*.csv'

In [5]:
#Manually set the schema for the csv file instead of inferring
schema = StructType([
    StructField("_c0", IntegerType()),
    StructField("Unnamed: 0", IntegerType()),
    StructField("date", DateType()),
    StructField("year", IntegerType()),
    StructField("month", FloatType()),
    StructField("day", IntegerType()),
    StructField("author", StringType()),
    StructField("title", StringType()),
    StructField("article", StringType()),
    StructField("url", StringType()),
    StructField("section", StringType()),
    StructField("publication", StringType()),
])

In [6]:
df = spark.read.option("quote", "\"").option("escape", "\"").option("encoding", "UTF-8").csv(input_path, schema=schema, header='true').withColumnRenamed("Unnamed: 0","uid")

In [7]:
df.show(10)

+----+----+----------+----+-----+----+-----------------+--------------------+--------------------+--------------------+----------+-----------+
| _c0| uid|      date|year|month| day|           author|               title|             article|                 url|   section|publication|
+----+----+----------+----+-----+----+-----------------+--------------------+--------------------+--------------------+----------+-----------+
|   0|   0|2016-12-09|2016| 12.0|   9|      Lee Drutman|We should take co...|This post is part...|https://www.vox.c...|      null|        Vox|
|   1|   1|2016-10-07|2016| 10.0|   7|      Scott Davis|Colts GM Ryan Gri...| The Indianapolis...|                null|      null|       null|
|null|null|      null|null| null|null|             null|                null|                null|                null|      null|       null|
|   2|   2|2018-01-26|2018|  1.0|  26|             null|Trump denies repo...|DAVOS, Switzerlan...|https://www.reute...|     Davos|    Reuters|

In [8]:
df.count()

3858240

In [9]:
#create a temp table to query and clean
df.createOrReplaceTempView('articles')

In [10]:
#select data that only has actual articles
df = spark.sql('''SELECT uid,
                         date,
                         year,
                         month,
                         day,
                         author,
                         title,
                         article,
                         url,
                         publication
                   FROM articles 
                   WHERE article IS NOT NULL''')

#fill in missing information
df = df.fillna({'date':'1970-01-01 00:00:00',
                'year':1970,
                'month':1.0,
                'day':1.0,
                'author':'missing',
                'title':'missing',
                'url':'missing',
                'publication':'missing'})

In [11]:
df.createOrReplaceTempView('articles')

In [12]:
#get only data existing in the 2000s
df = spark.sql('''
                    SELECT *
                    FROM articles
                    WHERE year >= 2000
                    AND year <2021

               ''')

In [13]:
print('Total Articles: ', df.count())

Total Articles:  2581694


In [14]:
df.show(10)

+---+----------+----+-----+---+-----------------+--------------------+--------------------+--------------------+-----------+
|uid|      date|year|month|day|           author|               title|             article|                 url|publication|
+---+----------+----+-----+---+-----------------+--------------------+--------------------+--------------------+-----------+
|  0|2016-12-09|2016| 12.0|  9|      Lee Drutman|We should take co...|This post is part...|https://www.vox.c...|        Vox|
|  1|2016-10-07|2016| 10.0|  7|      Scott Davis|Colts GM Ryan Gri...| The Indianapolis...|             missing|    missing|
|  2|2018-01-26|2018|  1.0| 26|          missing|Trump denies repo...|DAVOS, Switzerlan...|https://www.reute...|    Reuters|
|  3|2019-06-27|2019|  6.0| 27|          missing|France's Sarkozy ...|PARIS (Reuters) -...|https://www.reute...|    Reuters|
|  4|2016-01-27|2016|  1.0| 27|          missing|Paris Hilton: Wom...|Paris Hilton arri...|https://www.tmz.c...|        TMZ|


In [15]:
#set manual stop words from NLTK stopwords
stopwords = ['a','about','above','after','again','against','ain','all','also','am','an','and','any','are','aren',"aren't",'as','at','be','because','been','before','being','below','between','both','breitbart','but',
 'by','can','cnn','could','couldn',"couldn't",'d','dent','did','didn',"didn't",'didnt','do','does','doesn',"doesn't",'doing','don',"don't",'dont','down','during','each','edu','few','for','fox','from',
 'further','get','go','going','had','hadn',"hadn't",'has','hasn',"hasn't",'have','haven',"haven't",'having','he','her','here','hers','herself','him','himself','his','how','i','if','in','into','is','isn',
 "isn't",'it',"it's",'its','itself','just','kind','like','ll','m','ma','maybe','me','mightn',"mightn't",'more','most','mustn',"mustn't",'my','myself','needn',"needn't",'no','nor','not','now','o','of',
 'off','on','once','only','or','other','our','ours','ourselves','out','over','own','re','s','said','same','say','says','shan',"shan't",'she',"she's",'should',"should've",'shouldn',"shouldn't",'so','some',
 'still','subject','such','t','than','that',"that'll",'thats','the','their','theirs','them','themselves','then','there','theres','these','they','thing','things','think','this','those','through','to','too',
 'u','under','until','up','use','ve','very','wanted','was','wasn',"wasn't",'way','we','went','were','weren',"weren't",'what','when','where','which','while','who','whom','why','will','with','won',"won't",'would',
 'wouldn',"wouldn't",'y','you',"you'd","you'll","you're","you've",'your','yours','yourself','yourselves']

In [16]:
#parse all the articles in the 'article' column
#have it output to a new column called document
documentAssembler = DocumentAssembler() \
     .setInputCol('article') \
     .setOutputCol('document')

#parse the document column and tokenize the text
#output to a new column called 'token'
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('token')

#Remove punctuation, numbers, and symbols from text from tokens
#output to a new column called 'normalized'
normalizer = Normalizer() \
     .setInputCols(['token']) \
     .setOutputCol('normalized') \
     .setLowercase(True)

#get the root of each word by performing lemmatization
#output to a new column called lemma
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemma')

#remove all stopwords from lemma column
#ouptut to a new column called 'clean_lemma'
stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemma']) \
     .setOutputCol('clean_lemma') \
     .setCaseSensitive(False) \
     .setStopWords(stopwords)

# finisher converts tokens to human-readable output
finisher = Finisher() \
     .setInputCols(['clean_lemma']) \
     .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [17]:
pipeline = Pipeline() \
     .setStages([
           documentAssembler,
           tokenizer,
           normalizer,
           lemmatizer,
           stopwords_cleaner,
           finisher
     ])

In [18]:
pipe = pipeline.fit(df)

In [19]:
clean = pipe.transform(df)

In [20]:
#convert the final column to all strings so it can be written to CSV
clean = clean.withColumn("bow", clean["finished_clean_lemma"].cast(StringType()))

In [21]:
#select only the columns we care about
results = clean.select('uid',
                       'date',
                       'year',
                       'month',
                       'day',
                       'author',
                       'title',
                       'article',
                       'publication',
                       'bow'
)

In [22]:
results = results.withColumn('month', results['month'].cast(IntegerType())).withColumn('uid', concat(col("month"), lit("."), col("uid")) )

In [23]:
results.show(100)

+-----+----------+----+-----+---+--------------------+--------------------+--------------------+----------------+--------------------+
|  uid|      date|year|month|day|              author|               title|             article|     publication|                 bow|
+-----+----------+----+-----+---+--------------------+--------------------+--------------------+----------------+--------------------+
| 12.0|2016-12-09|2016|   12|  9|         Lee Drutman|We should take co...|This post is part...|             Vox|[post, part, poly...|
| 10.1|2016-10-07|2016|   10|  7|         Scott Davis|Colts GM Ryan Gri...| The Indianapolis...|         missing|[indianapolis, co...|
|  1.2|2018-01-26|2018|    1| 26|             missing|Trump denies repo...|DAVOS, Switzerlan...|         Reuters|[davos, switzerla...|
|  6.3|2019-06-27|2019|    6| 27|             missing|France's Sarkozy ...|PARIS (Reuters) -...|         Reuters|[paris, reuter, f...|
|  1.4|2016-01-27|2016|    1| 27|             missing|P

In [24]:
results.write.format("csv").option("quote", "\"").option("escape", "\"").option("encoding", "UTF-8").option("header","true").mode("Overwrite").save("s3://largedatabucket/clean")