## Upload Review Data to Azure Blob Storage (via AzCopy)

Create a batch file with the following contents and run it:
    
```
cd "C:\Program Files (x86)\Microsoft SDKs\Azure\AzCopy"
REM The storage account key is read from the AZURE_STORAGE_KEY environment variable - never paste the key itself here.
AzCopy /Source:C:\_ilia_share\amazon_prod_reviews_clean\raw /Dest:https://ikcentralusstore.blob.core.windows.net/amazonrev /DestKey:%AZURE_STORAGE_KEY% /Pattern:"aggressive_dedup.json"
pause
```

## Load Review Data (from Blob)

In [1]:
# Minimal wall-clock stopwatch for timing cells (idea courtesy of Thomas D.).
# Usage: call tic() at the start of a cell and toc() at the end.
import time

# Shared mutable state so tic() in one cell and toc() in another see the same start time.
STIME = { "start" : time.time() }

def tic():
    """Reset the stopwatch start time to now."""
    STIME["start"] = time.time()

def toc():
    """Print the number of seconds elapsed since the last tic() (or since this cell ran)."""
    elapsed = time.time() - STIME["start"]
    print("%.2f seconds elapsed" % elapsed)

Creating SparkContext as 'sc'


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
34,application_1469453428769_0012,pyspark,idle,Link,Link,✔


Creating HiveContext as 'sqlContext'
SparkContext and HiveContext created. Executing user code ...


In [2]:
# Blob-storage (WASB) locations: the container URI and the raw review dump inside it.
blob = "wasb://amazonrev@ikcentralusstore.blob.core.windows.net"
json_dta = "".join((blob, "/aggressive_dedup.json"))

In [154]:
# Read the review dump from blob storage and register it as the "reviews" table
# so the %%sql cells below can query it by name.
jsonFile = sqlContext.read.json(json_dta)
jsonFile.registerTempTable("reviews")

print(type(jsonFile))  # expected: <class 'pyspark.sql.dataframe.DataFrame'>
jsonFile.show(n=5)  # preview the first five reviews

# TODO: also load the IMDB data at some point.

<class 'pyspark.sql.dataframe.DataFrame'>
+----------+-------+-------+--------------------+-----------+--------------------+---------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|          reviewerID|   reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------------+---------------+--------------------+--------------+
|B003UYU16G| [0, 0]|    5.0|It is and does ex...|11 21, 2012|A00000262KYZUE4J5...| Steven N Elich|Does what it's su...|    1353456000|
|B005FYPK9C| [0, 0]|    5.0|I was sketchy at ...| 01 8, 2013|A000008615DZQRRI9...|      mj waldon|           great buy|    1357603200|
|B000VEBG9Y| [0, 0]|    3.0|Very mobile produ...|03 24, 2014|A00000922W28P2OCH...|Gabriel Merrill|Great product but...|    1395619200|
|B001EJMS6K| [0, 0]|    4.0|Easy to use a mob...|03 24, 2014|A00000922W28P2OCH...|Gabriel Merrill|Great inexpensive...|    1395619200|
|B003XJCNVO| 

## Examine some of the reviews

In [155]:
%%sql
SELECT overall,
       reviewText
FROM   reviews
LIMIT  10

In [156]:
%%sql
SELECT overall, COUNT(*) AS freq
FROM reviews
GROUP BY overall
ORDER BY freq DESC

In [157]:
# Bucket the `overall` rating into three sentiment labels to inspect class imbalance:
#   overall < 3 -> 'low', overall > 3 -> 'high', anything else (3, or NULL) -> 'mid'.
# The review text is carried along as `sentences`.
reviews = sqlContext.sql(
    "SELECT "
    "CASE WHEN overall < 3 THEN 'low' "
    "WHEN overall > 3 THEN 'high' ELSE 'mid' END as label, "
    "reviewText as sentences "
    "FROM reviews"
)

tally = reviews.groupBy("label").count()
tally.show()

# Counts observed on the full dataset:
#   mid  |  7,039,272
#   low  | 10,963,811
#   high | 64,453,794

+-----+--------+
|label|   count|
+-----+--------+
|  mid| 7039272|
|  low|10963811|
| high|64453794|
+-----+--------+

In [None]:
"""
# Let's look at some reviews to see how clean they are
# there seems to be lots of html formatting
for c,r in enumerate(reviews.take(10)):
    print("%d. %s" % (c+1,r['sentences']))
"""

