## Apache Spark (MLlib) -  Extracting Text Features for TF-IDF


<div class="alert alert-block alert-info" style="margin-top: 20px">**Data source:** "The Notebooks of Leonardo Da Vinci, Complete" by Leonardo da Vinci, used here as a tutorial example. Data source: http://www.gutenberg.org/ebooks/5000/. Reference: PySpark ML documentation, https://spark.apache.org/docs/2.1.0</div>



##### by John Ryan 1/05/2017


__Overview__


The data set is the plain text of The Notebooks of Leonardo Da Vinci. This notebook uses it to show how to load text into Spark and extract TF-IDF features from it.

Question 1: How do we build a Resilient Distributed Dataset (RDD) from text?

Question 2: How do we compute term frequency-inverse document frequency (TF-IDF) features from text?

### Apache Spark: Building Resilient Distributed Datasets (RDDs) with text in PySpark


**1. Getting Started**

This tutorial walks you through the most important fundamentals of running Spark programs.
Every PySpark session follows the same basic process:

- Import - create RDDs from external data sources.

- Transform - build new RDDs using transformations such as filter().

- Persist - RDDs that will be reused later should be persisted so they are not recomputed.

- Actions - trigger the parallel computation executed by Spark, using take(), count(), etc.
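
The difference between a transformation and an action can be imitated in plain Python. The sketch below is only an analogy, not Spark code: a generator stands in for a lazy transformation, the small `lines` list stands in for the text file, and the `reads` list is a hypothetical helper added here to show when data is actually touched.

```python
# Plain-Python analogy of the load -> transform -> action flow (not Spark code).
lines = [
    "by Leonardo Da Vinci",
    "",
    "Copyright laws are changing all over the world.",
    "(#3 in our series by Leonardo Da Vinci)",
]

reads = []  # hypothetical helper: records every line that is actually touched


def load(source):
    """Stand-in for sc.textFile(): yields lines one at a time, lazily."""
    for line in source:
        reads.append(line)
        yield line


# "Transformation": describes the filter, but touches no data yet.
leo = (line for line in load(lines) if "Leonardo" in line)
reads_before_action = len(reads)

# "Action": forces the computation, like count() on an RDD.
leo_count = sum(1 for _ in leo)
reads_after_action = len(reads)

print(reads_before_action, leo_count, reads_after_action)
```

No line is read until the counting step runs, which mirrors how Spark defers work until an action such as take() or count() is called.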

#### Import and create RDDs

In [1]:
from pyspark.sql import SparkSession

# @hidden_cell
# This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_xxxxxxxxxxxxxxxx(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/xx/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', 'xxxxxxxxxxxxxxxxxxx')
    hconf.set(prefix + '.username', 'xxxxxxxxxxxxxxxxxxx')
    hconf.set(prefix + '.password', 'xxxxxxxxx')
    hconf.setInt(prefix + '.http.port', xxxx)
    hconf.set(prefix + '.region', 'xxxx')
    hconf.setBoolean(prefix + '.public', False)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_xxxxxxxxxxxxxxxx(name)

spark = SparkSession.builder.getOrCreate()

# Please read the documentation of PySpark to learn more about the possibilities to load data files.
# PySpark documentation: https://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# The SparkSession object is already initialized for you.
# The following variable contains the path to your file on your Object Storage.

rddData1 = sc.textFile("swift://newscala." + name + "/DiVinciNotes.txt")
rddData1.take(10)


[u'The Project Gutenberg EBook of The Notebooks of Leonardo Da Vinci, Complete',
 u'by Leonardo Da Vinci',
 u'(#3 in our series by Leonardo Da Vinci)',
 u'',
 u'Copyright laws are changing all over the world. Be sure to check the',
 u'copyright laws for your country before downloading or redistributing',
 u'this or any other Project Gutenberg eBook.',
 u'',
 u'This header should be the first thing seen when viewing this Project',
 u'Gutenberg file.  Please do not remove it.  Do not change or edit the']

**Resilient Distributed Datasets (RDDs)**

RDDs are a core component of Spark. These Resilient Distributed Datasets are fault-tolerant collections of elements that can be operated on in parallel. Spark distributes the data stored in an RDD across the cluster and parallelizes the operations carried out on it.

**Creating Resilient Distributed Datasets (RDDs)**

- RDDs are split into a number of partitions, which are computed on different nodes of a cluster.

- PySpark can create RDDs from any storage source supported by Hadoop, for example HDFS, HBase, object container storage, or a file URI. For this example we will use data imported from object container storage.
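
The idea of partitions being processed independently and then combined can be sketched in plain Python. This is an analogy under stated assumptions, not Spark code: threads stand in for cluster nodes, and `split_into_partitions` and `count_matches` are hypothetical helpers introduced only for illustration.

```python
# Plain-Python analogy of partitioned, parallel processing (not Spark code).
from concurrent.futures import ThreadPoolExecutor

lines = [
    "a Leonardo line", "other",
    "Leonardo again", "other",
    "more Leonardo", "end",
]


def split_into_partitions(data, num_partitions):
    """Split data into at most num_partitions contiguous chunks."""
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def count_matches(partition):
    """Work done independently on one partition."""
    return sum(1 for line in partition if "Leonardo" in line)


partitions = split_into_partitions(lines, 3)

# Each partition is handled by its own worker; partial results are then combined.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_counts = list(pool.map(count_matches, partitions))

total = sum(partial_counts)
print(len(partitions), partial_counts, total)
```

Spark applies the same pattern at cluster scale: each partition is computed where it lives, and only the small partial results are brought together.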

In [2]:
# The first() action returns the first element of the RDD. Spark only reads
# as much of the file as it needs, not the whole file.
rddData1.first()

u'The Project Gutenberg EBook of The Notebooks of Leonardo Da Vinci, Complete'

**2. Transformations: Develop new RDDs using transformations such as filter().**

A transformation builds a new RDD from an existing one. Here we keep only the
lines containing "Leonardo" and store them in a new RDD called "leoRDD".

In [3]:
# Transformation: build a new RDD from the previous one. Here we keep only the
# lines containing "Leonardo".
leoRDD = rddData1.filter(lambda line: "Leonardo" in line)
leoRDD

PythonRDD[4] at RDD at PythonRDD.scala:48

In [4]:
# Example two: the same filter with a longer string
effectsRDD = rddData1.filter(lambda line: "The effects of" in line)
effectsRDD

PythonRDD[5] at RDD at PythonRDD.scala:48

**persist() or cache()**

Persisting keeps an RDD (a subset of your data) in memory so that repeated calculations can reuse it instead of recomputing it. Caching is a similar process: cache() is persist() with the default storage level.
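
Why persistence pays off can be shown with a small plain-Python analogy. This is not Spark code: `filtered_lines` is a hypothetical stand-in for a filtered RDD, and the `scans` counter is added here only to make the recomputation visible. In Spark itself the stored copy is filled the first time an action computes the RDD.

```python
# Plain-Python analogy of recomputation versus persist()/cache() (not Spark code).
lines = ["Leonardo one", "nothing here", "Leonardo two"]
scans = {"count": 0}  # hypothetical counter: how often the source is scanned


def filtered_lines():
    """Recompute the 'transformation' from the source every time it is called."""
    scans["count"] += 1
    return [line for line in lines if "Leonardo" in line]


# Without persistence: two "actions" trigger two scans of the source.
n_first = len(filtered_lines())
first_line = filtered_lines()[0]
scans_without_cache = scans["count"]

# With persistence: keep the computed result in memory and reuse it.
scans["count"] = 0
cached = filtered_lines()   # computed once
n_cached = len(cached)      # reuses the stored result
first_cached = cached[0]    # reuses it again
scans_with_cache = scans["count"]

print(scans_without_cache, scans_with_cache)
```

The saving grows with the cost of the lineage: for an RDD built from a large file, every action without persistence would read and filter the file again.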



In [5]:
# Persist the new RDD that contains the Leonardo elements
leoRDD.persist()

PythonRDD[4] at RDD at PythonRDD.scala:48

In [6]:
# Example two: cache the second RDD
effectsRDD.cache()

PythonRDD[5] at RDD at PythonRDD.scala:48

In [7]:
leoRDD.count()

626

In [8]:
effectsRDD.count()

4

**2.1 union() Transformation**

In [9]:
# Combine the two RDDs: the result holds the lines from both inputs, i.e. lines
# containing "Leonardo" or "The effects of" (duplicates are kept)
vinciRDD = effectsRDD.union(leoRDD)
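
The semantics of union() can be checked with a plain-Python analogy. This is not Spark code, and the two small lists are made-up sample lines: union() behaves like concatenation, so a line that matches both filters appears twice unless a separate de-duplication step (distinct() on an RDD) is applied.

```python
# Plain-Python analogy of RDD.union() (not Spark code).
effects = ["The effects of light", "Leonardo on The effects of shadow"]
leo = ["by Leonardo Da Vinci", "Leonardo on The effects of shadow"]

# union() behaves like concatenation: nothing is de-duplicated.
vinci = effects + leo

# A line present in both inputs therefore appears twice.
duplicates = len(vinci) - len(set(vinci))

# De-duplicating is a separate step (distinct() on an RDD).
vinci_distinct = sorted(set(vinci))

print(len(vinci), duplicates, len(vinci_distinct))
```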

**2.2 map() and filter() Transformations**

- The map() transformation is an element-wise transformation: it applies a given function to every element of the RDD and returns a new RDD of the results.
- The filter() transformation, as seen previously, also takes a given function, but the new RDD contains only the elements for which the function returns true.
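
Python's built-in map() and filter() follow the same contract and make the difference easy to see. The snippet below is a plain-Python illustration on made-up sample lines, not Spark code.

```python
# Plain-Python illustration of map() versus filter() (not Spark code).
lines = ["by Leonardo Da Vinci", "", "Copyright laws are changing"]

# map(): one output element per input element (here, the token count per line).
lengths = list(map(lambda line: len(line.split()), lines))

# filter(): keeps only the elements for which the function returns True.
non_empty = list(filter(lambda line: line != "", lines))

print(lengths, non_empty)
```

map() always preserves the number of elements, while filter() can only keep or drop them.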

**Term frequency-inverse document frequency (TF-IDF)**

**Step 1:  HashingTF** 

In [10]:
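from pyspark.mllib.feature import HashingTF, IDF
# Split each line into a list of whitespace-separated tokens
dataRDD = rddData1.map(lambda line: line.split())
# Hash each token to a vector index and count term frequencies per line
htf = HashingTF()
tfVect = htf.transform(dataRDD)
tfVect.cache()  # cache for later reuse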

PythonRDD[9] at RDD at PythonRDD.scala:48
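
HashingTF relies on the hashing trick: each term is hashed to a column index, so no vocabulary needs to be built. The sketch below shows the idea in plain Python and is not Spark's implementation. `zlib.crc32` is a stand-in hash chosen here for reproducibility (an assumption, Spark uses its own hash function), and `term_index` and `hashing_tf` are hypothetical helper names.

```python
# Plain-Python sketch of the hashing trick behind HashingTF (not Spark code).
import zlib
from collections import Counter

# 1 << 20 == 1048576, the SparseVector size that appears in this notebook's output.
NUM_FEATURES = 1 << 20


def term_index(term, num_features=NUM_FEATURES):
    """Map a term to a column index. crc32 is a stand-in hash (assumption)."""
    return zlib.crc32(term.encode("utf-8")) % num_features


def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Return a sparse {index: term count} mapping for one document."""
    return dict(Counter(term_index(t, num_features) for t in tokens))


doc = "the effects of the light".split()
vector = hashing_tf(doc)

print(vector)
```

Two different terms can land on the same index (a collision). A large number of features keeps that unlikely, at the cost of a wider but still sparse vector.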

**Step 2: Inverse Document Frequency**
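
The MLlib documentation gives the IDF weight of a term as log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term. Terms that appear in fewer than minDocFreq documents get a weight of 0. The plain-Python sketch below applies that formula to a tiny made-up corpus. It is not Spark code, and `fit_idf` and `tf_idf` are hypothetical helper names.

```python
# Plain-Python sketch of IDF weighting with a minimum document frequency (not Spark code).
import math
from collections import Counter

docs = [
    ["the", "notebooks", "of", "leonardo"],
    ["the", "effects", "of", "light"],
    ["the", "leonardo", "sketches"],
    ["the", "light"],
]


def fit_idf(documents, min_doc_freq=0):
    """Return {term: idf} using log((m + 1) / (df + 1)); rare terms get 0.0."""
    m = len(documents)
    doc_freq = Counter(term for doc in documents for term in set(doc))
    return {
        term: math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }


def tf_idf(doc, idf):
    """Multiply each term's count in the document by its IDF weight."""
    return {term: count * idf[term] for term, count in Counter(doc).items()}


idf = fit_idf(docs, min_doc_freq=2)
weights = tf_idf(docs[0], idf)

print(weights)
```

A weight of 0 arises in two ways here: "the" occurs in every document, and "notebooks" falls below the minimum document frequency. The second case is one way 0.0 entries can appear in TF-IDF vectors fitted with minDocFreq=2.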

In [14]:
# Import the library, fit an IDF model to the TF vectors, then transform them into sparse TF-IDF vectors
from pyspark.mllib.feature import IDF
idf = IDF(minDocFreq=2).fit(tfVect)
tfidf = idf.transform(tfVect)
tfidf.take(10)

[SparseVector(1048576, {274140: 4.5222, 323305: 2.6779, 641967: 8.3176, 689675: 8.7876, 832474: 6.0919, 833202: 7.4014, 880387: 7.1782, 994896: 7.0298, 996619: 0.0, 1027493: 9.2985}),
 SparseVector(1048576, {274140: 4.5222, 335910: 6.8417, 641967: 8.3176, 956125: 2.8184}),
 SparseVector(1048576, {50583: 1.9295, 274140: 4.5222, 641967: 8.3176, 852617: 8.3176, 880390: 0.0, 948889: 5.4483, 956125: 2.8184, 997345: 0.0}),
 SparseVector(1048576, {}),
 SparseVector(1048576, {191961: 8.7876, 358972: 7.5067, 521365: 3.0466, 592895: 7.5639, 618498: 4.0445, 647690: 5.5688, 649117: 8.4512, 682947: 9.0108, 725041: 1.9718, 900641: 8.6053, 959994: 1.8289, 996102: 9.2985}),
 SparseVector(1048576, {66793: 7.4014, 308262: 8.1999, 323325: 3.781, 372509: 4.6445, 592895: 7.5639, 643889: 0.0, 672287: 5.6609, 919619: 0.0, 951974: 3.4637}),
 SparseVector(1048576, {196531: 4.2233, 230882: 0.0, 323325: 3.781, 629096: 3.0785, 715677: 4.5306, 833202: 7.4014, 994896: 7.0298}),
 SparseVector(1048576, {}),
 SparseVe