## Titanic Data Analysis
### Shreyas Kulkarni-sk385
### Soham Gupte- sohamgup

### Reading and cleaning data

In [25]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
sc.stop()
sc = SparkContext('local')
sqlContext = SQLContext(sc)
spark = SparkSession(sc)
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
line=sc.wholeTextFiles("/home/shreyas/Lab3data/sparkdata")
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import Row
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
import re
a=line.collect()
for i in range(len(a)):
    a[i]=list(a[i])
    a[i][0]=re.sub(r'\d+', '',a[i][0])
    a[i]=tuple(a[i])
rdd = sc.parallelize(a)
people = rdd.map(lambda x: Row(label=x[0], sentence=(x[1])))
schemaPeople = sqlContext.createDataFrame(people)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")

regexTokenized = regexTokenizer.transform(schemaPeople)
#regexTokenized.show(2)

#### Data is read using wholeTextFiles. This function can be used to read all the files present in the given directory. It returns a list of tuples. Each tuple contains address of file and data. Data is cleaned and seperated by tokenizer. Stopwords, digits and symbols are removed. 

In [26]:

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
sw=remover.transform(regexTokenized)


# tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(sw)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show(10)



+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               label|            sentence|               words|            filtered|         rawFeatures|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|file:/home/shreya...|AdvertisementSupp...|[advertisementsup...|[advertisementsup...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|AdvertisementBy M...|[advertisementby,...|[advertisementby,...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|AdvertisementSupp...|[advertisementsup...|[advertisementsup...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|AdvertisementWith...|[advertisementwit...|[advertisementwit...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|AdvertisementSupp...|[advertisementsup...|[advertisementsup...|(20,[0,1,2,3,4,5,..

#### TF-IDF is applied on the clean data which helps to get features for the words present in the file. It returns importance of every word.

In [27]:
from pyspark.sql.functions import UserDefinedFunction
import re
from pyspark.sql.types import *
name='label'
# udf = UserDefinedFunction(lambda x: re.sub('file:/home/shreyas/Lab3data/try/','',str(x)), StringType())
# new_df = rescaledData.select(*[udf(column).alias(name) if column == name else column for column in rescaledData.columns])
udf = UserDefinedFunction(lambda x: re.sub('.txt','',str(x)), StringType())
new_df = rescaledData.select(*[udf(column).alias(name) if column == name else column for column in rescaledData.columns])

udf1 = UserDefinedFunction(lambda x: re.sub('file:/home/shreyas/sparkdata/','',str(x)), StringType())
re = new_df.select(*[udf1(column).alias(name) if column == name else column for column in new_df.columns])

re.select("label","features").show(2)
final=re.select("label","features")

+--------------------+--------------------+
|               label|            features|
+--------------------+--------------------+
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|
+--------------------+--------------------+
only showing top 2 rows



#### The filename returned by wholeTextFile is used as label. It returns the absolute address of file. So the absolute address is removed and only label part is kept.(We have saved files with the labelname.txt)

## Divide data in train and test set

###### Cleaned data is divided into train and test set by the ratio of 4:1

## Convert string labels to integer

##### Every label has been assigned by a integer. 

In [28]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

indexer = StringIndexer(inputCol="label", outputCol="categoryIndex")
indexed = indexer.fit(final).transform(final)
indexed.show()


# Fit on whole dataset to include all labels in index.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(indexed)

# Split the data into training and test sets (30% held out for testing)
(train, test) = indexed.randomSplit([0.8, 0.2])


+--------------------+--------------------+-------------+
|               label|            features|categoryIndex|
+--------------------+--------------------+-------------+
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          3.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          2.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/sh

### Logistic regression using pipelining

In [29]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression

indexed.show()
lr = LogisticRegression(labelCol="categoryIndex", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(train)
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))


# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipelinelr = Pipeline(stages=[labelIndexer, featureIndexer, lr, labelConverter])

# Train model.  This also runs the indexers.
lrModel = pipelinelr.fit(train)


predictions =lrModel.transform(test)


# Select example rows to display.
predictions.select("prediction", "features","predictedLabel").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))



+--------------------+--------------------+-------------+
|               label|            features|categoryIndex|
+--------------------+--------------------+-------------+
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          3.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          1.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          2.0|
|file:/home/shreya...|(20,[0,1,2,3,4,5,...|          0.0|
|file:/home/sh

### Random Forest

In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(final)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(final)


# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
# Chain indexers and forest in a Pipeline
pipelinerf = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
rfmodel = pipelinerf.fit(train)

# Make predictions.
predictions = rfmodel.transform(test)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = rfmodel.stages[2]
print(rfModel)  # summary only

+--------------------+--------------------+--------------------+
|      predictedLabel|               label|            features|
+--------------------+--------------------+--------------------+
|file:/home/shreya...|file:/home/shreya...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|file:/home/shreya...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|file:/home/shreya...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|file:/home/shreya...|(20,[0,1,2,3,4,5,...|
|file:/home/shreya...|file:/home/shreya...|(20,[0,1,2,3,4,5,...|
+--------------------+--------------------+--------------------+
only showing top 5 rows

Test Error = 0.455224
RandomForestClassificationModel (uid=RandomForestClassifier_4fcf8b790b902673167f) with 10 trees


### Naive Bayes

In [31]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#import LabeledPoint


# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial",labelCol="categoryIndex")

# train the model

# Chain indexers and forest in a Pipeline
pipelinenb = Pipeline(stages=[labelIndexer, featureIndexer, nb, labelConverter])

# Train model.  This also runs the indexers.
nbmodel = pipelinenb.fit(train)


# select example rows to display.
predictions = nbmodel.transform(test)
predictions.select("label","predictedLabel").show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+--------------------+--------------------+
|               label|      predictedLabel|
+--------------------+--------------------+
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home/shreya...|
|file:/home/shreya...|file:/home

### Applying classifiers on the validation sets
#### A validation set is being used. We used 'trump' as the topic for validation set which we though might get classified in Diplomacy and technology considering recent activities. We have got reasonably satisfactory results.

In [32]:
#Validation Sets:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
line=sc.wholeTextFiles("/home/shreyas/Lab3data/ValSet")
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import Row
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
import re
a=line.collect()

rdd = sc.parallelize(a)
people = rdd.map(lambda x: Row(sentence=(x[1])))
schemaPeople = sqlContext.createDataFrame(people)

regexTokenized = regexTokenizer.transform(schemaPeople)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
sw=remover.transform(regexTokenized)

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(sw)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
valData = idfModel.transform(featurizedData)
#rescaledData.show(truncate=False)

print("Random Forest")
predictionsvalrf = rfmodel.transform(valData)
predictionsvalrf.select('prediction','predictedLabel').show()

print("Naive Bayes")
predictionsvalnb = nbmodel.transform(valData)
predictionsvalnb.select('prediction','predictedLabel').show()

print("Logistic regression")
predictionsvallr = lrModel.transform(valData)
predictionsvallr.select('prediction','predictedLabel').show()


Random Forest
+----------+--------------------+
|prediction|      predictedLabel|
+----------+--------------------+
|       1.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       2.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
|       1.0|file:/home/shreya...|
+----------+--------------------+
only showing top 20 rows

Naive Bayes
+----------+--------------------+
|prediction|      predictedLabel|
+----------+--------------------+
|       0.0|file:/home/shreya.