In [None]:
"""
# Some very basic cleaning
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType, DoubleType 
from bs4 import BeautifulSoup

def cleanerHTML(line):
    # html formatting
    html_clean = BeautifulSoup(line, "lxml").get_text().lower()
    # remove any double spaces, line-breaks, etc.
    return " ".join(html_clean.split())

cleaner = UserDefinedFunction(cleanerHTML, StringType())

def labelForResults(s):
    # string label to numeric
    if s == 'low':
        return 0.0
    elif s == 'high':
        return 1.0
    else:
        return -1.0
        
label = UserDefinedFunction(labelForResults, DoubleType())

cleanedReviews = reviews.select(reviews.label,
                                label(reviews.label).alias('sentiment'), 
                                cleaner(reviews.sentences).alias('sentences'))
"""

In [None]:
"""
# A bit cleaner ...
for c,r in enumerate(cleanedReviews.take(10)):
    print("%d. %s" % (c+1,r['sentences']))
"""

In [None]:
"""
#cleanedReviews.show()
"""

In [None]:
"""
# Equalise classes 
neg_rev = cleanedReviews.filter("sentiment = 0.0")
pos_rev = cleanedReviews.filter("sentiment = 1.0").limit(neg_rev.count())
"""

In [None]:
"""
# Save data
allData = pos_rev.unionAll(neg_rev)
print(allData.count()) # 21,927,622 ( = 10,963,811 * 2)

allDataLoc = blob + "/cleaned_equal_classes.json"
allData.write.json(allDataLoc)
"""

## Load Clean Data

In [None]:
# Reload the cleaned, class-balanced reviews written by the (disabled) cleaning cells.
allDataLoc = "".join((blob, "/cleaned_equal_classes.json"))
allData = sqlContext.read.json(allDataLoc)

# The save cell recorded 21,927,622 rows for this dataset.
data_count = allData.count()
print(data_count)

In [None]:
# Target number of rows for the working sample. DataFrame.sample() keeps each row
# independently with probability `sub_sample_ratio`, so the result is only approximately
# this size.
# NOTE(review): an earlier comment said "Take 100,000", and the train/test files loaded
# below are named *_1mill and hold ~1,000,000 rows - confirm which size is intended.
sub_sample = 10000

# Sampling without replacement needs a fraction in [0, 1], so cap the ratio at 1.0 in case
# sub_sample exceeds the available rows. max(..., 1) avoids dividing by zero on empty input.
sub_sample_ratio = min(1.0, float(sub_sample) / float(max(data_count, 1)))

print(sub_sample_ratio)
print(type(allData))

In [None]:
# Down-sample with DataFrame.sample(withReplacement, fraction, seed).
# The result gets its own name instead of overwriting `allData`, so re-running this cell
# does not sample the already-sampled data a second time.
SAMPLE_SEED = 12345
SPLIT_SEED = 54321  # separate seed so the split is not correlated with the sampling draw
sampledData = allData.sample(False, sub_sample_ratio, SAMPLE_SEED)

# Split into training and test (50% / 50%). Without a seed, randomSplit draws a different
# split on every run, which makes the results below irreproducible.
trainingData, testData = sampledData.randomSplit([0.5, 0.5], seed=SPLIT_SEED)

In [3]:
# Blob locations of the persisted train/test splits (~500K rows each, ~1M in total).
trainingDataLoc, testDataLoc = ("".join((blob, "/training_1mill.json")),
                                "".join((blob, "/testing_1mill.json")))

In [None]:
# Save
# Disabled: persist the freshly split data. PySpark's DataFrameWriter takes the save mode
# as a string. SaveMode.Overwrite is the Scala/Java enum and is not defined here, so use
# .mode("overwrite") when re-enabling these lines.
#trainingData.write.mode("overwrite").json(trainingDataLoc)
#testData.write.mode("overwrite").json(testDataLoc)

In [4]:
# Reload the persisted train/test splits from blob storage.
trainingData, testData = (sqlContext.read.json(trainingDataLoc),
                          sqlContext.read.json(testDataLoc))

In [5]:
# Cache both splits: the feature-extraction and model cells below read them more than once.
_splits = (trainingData, testData)
for _split in _splits:
    _split.cache()

