# Note: PySpark

## What is Spark

- A platform for cluster computing
    - Spark spreads data and computations over a cluster with multiple nodes.
    - Splitting up the data makes very large datasets easier to work with, because each node only handles a small amount of it.
- Parallel computation
    - As each node works on its own subset of the data, it also carries out its share of the total calculations.
    - Both data processing and computation are therefore performed in parallel across the nodes of the cluster.
        
## Cluster
- The cluster is hosted on a remote machine that is connected to all the other nodes.
- One computer, called the **master**, manages splitting up the data and the computations.
- The master is connected to the rest of the computers in the cluster, called **workers** (older Spark documentation calls them *slaves*).
- The master sends the workers data and calculations to run, and the workers send their results back to the master.
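The split-compute-combine pattern described above can be imitated on a single machine. The plain-Python sketch below is only an analogy. A hypothetical `master_sum_of_squares` function plays the master: it splits a list into partitions, lets a thread pool stand in for the workers that each return a partial result, and then combines those results. Real Spark schedules tasks on executors on other machines and recovers from failures, and this toy does neither.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Master side: deal the records out round-robin into num_partitions chunks."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def worker_task(partition):
    """Worker side: compute a partial result (a sum of squares) on one chunk only."""
    return sum(x * x for x in partition)


def master_sum_of_squares(data, num_workers=4):
    """Split the data, let each 'worker' process one partition, combine the results."""
    partitions = split_into_partitions(data, num_workers)
    # Threads stand in for worker nodes; each returns its partial result to the master.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(worker_task, partitions))
    # The master combines the partial results into the final answer.
    return sum(partial_results)


print(master_sum_of_squares(list(range(10))))  # 285
```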
      
## [Connection to spark](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html)
- Creating the connection is as simple as creating an instance of the `SparkContext` class.
- The constructor takes a few optional arguments that specify the attributes of the cluster you are connecting to.
- An object holding all these attributes can be created with the `SparkConf()` constructor and passed to `SparkContext` through its `conf` argument.
      
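## Core data structure

- Resilient Distributed Dataset (RDD)
    - an immutable collection of elements partitioned across the nodes of the cluster; a *pair RDD* holds (key, value) tuples
    - a low-level object
- Spark DataFrame, an abstraction built on top of RDDs
    - behaves a lot like a SQL table
    - is better optimized for complicated operations than RDDs, because Spark's Catalyst optimizer can plan DataFrame queries
    - to work with DataFrames, create a `SparkSession` object on top of your `SparkContext` (`SparkSession.builder.getOrCreate()` reuses an existing `SparkContext` or starts one)
        - think of the `SparkContext` as your connection to the cluster
        - and of the `SparkSession` as your interface with that connection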

# 1st Step: connect to a cluster
    
- As described under *Cluster* above, the cluster runs on a **remote machine**. The **master** splits up the data and the computations, sends them to the **workers** (older docs: **slaves**), and collects their results.
- To connect, create a `SparkContext`, optionally configured with a `SparkConf()` object.

In [1]:
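import pyspark
# SparkContext.version is a property of SparkContext *instances*; read from
# the class it only shows "<property ...>". The package version is here:
print(pyspark.__version__)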

# Note
  - Spark's core data structure is the Resilient Distributed Dataset (RDD)
  
# Creating a SparkSession


In [4]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()

In [5]:
# findspark adds a local Spark installation to sys.path; it has to run
# before pyspark is first imported, otherwise it has no effect
import findspark
findspark.init()
import pyspark as ps


In [6]:
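from pyspark.sql import SparkSession
# Build the SparkSession (this also starts the SparkContext). If a session is
# already running, e.g. my_spark above, getOrCreate() returns it and static
# settings such as master and appName are not applied.
spark = (SparkSession.builder
         .master('local')                         # local mode, a single worker thread
         .appName('muthootSample1')
         .config('spark.executor.memory', '5gb')
         .config('spark.cores.max', '6')          # only honoured by standalone/Mesos clusters
         .getOrCreate())

sc = spark.sparkContext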

In [7]:
# Read the CSV file (not parquet): the first line is a header and column types
# are inferred. spark.read replaces the pre-2.0 SQLContext entry point.
df = spark.read.csv('clean_tweet.csv', header=True, inferSchema=True)


In [8]:
type(df)

pyspark.sql.dataframe.DataFrame

## Viewing tables
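- `SparkSession` has an attribute called `catalog` that describes the databases, tables, temporary views, and functions registered with the session. It has a few methods for extracting different pieces of this information.

One of the most useful is the `.listTables()` method. It returns a list of `Table` entries (with fields such as `name`, `tableType`, and `isTemporary`) for the tables and views in the current database. A DataFrame loaded with `spark.read` is not registered in the catalog, so the list below is empty.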

In [9]:
# Print the tables in the catalog
print(spark.catalog.listTables())

[]


In [11]:
df.show()

+---+--------------------+------+
|_c0|                text|target|
+---+--------------------+------+
|  0|awww that s a bum...|     0|
|  1|is upset that he ...|     0|
|  2|i dived many time...|     0|
|  3|my whole body fee...|     0|
|  4|no it s not behav...|     0|
|  5|  not the whole crew|     0|
|  6|          need a hug|     0|
|  7|hey long time no ...|     0|
|  8|k nope they didn ...|     0|
|  9|        que me muera|     0|
| 10|spring break in p...|     0|
| 11|i just re pierced...|     0|
| 12|i couldn t bear t...|     0|
| 13|it it counts idk ...|     0|
| 14|i would ve been t...|     0|
| 15|i wish i got to w...|     0|
| 16|hollis death scen...|     0|
| 17| about to file taxes|     0|
| 18|ahh ive always wa...|     0|
| 19|oh dear were you ...|     0|
+---+--------------------+------+
only showing top 20 rows



In [12]:
df = df.dropna()
df.count()

1596753

In [14]:
(train_set, val_set, test_set) = df.randomSplit([0.6, 0.2, 0.2], seed=2000)
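`randomSplit` normalizes the weights and assigns rows at random. As a result, the three parts come out only approximately 60/20/20. The same seed generally reproduces the same split only if the input data and its partitioning are unchanged. The plain-Python sketch below illustrates the idea with a hypothetical `random_split` helper that makes one uniform draw per row. Spark's own implementation samples partition by partition and differs in detail.

```python
import random


def random_split(rows, weights, seed=None):
    """Toy version of a weighted random split: one uniform draw per row.

    Weights are normalised to sum to 1, so [0.6, 0.2, 0.2] and [3, 1, 1]
    describe the same split; split sizes are only approximately proportional.
    """
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.6, 0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches u values lost to float rounding.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, val, test = random_split(range(10_000), [0.6, 0.2, 0.2], seed=2000)
print(len(train), len(val), len(test))  # close to, but usually not exactly, 6000 2000 2000
```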

In [15]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol='words')
hashtf = HashingTF(numFeatures=2**16, inputCol='words', outputCol='tf')
idf = IDF(inputCol='tf', outputCol='features', minDocFreq=5)  # terms found in fewer than 5 documents get an IDF weight of 0
label_stringIdx = StringIndexer(inputCol='target', outputCol='label')
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|_c0|                text|target|               words|                  tf|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|  0|awww that s a bum...|     0|[awww, that, s, a...|(65536,[8436,8847...|(65536,[8436,8847...|  1.0|
|  1|is upset that he ...|     0|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  1.0|
|  2|i dived many time...|     0|[i, dived, many, ...|(65536,[2548,2888...|(65536,[2548,2888...|  1.0|
|  3|my whole body fee...|     0|[my, whole, body,...|(65536,[158,11650...|(65536,[158,11650...|  1.0|
|  4|no it s not behav...|     0|[no, it, s, not, ...|(65536,[1968,4488...|(65536,[1968,4488...|  1.0|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows
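The `features` column above holds TF-IDF values: each raw term count is multiplied by an inverse document frequency. Spark ML's `IDF` stage uses the smoothed form idf(t) = log((m + 1) / (df(t) + 1)). Here m is the number of documents and df(t) is the number of documents containing term t. The stage sets the weight to 0 when df(t) is below `minDocFreq`. The plain-Python sketch below applies that formula with a hypothetical `tf_idf` helper. It keeps terms as strings instead of hashed indices, and it uses lowercase-and-split as a rough stand-in for `Tokenizer`.

```python
import math
from collections import Counter


def tf_idf(docs, min_doc_freq=0):
    """TF-IDF with the smoothed IDF of Spark ML's IDF stage:
    idf(t) = log((m + 1) / (df(t) + 1)), or 0 when df(t) < min_doc_freq.
    Terms stay strings here instead of being hashed to vector indices."""
    m = len(docs)
    # Rough stand-in for Tokenizer: lowercase, then split on whitespace.
    tokenized = [doc.lower().split() for doc in docs]
    # Document frequency: in how many documents each term appears.
    doc_freq = Counter(term for words in tokenized for term in set(words))
    idf = {
        term: math.log((m + 1.0) / (df + 1.0)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }
    # Raw term counts (what HashingTF produces) scaled by the IDF weights.
    return [{t: c * idf[t] for t, c in Counter(words).items()} for words in tokenized]


vectors = tf_idf(["need a hug", "a hug", "about to file taxes"], min_doc_freq=1)
print(vectors[0])  # 'need' (in 1 of 3 docs) outweighs 'a' and 'hug' (in 2 of 3)
```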



In [16]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8562990298888343
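By default, `BinaryClassificationEvaluator` reports the area under the ROC curve (`metricName='areaUnderROC'`), computed from the `rawPrediction` scores. The 0.856 above is therefore an AUC, not an accuracy. AUC equals the probability that a randomly chosen positive example scores above a randomly chosen negative one. The sketch below computes it that way, pair by pair, with a hypothetical `roc_auc` helper. Spark instead integrates the ROC curve, which gives the same value up to any threshold binning.

```python
def roc_auc(scores, labels):
    """Area under the ROC curve as a pairwise probability (Mann-Whitney form):
    the share of (positive, negative) pairs where the positive scores higher,
    with ties counted as half a win. Quadratic, so for illustration only."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


print(roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```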

In [19]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
display(accuracy)

0.7936414031858717

In [18]:
%%time
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol='text', outputCol='words')
cv = CountVectorizer(vocabSize=2**16, inputCol='words', outputCol='cv')
idf = IDF(inputCol='cv', outputCol='features', minDocFreq=5)
label_stringIdx = StringIndexer(inputCol='target', outputCol='label')
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)


print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.7936
ROC-AUC: 0.8627
CPU times: user 70 ms, sys: 40 ms, total: 110 ms
Wall time: 1min 19s
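Swapping `HashingTF` for `CountVectorizer` changes how terms become vector indices, which is one plausible reason for the small ROC-AUC difference (0.8563 vs 0.8627). `HashingTF` hashes each term modulo `numFeatures`, so it needs no fitting pass, but different terms can collide. `CountVectorizer` is an estimator that first learns a vocabulary of the `vocabSize` most frequent terms. The plain-Python sketch below contrasts the two. The helper names are hypothetical, and `md5` stands in for the MurmurHash3 function that Spark uses.

```python
import hashlib
from collections import Counter


def hashed_tf(words, num_features):
    """Hashing trick (HashingTF-style): index = hash(term) mod num_features.
    No vocabulary is stored, and different terms may collide on one index.
    md5 is a stand-in for the MurmurHash3 function Spark uses."""
    vec = Counter()
    for w in words:
        idx = int(hashlib.md5(w.encode("utf-8")).hexdigest(), 16) % num_features
        vec[idx] += 1
    return dict(vec)


def fit_vocabulary(corpus, vocab_size):
    """CountVectorizer-style fit: keep the vocab_size most frequent terms."""
    counts = Counter(w for words in corpus for w in words)
    return {term: i for i, (term, _) in enumerate(counts.most_common(vocab_size))}


def count_vector(words, vocab):
    """Count only terms in the fitted vocabulary; unseen terms are dropped."""
    return dict(Counter(vocab[w] for w in words if w in vocab))


corpus = [["need", "a", "hug"], ["a", "hug", "a", "day"]]
vocab = fit_vocabulary(corpus, vocab_size=2)
print(vocab)                           # {'a': 0, 'hug': 1}
print(count_vector(corpus[1], vocab))  # {0: 2, 1: 1}
print(hashed_tf(corpus[1], num_features=16))
```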


In [20]:
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector

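def build_trigrams(n=3):
    """Tokenize, build 1..n-grams, count and IDF-weight each, assemble, keep the
    2**14 best features by chi-squared test, then fit logistic regression."""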
    tokenizer = [Tokenizer(inputCol='text', outputCol='words')]
    ngrams = [
        NGram(n=i, inputCol='words', outputCol='{0}_grams'.format(i)) for i in range(1, n+1)
    ]
    
    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i), outputCol="{0}_tf".format(i))
        for i in range(1, n+1)
    ]
    
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in range(1, n+1)
    ]
    
    assembler = [
        VectorAssembler(inputCols=["{0}_tfidf".format(i) for i in range(1, n+1)], outputCol = "rawFeatures")
    ]
    
    label_stringIdx = [
        StringIndexer(inputCol="target", outputCol="label")
    ]
    
    selector = [
        ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")
    ]
    
    lr = [LogisticRegression(maxIter=10)]
    
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + lr)
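`NGram` turns each token list into space-joined runs of n consecutive tokens, and a row with fewer than n tokens yields an empty list. A plain-Python mirror of that behaviour, using a hypothetical `ngrams` helper, makes the three feature sets of `build_trigrams` concrete:

```python
def ngrams(words, n):
    """Mirror of NGram output: space-joined runs of n consecutive tokens;
    a list shorter than n gives no n-grams."""
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]


words = ["about", "to", "file", "taxes"]
for n in range(1, 4):
    print(n, ngrams(words, n))
# 1 ['about', 'to', 'file', 'taxes']
# 2 ['about to', 'to file', 'file taxes']
# 3 ['about to file', 'to file taxes']
```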

In [21]:
%%time
trigram_pipelineFit = build_trigrams().fit(train_set)
predictions = trigram_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)  # ROC-AUC of this pipeline, not the earlier one

# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))


KeyboardInterrupt: 
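Both trigram pipelines use `VectorAssembler` to concatenate the three TF-IDF vectors (each up to 2**14 wide) into one `rawFeatures` vector of up to 3 * 2**14 = 49,152 columns. In `build_trigrams`, `ChiSqSelector` then keeps the 2**14 columns that rank best in a chi-squared test against the label. The sketch below shows the resulting index layout with a hypothetical `assemble` helper that works on `(size, {index: value})` pairs. Each input keeps its own indices, shifted by the sizes of the inputs before it.

```python
def assemble(sparse_vectors):
    """Concatenate sparse vectors in the VectorAssembler layout: each input keeps
    its own indices, shifted by the total size of the inputs before it.
    Each input is a (size, {index: value}) pair; returns the same shape."""
    offset, merged = 0, {}
    for size, entries in sparse_vectors:
        for idx, value in entries.items():
            merged[offset + idx] = value
        offset += size
    return offset, merged


size = 2**14
unigrams = (size, {5: 1.2})   # e.g. from 1_tfidf
bigrams = (size, {5: 0.7})    # e.g. from 2_tfidf
trigrams = (size, {0: 2.0})   # e.g. from 3_tfidf
print(assemble([unigrams, bigrams, trigrams]))
# (49152, {5: 1.2, 16389: 0.7, 32768: 2.0})
```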

In [33]:
def build_trigrams_wo_chi(n=3):
    """Same as build_trigrams, but without the ChiSqSelector step."""
    tokenizer = [Tokenizer(inputCol='text', outputCol='words')]
    ngrams = [
        NGram(n=i, inputCol='words', outputCol='{0}_grams'.format(i))
        for i in range(1, n+1)
    ]
    
    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i), outputCol="{0}_tf".format(i))
        for i in range(1, n+1)
    ]
    
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in range(1, n+1)
    ]
    
    assembler = [
        VectorAssembler(inputCols=["{0}_tfidf".format(i) for i in range(1, n+1)], outputCol = "rawFeatures")
    ]
    
    label_stringIdx = [
        StringIndexer(inputCol="target", outputCol="label")
    ]
    
    
    # LogisticRegression reads featuresCol, which defaults to 'features';
    # without the ChiSqSelector stage the assembled vectors are in 'rawFeatures'
    lr = [LogisticRegression(maxIter=10, featuresCol='rawFeatures')]
    
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + lr)

In [34]:
%%time
trigramwocs_pipelineFit = build_trigrams_wo_chi().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = evaluator.evaluate(predictions_wocs)
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy_wocs))
print("ROC-AUC: {0:.4f}".format(roc_auc_wocs))

In [None]:
test_predictions = trigramwocs_pipelineFit.transform(test_set)
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count() / float(test_set.count())
test_roc_auc = evaluator.evaluate(test_predictions)
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(test_accuracy))
print("ROC-AUC: {0:.4f}".format(test_roc_auc))