## Find PySpark

In [None]:
# Locate the local Spark installation so that the `pyspark` imports in the
# following cells resolve; this must run before any `pyspark` import.
import findspark
findspark.init()

### Start a Spark Session

In [None]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# Reuse the active SparkContext when one already exists, so re-running this
# cell does not raise "Cannot run multiple SparkContexts at once"; otherwise
# start a new context with a local master.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local'))


#### To stop a running Spark session

In [None]:
# Recovery cell: execute this only if creating a SparkContext fails with the
# error quoted in the string below.
# NOTE: after sc.stop() the context can no longer be used, so create a new
# SparkContext before running any later cell that references `sc`.
'''
Cannot run multiple SparkContexts at once;existing SparkContext(app=pyspark-shell, master=local)created by __init__ at <ipython-input-19-5796b8bfe42c>:3 
'''
sc.stop()

In [None]:
# Word count: split every line on single spaces, pair each token with 1,
# then add up the 1s per distinct token.
text_file = sc.textFile("data/wordcount.txt")
tokens = text_file.flatMap(lambda row: row.split(" "))
token_ones = tokens.map(lambda token: (token, 1))
counts = token_ones.reduceByKey(lambda left, right: left + right)
# To persist the result instead of collecting it: counts.saveAsTextFile("hdfs://...")
output = counts.collect()
for word, count in output:
    print("%s: %i" % (word, count))

Unless: 1
required: 1
by: 1
applicable: 1
law: 1
or: 2
agreed: 1
to: 1
in: 1
writing,: 1
software: 1
distributed: 2
under: 2
the: 4
License: 2
is: 1
on: 1
an: 1
"AS: 1
IS": 1
BASIS,: 1
WITHOUT: 1
WARRANTIES: 1
OR: 1
CONDITIONS: 1
OF: 1
ANY: 1
KIND,: 1
either: 1
express: 1
implied.: 1
See: 1
for: 1
specific: 1
language: 1
governing: 1
permissions: 1
and: 1
limitations: 1
License.: 1


In [None]:
import numpy as np

from pyspark.mllib.stat import Statistics

# Three observations (rows) of three features (columns), distributed as an
# RDD of vectors.
rows = [
    np.array([1.0, 10.0, 100.0]),
    np.array([2.0, 20.0, 200.0]),
    np.array([3.0, 30.0, 300.0]),
]
mat = sc.parallelize(rows)

# Column-wise summary statistics over the whole RDD.
summary = Statistics.colStats(mat)
column_means = summary.mean()             # mean of each column
column_variances = summary.variance()     # variance of each column
column_nonzeros = summary.numNonzeros()   # count of non-zero entries per column
print(column_means)
print(column_variances)
print(column_nonzeros)

[  2.  20. 200.]
[1.e+00 1.e+02 1.e+04]
[3. 3. 3.]


In [None]:
from pyspark.mllib.stat import Statistics

# --- Correlation between two series ---
# Both RDDs must have the same number of partitions and the same cardinality.
seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])
seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])

# `method` accepts "pearson" or "spearman"; Pearson is used when it is omitted.
series_correlation = Statistics.corr(seriesX, seriesY, method="pearson")
print("Correlation is: " + str(series_correlation))

# --- Pairwise correlation matrix of the columns of an RDD of vectors ---
observations = [
    np.array([1.0, 10.0, 100.0]),
    np.array([2.0, 20.0, 200.0]),
    np.array([5.0, 33.0, 366.0]),
]
data = sc.parallelize(observations)

# Same `method` options as above; the result is a square matrix.
correlation_matrix = Statistics.corr(data, method="pearson")
print(correlation_matrix)

Correlation is: 0.8500286768773004
[[1.         0.97888347 0.99038957]
 [0.97888347 1.         0.99774832]
 [0.99038957 0.99774832 1.        ]]


In [None]:
# Frequent-itemset mining with the RDD-based FP-Growth API (pyspark.mllib).
# This cell starts its own SparkContext, so no other context may be active
# when it runs.
from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowth

if __name__ == "__main__":
    sc = SparkContext(appName="FPGrowth")
    try:
        # Each input line is one transaction: a space-separated list of items.
        data = sc.textFile("data/mllib/fimi_sample.txt")
        transactions = data.map(lambda line: line.strip().split(' '))
        model = FPGrowth.train(transactions, minSupport=0.003, numPartitions=10)
        result = model.freqItemsets().collect()
        for fi in result:
            print(fi)
    finally:
        # Stop the context inside the guard that created it, and do so even
        # when the job fails; a leaked context makes the next SparkContext(...)
        # call raise "Cannot run multiple SparkContexts at once".
        sc.stop()

### FP-Growth (MLlib)

In [None]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("FPGrowthExample").getOrCreate()

    # One row per basket: (id, items in the basket).
    baskets = [
        (0, [1, 2, 5]),
        (1, [1, 2, 3, 5]),
        (2, [1, 2]),
    ]
    df = spark.createDataFrame(baskets, ["id", "items"])

    fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
    model = fpGrowth.fit(df)

    # 1) Frequent itemsets found in the baskets.
    model.freqItemsets.show()

    # 2) Association rules derived from those itemsets.
    model.associationRules.show()

    # 3) Check each row's items against all association rules and report the
    #    matching consequents in a `prediction` column.
    model.transform(df).show()

    spark.stop()

+---------+----+
|    items|freq|
+---------+----+
|      [1]|   3|
|      [2]|   3|
|   [2, 1]|   3|
|      [5]|   2|
|   [5, 2]|   2|
|[5, 2, 1]|   2|
|   [5, 1]|   2|
+---------+----+