# Print row counts (training first, then test).
for _split in _splits:
    print(_split.count())

500349
499826

In [6]:
trainingData.show(n=20)  # preview the first 20 training rows

+-----+--------------------+---------+
|label|           sentences|sentiment|
+-----+--------------------+---------+
| high|!i recommend this...|      1.0|
| high|" duty, honor, co...|      1.0|
| high|" ok let first st...|      1.0|
| high|"a deadly justice...|      1.0|
| high|"a dirty job" is ...|      1.0|
| high|"a man of god" is...|      1.0|
| high|"a practical book...|      1.0|
| high|"a widow's story"...|      1.0|
| high|"abraham's burden...|      1.0|
| high|"always said if i...|      1.0|
| high|"american fool" b...|      1.0|
| high|"antsy does time"...|      1.0|
| high|"anyone who leads...|      1.0|
| high|"athlete/warrior"...|      1.0|
| high|"better living th...|      1.0|
| high|"bob cornuke writ...|      1.0|
| high|"changing seasons...|      1.0|
| high|"city of angels: ...|      1.0|
| high|"cleopatra" was t...|      1.0|
| high|"courage" by dais...|      1.0|
+-----+--------------------+---------+
only showing top 20 rows

In [7]:
testData.show(n=20)  # preview the first 20 test rows

+-----+--------------------+---------+
|label|           sentences|sentiment|
+-----+--------------------+---------+
| high|" to sheldon and ...|      1.0|
| high|"...those men who...|      1.0|
| high|"a prayer for the...|      1.0|
| high|"a seacat's love"...|      1.0|
| high|"amazing product ...|      1.0|
| high|"aves - the age o...|      1.0|
| high|"butera does it a...|      1.0|
| high|"c'era una volta ...|      1.0|
| high|"circle william" ...|      1.0|
| high|"crush proof" is ...|      1.0|
| high|"die unendliche g...|      1.0|
| high|"don't let fear h...|      1.0|
| high|"du hast" means y...|      1.0|
| high|"feedback" is an ...|      1.0|
| high|"fling" is one of...|      1.0|
| high|"forgiving maximo...|      1.0|
| high|"gangster governm...|      1.0|
| high|"gone girl" just ...|      1.0|
| high|"good parents bad...|      1.0|
| high|"great price with...|      1.0|
+-----+--------------------+---------+
only showing top 20 rows

## 1. TFIDF

In [148]:
""" Pipeline for feature selection and classification
Using:

https://spark.apache.org/docs/1.5.2/ml-features.html
https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html
http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionModel
http://nlp.stanford.edu/IR-book/html/htmledition/document-and-query-weighting-schemes-1.html#sec:querydocweighting

Attempting to replicate: 

class sklearn.feature_extraction.text.TfidfVectorizer(input='content', encoding='utf-8',
decode_error='strict', strip_accents=None, lowercase=True, preprocessor=None, 
tokenizer=None, analyzer='word', stop_words=None, token_pattern='(?u)\b\w\w+\b', 
ngram_range=(1, 3), max_df=1.0, min_df=1, max_features=40000, vocabulary=None, 
binary=False, dtype=<class 'numpy.int64'>, norm='l2', use_idf=True, 
smooth_idf=True, sublinear_tf=True)

I think only sublinear_tf and ngram_range need to be modified

(https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/feature_extraction/text.py)
if self.sublinear_tf:
    np.log(X.data, X.data)
    X.data += 1
            
"""
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, NGram, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.sql.functions import col, udf
from itertools import chain
from pyspark.sql.types import ArrayType, StringType
from pyspark.mllib.linalg import Vectors, VectorUDT
import numpy as np

# Size of the hashed feature space for HashingTF (stand-in for sklearn's max_features=40000).
numfeat = 40000

########################
# 1. Feature-extraction
########################

def concat(type):
    """Return a UDF that flattens several array columns into one array column.

    Joining the unigram, bigram and trigram columns this way reproduces sklearn's
    ngram_range=(1, 3). `type` is the Spark element type of the arrays.
    """
    def concat_(*args):
        merged = []
        for arr in args:
            merged.extend(arr)
        return merged
    return udf(concat_, ArrayType(type))
concat_string_arrays = concat(StringType())


