In [1]:
from __future__ import print_function

import time
import re
import os.path
import fnmatch
import sgmllib
import urllib
import tarfile

import numpy as np
import pylab as pl

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model.stochastic_gradient import SGDClassifier
import itertools

from pyspark import SparkContext

In [10]:
def _not_in_sphinx():
    return '__file__' in globals()
class ReutersParser(sgmllib.SGMLParser):
    def __init__(self, verbose=1):
        sgmllib.SGMLParser.__init__(self, verbose)
        self._reset()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk)
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r'\s+', r' ', self.body)
        self.docs.append({'title': self.title,
                          'body': self.body,
                          'topics': self.topics})
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""


In [11]:
class ReutersStreamReader():

    DOWNLOAD_URL = ('http://archive.ics.uci.edu/ml/machine-learning-databases/'
                    'reuters21578-mld/reuters21578.tar.gz')
    ARCHIVE_FILENAME = 'reuters21578.tar.gz'

    def __init__(self, data_path):
        self.data_path = data_path
        if not os.path.exists(self.data_path):
            self.download_dataset()

    def download_dataset(self):
        """Download the dataset."""
        print("downloading dataset (once and for all) into %s" %
              self.data_path)
        os.mkdir(self.data_path)

        def progress(blocknum, bs, size):
            total_sz_mb = '%.2f MB' % (size / 1e6)
            current_sz_mb = '%.2f MB' % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                print('\rdownloaded %s / %s' % (current_sz_mb, total_sz_mb),
                      end='')
        urllib.urlretrieve(self.DOWNLOAD_URL,
                           filename=os.path.join(self.data_path,
                                                 self.ARCHIVE_FILENAME),
                           reporthook=progress)
        if _not_in_sphinx():
            print('\r', end='')
        print("untaring data ...")
        tfile = tarfile.open(os.path.join(self.data_path,
                                          self.ARCHIVE_FILENAME),
                             'r:gz')
        tfile.extractall(self.data_path)
        print("done !")

    def iterdocs(self):
        """Iterate doc by doc, yield a dict."""
        for root, _dirnames, filenames in os.walk(self.data_path):
            for filename in fnmatch.filter(filenames, '*.sgm'):
                path = os.path.join(root, filename)
                parser = ReutersParser()
                for doc in parser.parse(open(path)):
                    yield doc

In [12]:
ReutersStreamReader('reuters').iterdocs()

<generator object iterdocs at 0x7f0ba9957e60>

In [13]:
data_streamer = ReutersStreamReader('reuters').iterdocs()

positive_class = ['acq', 'corn', 'crude', 'earn', 'grain', 'interest', 'money-fx', 'ship', 'trade', 'wheat']

In [7]:
temp = sc.parallelize(data_streamer)

In [9]:
len(temp.collect())

21578

In [79]:
temp.take(5)

