In [8]:
import findspark
# provide path to your spark directory directly
findspark.init("/home/canwill/spark2")
import pyspark
sc = pyspark.SparkContext(appName="helloworld")

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [21]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [10]:
%%bash
wget "https://en.wikipedia.org/wiki/Python_(programming_language)" #Python Wikipedia Page

--2017-04-14 11:28:30--  https://en.wikipedia.org/wiki/Python_(programming_language)
Resolving en.wikipedia.org (en.wikipedia.org)... 91.198.174.192, 2620:0:862:ed1a::1
Connecting to en.wikipedia.org (en.wikipedia.org)|91.198.174.192|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 338014 (330K) [text/html]
Saving to: ‘Python_(programming_language)’

     0K .......... .......... .......... .......... .......... 15%  160K 2s
    50K .......... .......... .......... .......... .......... 30%  342K 1s
   100K .......... .......... .......... .......... .......... 45% 1.14M 1s
   150K .......... .......... .......... .......... .......... 60% 1.21M 0s
   200K .......... .......... .......... .......... .......... 75%  205K 0s
   250K .......... .......... .......... .......... .......... 90%  142M 0s
   300K .......... .......... ..........                      100%  146M=0.8s

2017-04-14 11:28:32 (419 KB/s) - ‘Python_(programming_language)’ saved [338014/338014]


In [11]:

%%bash
mv 'Python_(programming_language)' python_wiki

In [12]:
%%bash
ls

bank_cross_sale.ipynb
churn_measurements.py
ColorQuantization
contractions.py
contractions.pyc
credit_risk_modeling.ipynb
customer_churn.ipynb
data
medaplexis
PageRankingwithSparseMatrix.ipynb
python_wiki
README.md
sms_spam_collector.ipynb
sms_spam_detector.pkl
spark-nltk-practice.ipynb
spark-warehouse


In [13]:
lines = sc.textFile("python_wiki")

In [14]:
lines.take(5)

[u'<!DOCTYPE html>',
 u'<html class="client-nojs" lang="en" dir="ltr">',
 u'<head>',
 u'<meta charset="UTF-8"/>',
 u'<title>Python (programming language) - Wikipedia</title>']

In [15]:
lines.count()

1261

In [16]:
import nltk

In [None]:
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('spark-nltk')
sc = SparkContext(conf=conf)

data = sc.textFile('file:///usr/share/nltk_data/corpora/state_union/1972-Nixon.txt')

def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])

words = data.flatMap(word_tokenize)
print words.take(10)

pos_word = words.map(pos_tag)
print pos_word.take(5)

In [28]:
import nltk
import re
import string

from nltk.corpus import wordnet as wn
from contractions import CONTRACTION_MAP
from nltk.stem import WordNetLemmatizer
from HTMLParser import HTMLParser
import unicodedata

wnl = WordNetLemmatizer()
html_parser = HTMLParser()

stopword_list = nltk.corpus.stopwords.words('english')
stopword_list = stopword_list + ['mr', 'mrs', 'come', 'go', 'get',
                                 'tell', 'listen', 'one', 'two', 'three',
                                 'four', 'five', 'six', 'seven', 'eight',
                                 'nine', 'zero', 'join', 'find', 'make',
                                 'say', 'ask', 'tell', 'see', 'try', 'back',
                                 'also', '', None, ' ']


def expand_contractions(text, contraction_mapping):
    
    contractions_pattern = re.compile('({})'.format('|'.join(contraction_mapping.keys())), 
                                      flags=re.IGNORECASE|re.DOTALL)
    def expand_match(contraction):
        match = contraction.group(0)
        first_char = match[0]
        expanded_contraction = contraction_mapping.get(match)\
                                if contraction_mapping.get(match)\
                                else contraction_mapping.get(match.lower())                       
        expanded_contraction = first_char+expanded_contraction[1:]
        return expanded_contraction
        
    expanded_text = contractions_pattern.sub(expand_match, text)
    expanded_text = re.sub("'", "", expanded_text)
    return expanded_text
    

def word_tokenize(text):
    tokens = nltk.word_tokenize(text) 
    tokens = [token.strip() for token in tokens]
    return tokens

def remove_special_characters(text):
    tokens = text
    pattern = re.compile('[{}]'.format(re.escape(string.punctuation)))
    filtered_tokens = filter(None, [pattern.sub(' ', token) for token in tokens])
    return filtered_tokens
    
    
def remove_stopwords(text):
    tokens = text
    filtered_tokens = [token for token in tokens if token.strip() not in stopword_list]
    return filtered_tokens


# Annotate text tokens with POS tags
def pos_tag_text(words):
    
    def penn_to_wn_tags(pos_tag):
        if pos_tag.startswith('J'):
            return wn.ADJ
        elif pos_tag.startswith('V'):
            return wn.VERB
        elif pos_tag.startswith('N'):
            return wn.NOUN
        elif pos_tag.startswith('R'):
            return wn.ADV
        else:
            return None
        
        
    def pos_tagging(words):
        print 'pos_tagging'
        return nltk.pos_tag(words)
    
    tagged_words = pos_tagging(words)
    print 'pos_tagging'
    tagged_lower_text = [(word.lower(), penn_to_wn_tags(pos_tag))
                         for word, pos_tag in
                         tagged_words]
    return tagged_lower_text
    