+----------+----------+------------------+----+
|antecedent|consequent|        confidence|lift|
+----------+----------+------------------+----+
|    [5, 2]|       [1]|               1.0| 1.0|
|       [2]|       [1]|               1.0| 1.0|
|       [2]|       [5]|0.6666666666666666| 1.0|
|    [2, 1]|       [5]|0.6666666666666666| 1.0|
|       [5]|       [2]|               1.0| 1.0|
|       [5]|       [1]|               1.0| 1.0|
|    [5, 1]|       [2]|               1.0| 1.0|
|       [1]|       [2]|               1.0| 1.0|
|       [1]|       [5]|0.6666666666666666| 1.0|
+----------+----------+------------------+----+

+---+------------+----------+
| id|       items|prediction|
+---+------------+----------+
|  0|   [1, 2, 5]|        []|
|  1|[1, 2, 3, 5]|        []|
|  2|      [1, 2]|       [5]|
+---+------------+----------+

### Decision Tree Classification: Gini Impurity

In [None]:
# Decision-tree classification (Gini impurity) with the RDD-based MLlib API.
# This cell starts its own SparkContext, so no other context may be active
# when it runs.
import shutil

from pyspark import SparkContext
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

if __name__ == "__main__":

    sc = SparkContext(appName="PythonDecisionTreeClassificationExample")
    try:
        # Load and parse the data file into an RDD of LabeledPoint.
        data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
        # Split the data into training and test sets (30% held out for testing).
        (trainingData, testData) = data.randomSplit([0.7, 0.3])

        # Train a DecisionTree model.
        # Empty categoricalFeaturesInfo indicates all features are continuous.
        model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                             impurity='gini', maxDepth=5, maxBins=32)

        # Evaluate the model on the test instances and compute the test error
        # as the fraction of (label, prediction) pairs that disagree.
        predictions = model.predict(testData.map(lambda x: x.features))
        labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
        testErr = labelsAndPredictions.filter(
            lambda lp: lp[0] != lp[1]).count() / float(testData.count())
        print('Test Error = ' + str(testErr))
        print('Learned classification tree model:')
        print(model.toDebugString())

        # Save and load the model. The target directory is cleared first so
        # that re-running the cell does not fail because the path already
        # exists (same approach as the Naive Bayes save/load cell).
        output_dir = "target/tmp/myDecisionTreeClassificationModel"
        shutil.rmtree(output_dir, ignore_errors=True)
        model.save(sc, output_dir)
        sameModel = DecisionTreeModel.load(sc, output_dir)
    finally:
        # Stop the context inside the guard that created it, and do so even
        # when the job fails, so that later cells can start their own context.
        sc.stop()

Test Error = 0.037037037037037035
Learned classification tree model:
DecisionTreeModel classifier of depth 2 with 5 nodes
  If (feature 434 <= 70.5)
   If (feature 99 <= 35.0)
    Predict: 0.0
   Else (feature 99 > 35.0)
    Predict: 1.0
  Else (feature 434 > 70.5)
   Predict: 1.0



### Naive Bayes Classification

In [None]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# Reuse the active SparkContext when one already exists, so re-running this
# cell does not raise "Cannot run multiple SparkContexts at once"; otherwise
# start a new context with a local master.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local'))

In [None]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.util import MLUtils

# Read the LIBSVM-formatted sample data set.
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

# Random split: roughly 60% for training, 40% for testing.
training, test = data.randomSplit([0.6, 0.4])

# Fit a naive Bayes model on the training split.
model = NaiveBayes.train(training, 1.0)

# Pair each test prediction with its true label, then measure the share of
# pairs that agree.
predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))
num_correct = predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count()
num_tested = test.count()
accuracy = 1.0 * num_correct / num_tested
print('model accuracy {}'.format(accuracy))

# This cell is the last user of the context, so release it here.
sc.stop()



model accuracy 1.0


In [None]:
import shutil

from pyspark import SparkContext
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.util import MLUtils


def _fraction_correct(prediction_and_label, dataset):
    """Return the share of (prediction, label) pairs that agree.

    The number of agreeing pairs is divided by the number of records in
    ``dataset``.
    """
    hits = prediction_and_label.filter(lambda pl: pl[0] == pl[1]).count()
    return 1.0 * hits / dataset.count()


if __name__ == "__main__":

    sc = SparkContext(appName="PythonNaiveBayesExample")

    # Read the LIBSVM-formatted sample data set.
    data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

    # Random split: roughly 60% for training, 40% for testing.
    training, test = data.randomSplit([0.6, 0.4])

    # Fit a naive Bayes model on the training split.
    model = NaiveBayes.train(training, 1.0)

    # Accuracy of the freshly trained model on the test split.
    predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))
    accuracy = _fraction_correct(predictionAndLabel, test)
    print('model accuracy {}'.format(accuracy))

    # Round-trip the model through disk. The target directory is removed
    # first so that saving does not collide with a previous run.
    output_dir = 'target/tmp/myNaiveBayesModel'
    shutil.rmtree(output_dir, ignore_errors=True)
    model.save(sc, output_dir)
    sameModel = NaiveBayesModel.load(sc, output_dir)

    # Accuracy of the reloaded model on the same test split.
    predictionAndLabel = test.map(lambda p: (sameModel.predict(p.features), p.label))
    accuracy = _fraction_correct(predictionAndLabel, test)
    print('sameModel accuracy {}'.format(accuracy))
    sc.stop()

model accuracy 0.975
sameModel accuracy 0.975