# Stages are fitted/applied manually in the training and test cells below.
indexer = StringIndexer(inputCol="sentiment", outputCol="sentiment_idx")
tokenizer = Tokenizer(inputCol="sentences", outputCol="words")
biGram = NGram(inputCol="words", n=2, outputCol="2gram")
triGram = NGram(inputCol="words", n=3, outputCol="3gram")
hashingtf = HashingTF(inputCol="ngrams", outputCol="rawFeatures", numFeatures=numfeat)


def _sublinear_tf(sv):
    """Sub-linear TF scaling (tf -> 1 + log(tf)) on the non-zero entries of a sparse
    vector, mirroring sklearn's sublinear_tf=True."""
    scaled = np.log(sv.values) + 1
    return Vectors.sparse(sv.size, dict(zip(sv.indices, scaled)))

vector_udf = udf(_sublinear_tf, VectorUDT())

idf = IDF(inputCol="logRawFeatures", outputCol="features")
# NOTE(review): sklearn's TfidfVectorizer(norm='l2') also L2-normalises each row after IDF.
# That step is not reproduced here (pyspark.ml.feature.Normalizer would provide it).

#######
# Train
#######
# Fit the label indexer on the training split and add `sentiment_idx`.
# NOTE(review): StringIndexer orders indices by label frequency, so which sentiment maps
# to 0.0 depends on the training data (the output below shows 1.0 -> 1.0).
indexerModel = indexer.fit(trainingData)
trainingDataIx = indexerModel.transform(trainingData)
# Lower-case and whitespace-split `sentences` into `words`.
tokenized_train = tokenizer.transform(trainingDataIx)

# Add 2-gram and 3-gram columns, then merge uni/bi/tri-grams into `ngrams`
# (the ngram_range=(1, 3) equivalent).
biGram_train = biGram.transform(tokenized_train)
triGram_train = triGram.transform(biGram_train)
ngrammed_train = triGram_train.withColumn("ngrams", concat_string_arrays(
        col("words"),
        col("2gram"),
        col("3gram")))
# Hash n-grams into `numfeat` term-frequency buckets, then apply sub-linear TF scaling.
hashed_train = hashingtf.transform(ngrammed_train)
sublintf_train = hashed_train.withColumn('logRawFeatures', vector_udf(hashed_train.rawFeatures))

# IDF weights are fitted on the training split only; the test cell reuses idfModel.
idfModel = idf.fit(sublintf_train)
idf_train = idfModel.transform(sublintf_train)
idf_train.first()