[{'body': '',
  'title': "TREASURY'S BAKER SAYS MACROECONOMIC INDICATORS NEED MORE PROMINENT ROLE\n",
  'topics': ['james-baker']},
 {'body': '',
  'title': 'HOSPITAL CORP SAYS IT RECEIVED 47 DLR A SHARE OFFER FROM INVESTOR GROUP\n',
  'topics': ['acq']},
 {'body': 'Qtly div five cts vs five cts prior Pay July 13 Record June 30 Reuter \x03',
  'title': 'BEVERLY ENTERPRISES <BEV> SETS REGULAR DIVIDEND',
  'topics': ['earn', 'usa']},
 {'body': '',
  'title': "TREASURY'S BAKER SAYS FLOATING EXCHANGE RATE SYSTEM NEEDS GREATER STABILITY\n",
  'topics': ['money-fx', 'james-baker']},
 {'body': 'Crude oil netback values in complex refineries rose sharply in Europe and firmed in the U.S. last Friday from the previous week but fell sharply in Singapore, according to calculations by Reuters Pipeline. The firmer tone to refining margins in Europe and the U.S. relected higher prices for petroleum products, particularly gasoline, and support from crude oil prices. Netback values for crude oil refine

In [10]:
train = temp.filter(lambda s: s['topics'] != [])

In [11]:
len(train.collect())

19716

In [12]:
test = temp.filter(lambda s: s['topics'] == [])

In [13]:
test = test.map(lambda s: s['title']+" "+s['body'])

In [14]:
test_data = test.collect()

In [15]:
len(list(test.collect()))

1862

In [16]:
test.take(3)

['GENERAL NUTRITION FILES FOR SECONDARY OFFERING OF EIGHT MLN COMMON SHARES\n ',
 'TEXACO NOT REQUIRED TO POST BOND IN APPEAL OF PENNZOIL JUDGMENT, COURT SAYS\n ',
 "MOODY'S MAY DOWNGRADE IRVING BANK CORP, AFFECTS 950 MLN DLRS OF DEBT\n "]

In [18]:
target = []
for i in range(10):
    target.append((train.map(lambda s : positive_class[i] in s['topics'])).collect())

In [19]:
target = np.array(target)
target.shape

(10, 19716)

In [20]:
target[0]

array([False,  True, False, ...,  True, False, False], dtype=bool)

In [24]:
x_train = train.map(lambda s: s['title']+" "+s['body'])

In [25]:
x_train.take(3)

["TREASURY'S BAKER SAYS MACROECONOMIC INDICATORS NEED MORE PROMINENT ROLE\n ",
 'HOSPITAL CORP SAYS IT RECEIVED 47 DLR A SHARE OFFER FROM INVESTOR GROUP\n ',
 'BEVERLY ENTERPRISES <BEV> SETS REGULAR DIVIDEND Qtly div five cts vs five cts prior Pay July 13 Record June 30 Reuter \x03']

In [26]:
from pyspark.mllib.feature import HashingTF, IDF

documents = x_train.map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
tf.cache()
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)

In [27]:
train = tfidfIgnore.collect()

In [160]:
target[0]

array([False,  True, False, ...,  True, False, False], dtype=bool)

In [22]:
train.take(2)

[{'body': '',
  'title': "TREASURY'S BAKER SAYS MACROECONOMIC INDICATORS NEED MORE PROMINENT ROLE\n",
  'topics': ['james-baker']},
 {'body': '',
  'title': 'HOSPITAL CORP SAYS IT RECEIVED 47 DLR A SHARE OFFER FROM INVESTOR GROUP\n',
  'topics': ['acq']}]

In [28]:
from pyspark.mllib.regression import LabeledPoint
total_data = []
for j in range(10):
    sparse_data = []
    for i in range(19716):
        sparse_data.append(LabeledPoint(target[j][i], train[i]))
    total_data.append(sparse_data)

In [162]:
len(total_data[0])

19716

In [29]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.classification import SVMWithSGD
lrm = []
for i in range(10):
    #lrm.append(LogisticRegressionWithSGD.train(sc.parallelize(total_data[i]), iterations=100))
    #lrm.append(LogisticRegressionWithLBFGS.train(sc.parallelize(total_data[i]), iterations=100))
    svm = SVMWithSGD.train(sc.parallelize(total_data[i]), iterations=10)

In [30]:
from pyspark.mllib.feature import HashingTF, IDF

documents = test.map(lambda line: line.split())
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
tf.cache()
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)

In [31]:
test = tfidfIgnore.collect()

In [32]:
test[0]

SparseVector(1048576, {9889: 0.0, 95279: 5.4505, 148230: 4.965, 170239: 0.0, 223924: 5.584, 267881: 2.5741, 281798: 3.2125, 377013: 4.8219, 599150: 4.5342, 781260: 2.9656, 812768: 4.5342})

In [208]:
result = []
for i in range(1):
    stat = []
    for j in range(1862):
        stat.append(svm.predict(test[j]))
    result.append(stat)

In [209]:
c = 0
for i in result[0]:
    if i:
        print(test_data[c])
        c = c+1

GENERAL NUTRITION FILES FOR SECONDARY OFFERING OF EIGHT MLN COMMON SHARES
 
TEXACO NOT REQUIRED TO POST BOND IN APPEAL OF PENNZOIL JUDGMENT, COURT SAYS
 
MOODY'S MAY DOWNGRADE IRVING BANK CORP, AFFECTS 950 MLN DLRS OF DEBT
 
SEARS ROEBUCK SAID IT WILL REDEEM ALL ADJUSTABLE RATE PREFERRED SHARES, FIRST SERIES
 
MOODY'S DOWNGRADES TEXACO'S 8.2 BILLION DLRS OF DEBT TO 'CAA'
 
MOODY'S DOWNGRADES BANKERS TRUST, AFFECTS 1.7 BILLION DLRS OF DEBT
 
Toshiba group net 34.18 billion yen (59.44 billion) year to March 31
 
KOREA ELECTRIC POWER PLANS FIRST EUROYEN BOND Korea Electric Power Corp plans to issue its first euroyen bond, a 7.5 billion yen issue with six-year maturity carrying a coupon of less than five pct, company officials said. They said the bond would be launched in London later this month with Daiwa Securities Co Ltd as lead manager. The officials gave no further details. REUTER 
TOKYO STOCK INDEX RISES 91.19 POINTS TO RECORD CLOSING 24,992.78 - BROKERS
 
Japan May external reserve