def lemmatize_text(words):
    print 'lemma'
    pos_tagged_words = pos_tag_text(words)
    print 'pos done'
    lemmatized_tokens = [wnl.lemmatize(word, pos_tag) if pos_tag
                         else word                     
                         for word, pos_tag in pos_tagged_words]
    return lemmatized_tokens

def parse_hippa(text):
    hippa_pattern = '[[^a-zA-Z0-9]*/*[[^a-zA-Z0-9., -_]*/*[^a-zA-Z0-9., -_]*]*~]*'    
    if re.match(hippa_pattern, text)    :
        text = re.sub('\*','\* ', text)
        text = re.sub('~',' ~ ', text)
        return True, text
 
    return False, text 

In [29]:
import pandas as pd
data = pd.read_csv('data/retuters_test_data.csv', sep='\t')
data = spark.createDataFrame(data)
data.take(2)

[Row(Unnamed: 0=0, text=u'SANDOZ PLANS WEEDKILLER JOINT VENTURE IN USSR\n\nSandoz AG said it planned a joint venture to produce herbicides in the Soviet Union. The company said it had signed a letter of intent with the Soviet Ministry of Fertiliser Production to form the first foreign joint venture the ministry had undertaken since the Soviet Union allowed Western firms to enter into joint ventures two months ago. The ministry and Sandoz will each have a 50 pct stake, but a company spokeswoman was unable to give details of the size of investment or planned output. Reuter \x03', tags=u'usa,ussr'),
 Row(Unnamed: 0=1, text=u'TAIWAN REJECTS TEXTILE MAKERS EXCHANGE RATE PLEA\n\nCentral bank governor Chang Chi-cheng rejected a request by textile makers to halt the rise of the Taiwan dollar against the U.S. Dollar to stop them losing orders to South Korea, Hong Kong and Singapore, a spokesman for the Taiwan Textile Federation said. He quoted Chang as telling representatives of 19 textile asso

In [30]:
data.printSchema()

root
 |-- Unnamed: 0: long (nullable = true)
 |-- text: string (nullable = true)
 |-- tags: string (nullable = true)



In [31]:

def normalize_data(line):
    line = html_parser.unescape(line)
    line = expand_contractions(line, CONTRACTION_MAP)
    ishippa, line = parse_hippa(line)
    words = word_tokenize(line)
    if not ishippa:
        words = remove_special_characters(words)
    words = remove_stopwords(words)
    words = lemmatize_text(words)
    return words

udf_normalize_data = udf(normalize_data, StringType())

PicklingError: args[0] from __newobj__ args has the wrong class

In [None]:

df.withColumn("new_text", udf_normalize_data("text")).show(10)

In [118]:
words = lines.map(word_tokenize)
words = words.map(remove_special_characters)
words = words.map(remove_stopwords)
words = words.map(lemmatize_text)

TypeError: 'Column' object is not callable

In [90]:
words.take(20)

[[u'doctype', u'html'],
 [u'html', u'class ', u'client nojs', u'lang ', u'en', u'dir ', u'ltr'],
 [u'head'],
 [u'meta', u'charset ', u'utf 8'],
 [u'title', u'python', u'programming', u'language', u'wikipedia', u' title'],
 [u'script',
  u'document documentelement classname',
  u'document documentelement classname replace',
  u'client nojs',
  u'1client js',
  u'2',
  u' script'],
 [u'script',
  u'window rlq window rlq  ',
  u' push',
  u'function',
  u'mw config set',
  u'wgcanonicalnamespace',
  u'wgcanonicalspecialpagename',
  u'false',
  u'wgnamespacenumber',
  u' 0',
  u'wgpagename',
  u'python ',
  u'programming language',
  u'wgtitle',
  u'python',
  u'programming',
  u'language',
  u'wgcurrevisionid',
  u' 774758975',
  u'wgrevisionid',
  u' 774758975',
  u'wgarticleid',
  u' 23862',
  u'wgisarticle',
  u'true',
  u'wgisredirect',
  u'false',
  u'wgaction',
  u'view',
  u'wgusername',
  u'null',
  u'wgusergroups',
  u'wgcategories',
  u'wikipedia',
  u'article',
  u'need',
  u'p

In [62]:
words = lines.flatMap(word_tokenize)


In [59]:
words.take(20)

[[u'DOCTYPE', u'html'],
 [u'html', u'class ', u'client nojs', u'lang ', u'en', u'dir ', u'ltr'],
 [u'head'],
 [u'meta', u'charset ', u'UTF 8'],
 [u'title', u'Python', u'programming', u'language', u'Wikipedia', u' title'],
 [u'script',
  u'document documentElement className',
  u'document documentElement className replace',
  u'client nojs',
  u'1client js',
  u'2',
  u' script'],
 [u'script',
  u'window RLQ window RLQ  ',
  u' push',
  u'function',
  u'mw config set',
  u'wgCanonicalNamespace',
  u'wgCanonicalSpecialPageName',
  u'false',
  u'wgNamespaceNumber',
  u' 0',
  u'wgPageName',
  u'Python ',
  u'programming language',
  u'wgTitle',
  u'Python',
  u'programming',
  u'language',
  u'wgCurRevisionId',
  u' 774758975',
  u'wgRevisionId',
  u' 774758975',
  u'wgArticleId',
  u' 23862',
  u'wgIsArticle',
  u'true',
  u'wgIsRedirect',
  u'false',
  u'wgAction',
  u'view',
  u'wgUserName',
  u'null',
  u'wgUserGroups',
  u'wgCategories',
  u'Wikipedia',
  u'articles',
  u'needing',
 

In [14]:
import wordcloud