+-----+--------------------+---------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|           sentences|sentiment|sentiment_idx|               words|               2gram|               3gram|              ngrams|         rawFeatures|      logRawFeatures|            features|
+-----+--------------------+---------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| high|!i recommend this...|      1.0|          1.0|[!i, recommend, t...|[!i recommend, re...|[!i recommend thi...|[!i, recommend, t...|(40000,[277,1056,...|(40000,[277,1056,...|(40000,[277,1056,...|
| high|" duty, honor, co...|      1.0|          1.0|[", duty,, honor,...|[" duty,, duty, h...|[" duty, honor,, ...|[", duty,, honor,...|(40000,[34,97,432...|(40000,[34,97,432...|(40000,[34,97,432...|


### tfidf variants:

![alt text](http://nlp.stanford.edu/IR-book/html/htmledition/img462.png "TFs")

In [146]:
"""
###########################
# Example: Apply sub-linear
###########################
from pyspark.mllib.linalg import Vectors, VectorUDT
import numpy as np

testy = hashed_train.first()['rawFeatures']

print(type(testy))
print(type(testy.values))
print(testy)

vector_udf = udf(lambda sv: Vectors.sparse(sv.size, dict(zip(sv.indices, np.log(sv.values) + 10))), VectorUDT())
sublintf_train = hashed_train.withColumn('sublintf', vector_udf(hashed_train.rawFeatures))

testy2 = sublintf_train.first()['sublintf']
print(testy2)
"""

<class 'pyspark.mllib.linalg.SparseVector'>
<type 'numpy.ndarray'>
(40000,[277,1056,1128,2165,3370,3371,3500,3707,4051,4486,4668,4846,5627,5646,6272,6944,7183,8088,9207,9541,9581,10564,11966,12365,12620,12709,13250,14081,15914,16727,16890,16897,16956,20127,20883,21925,22837,23051,23444,23895,23967,24280,24738,25310,25969,26033,26302,27086,29182,29509,29593,31576,32107,32586,34076,34380,34466,34619,35240,35615,36103,36715,37023,37075,37368,38027,38092,38621,39070,39352],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])
(40000,[277,1056,1128,2165,3370,3371,3500,3707,4051,4486,4668,4846,5627,5646,6272,6944,7183,8088,9207,9541,9581,10564,11966,12365,12620,12709,13250,14081,15914,16727,16890,16897,16956,20127,20883,21925,22837,23051,23444,23895,23967,24280,24738

In [151]:
######
# Test
######
# Apply the already-fitted training transforms to the test split. Nothing is re-fitted
# here, so no test information leaks into the label index or IDF weights.
# NOTE(review): a sentiment value unseen during indexer.fit would make the indexer fail
# (presumably its default handleInvalid behaviour) - confirm the test labels are only 0.0/1.0.
testDataIx = indexerModel.transform(testData)
tokenized_test = tokenizer.transform(testDataIx)

# Same n-gram / hashing / sub-linear TF chain as the training cell.
biGram_test = biGram.transform(tokenized_test)
triGram_test = triGram.transform(biGram_test)
ngrammed_test = triGram_test.withColumn("ngrams", concat_string_arrays(
        col("words"),
        col("2gram"),
        col("3gram")))
hashed_test = hashingtf.transform(ngrammed_test)
sublintf_test = hashed_test.withColumn('logRawFeatures', vector_udf(hashed_test.rawFeatures))
# Reuse the IDF model fitted on the training split.
idf_test = idfModel.transform(sublintf_test)

#idf_test.show()

In [153]:
# 2A. Classifier (Logistic Regression) on the TF-IDF features built above.
classi = LogisticRegression(labelCol="sentiment_idx", featuresCol="features")
tfidfModel = classi.fit(idf_train)
pred = tfidfModel.transform(idf_test)

# 3. Examine: accuracy = correct predictions / evaluated predictions.
# This uses one aggregation instead of two filtered count() calls, so the UDF-heavy test
# feature pipeline behind `pred` runs once rather than twice. Rows where the comparison is
# NULL are excluded from both counts, exactly as the `=` / `!=` WHERE filters did.
from pyspark.sql import functions as F
is_hit = col("prediction") == col("sentiment_idx")
stats = pred.agg(F.sum(is_hit.cast("int")).alias("hits"),
                 F.count(is_hit).alias("n")).first()
numSuccesses = stats["hits"] or 0  # SUM over zero rows is NULL
numInspections = stats["n"]
if numInspections == 0:
    raise ValueError("no comparable predictions to evaluate")
acc = (float(numSuccesses) / float(numInspections)) * 100
print("%.2f success rate" % acc)  # 88.32 with ngrams(1,3) + sub-linear tf

"""
Standard: 76.77 success rate
With ngrams(1,3): 88.17 success rate
With ngrams + sublineartf: 88.32 success rate
"""

88.32 success rate
'\nStandard: 76.77 success rate\nWith ngrams(1,3): 88.17 success rate\nWith ngrams + sublineartf: ?\n'

In [None]:
# 2C. Classifier (gradient-boosted trees) on the same TF-IDF features.
# pyspark.ml's GBTClassifier supports binary labels only and has no `numClasses` parameter.
# Passing numClasses=2 made the constructor raise TypeError before training started.
classi = GBTClassifier(labelCol="sentiment_idx", featuresCol="features")
tfidfModel = classi.fit(idf_train)
pred = tfidfModel.transform(idf_test)

# 3. Examine: accuracy in a single pass over `pred` (NULL comparisons are excluded from
# both counts, as with separate `=` / `!=` filters).
from pyspark.sql import functions as F
is_hit = col("prediction") == col("sentiment_idx")
stats = pred.agg(F.sum(is_hit.cast("int")).alias("hits"),
                 F.count(is_hit).alias("n")).first()
numSuccesses = stats["hits"] or 0  # SUM over zero rows is NULL
numInspections = stats["n"]
if numInspections == 0:
    raise ValueError("no comparable predictions to evaluate")
acc = (float(numSuccesses) / float(numInspections)) * 100
print("%.2f success rate" % acc)  # result not yet recorded

In [None]:
# 3. Evaluation: side-by-side view of predicted vs. true (indexed) labels.
pred.select("prediction", "sentiment_idx").show()