# PySpark Part of Speech (POS) analysis
Text taken from [Reuters](https://www.reuters.com/business/finance/banks-beware-outsiders-are-cracking-code-finance-2021-09-17/).

In [1]:
import nltk
from pyspark import SparkContext

In [2]:
nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")

[nltk_data] Downloading package punkt to /home/william/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/william/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [3]:
sc = SparkContext("spark://william-VirtualBox:7077", appName = "pyspark-pos-analysis")

In [4]:
# Loading a text file
rdd_reuters = sc.textFile("./data/reuters.txt")

In [12]:
rdd_reuters.count()
rdd_reuters.take(5)

['Banks beware, Amazon and Walmart are cracking the code for finance',
 '',
 'LONDON, Sept 17 (Reuters) - Anyone can be a banker these days, you just need the right code.',
 '',
 'Global brands from Mercedes and Amazon (AMZN.O) to IKEA and Walmart (WMT.N) are cutting out the traditional financial middleman and plugging in software from tech startups to offer customers everything from banking and credit to insurance.']

In [16]:
from operator import add
rdd= rdd_reuters.filter(lambda x: x!='').fold("", add)
rdd



In [19]:
text_tokenize= nltk.word_tokenize(rdd)
posText= nltk.pos_tag(text_tokenize)
posText

[('Banks', 'NNS'),
 ('beware', 'NN'),
 (',', ','),
 ('Amazon', 'NNP'),
 ('and', 'CC'),
 ('Walmart', 'NNP'),
 ('are', 'VBP'),
 ('cracking', 'VBG'),
 ('the', 'DT'),
 ('code', 'NN'),
 ('for', 'IN'),
 ('financeLONDON', 'NN'),
 (',', ','),
 ('Sept', 'NNP'),
 ('17', 'CD'),
 ('(', '('),
 ('Reuters', 'NNPS'),
 (')', ')'),
 ('-', ':'),
 ('Anyone', 'NN'),
 ('can', 'MD'),
 ('be', 'VB'),
 ('a', 'DT'),
 ('banker', 'NN'),
 ('these', 'DT'),
 ('days', 'NNS'),
 (',', ','),
 ('you', 'PRP'),
 ('just', 'RB'),
 ('need', 'VB'),
 ('the', 'DT'),
 ('right', 'JJ'),
 ('code.Global', 'NN'),
 ('brands', 'NNS'),
 ('from', 'IN'),
 ('Mercedes', 'NNP'),
 ('and', 'CC'),
 ('Amazon', 'NNP'),
 ('(', '('),
 ('AMZN.O', 'NNP'),
 (')', ')'),
 ('to', 'TO'),
 ('IKEA', 'NNP'),
 ('and', 'CC'),
 ('Walmart', 'NNP'),
 ('(', '('),
 ('WMT.N', 'NNP'),
 (')', ')'),
 ('are', 'VBP'),
 ('cutting', 'VBG'),
 ('out', 'RP'),
 ('the', 'DT'),
 ('traditional', 'JJ'),
 ('financial', 'JJ'),
 ('middleman', 'NN'),
 ('and', 'CC'),
 ('plugging', 'NN'),

In [56]:
import re
posTextRdd= sc.parallelize(posText)
#print(posTextRdd.take(5))
p= re.compile("(\\w|\\d)")
#ptRdd= posTextRdd.reduceByKey(lambda x: re.search(p, x))
#print(ptRdd.first())
posTextRdd= posTextRdd.map(lambda x: (x[1],x[0])).filter(lambda x: re.search(p, x[0])).map(lambda x: (x[0], 1)).reduceByKey(add)
posTextRdd.take(10)

[('NNS', 120),
 ('CC', 43),
 ('VBP', 40),
 ('CD', 33),
 ('PRP', 40),
 ('RB', 49),
 ('TO', 41),
 ('RP', 7),
 ('VBZ', 38),
 ('WRB', 8)]

In [62]:
def categorize(tag):
    if tag == "CC":
        tag= "coordinating conjunction"
    elif tag == "CD":
        tag= "cardinal digit"
    elif tag == "DT":
        tag= "determiner"
    elif tag == "EX":
        tag= "existential"
    elif tag == "FW":
        tag= "foreign word"
    elif tag == "IN":
        tag= "preposition/subordinating conjunction"
    elif tag == "JJ" or tag == "JJR" or tag == "JJS":
        tag= "adjective"
    elif tag == "LS":
        tag= "list marker"
    elif tag == "MD":
        tag= "modal could"
    elif tag == "LS":
        tag= "list marker"
    elif tag == "NN" or tag == "NNS" or tag == "NNP" or tag == "NNPS":
        tag= "noun"
    elif tag == "VB" or tag == "VBD" or tag == "VBG" or tag == "VBN" or tag == "VBP" or tag == "VBZ":
        tag= "verb"
        
posTextRdd.map(lambda x: categorize(x[0])).collect()

AttributeError: 'NoneType' object has no attribute 'map'