In [None]:
# standard library
import time

# pyspark core
import pyspark
from pyspark import SparkContext
from pyspark import SparkFiles
from pyspark import StorageLevel
from pyspark.conf import SparkConf

# import for pyspark sql
from pyspark.sql import SparkSession
from pyspark.sql import Row, SQLContext
from pyspark.sql.types import *

# imports for streaming
from pyspark.streaming import StreamingContext

# mllib imports
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.feature import HashingTF


In [None]:
# Build the SparkSession for this notebook, or reuse the one that is already
# running (getOrCreate returns the existing session when there is one).
ss = (
    SparkSession.builder
    .appName("MyExamples")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
# define Spark Context
# getOrCreate() is a class-level factory: it returns the SparkContext that is
# already running (for example the one owned by the SparkSession `ss`) and only
# uses the supplied conf when no context exists yet. Calling the
# SparkContext(...) constructor directly would fail while another context is
# active.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local[2]").setAppName("my app")
)

In [None]:
# Reuse the running SparkContext instead of constructing a second one: a new
# SparkContext(...) fails while another context is active. The conf below is
# only applied when no context exists yet.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
)

# Load the labelled corpus. textFile() yields one element per line with the
# line terminator already removed, so no stray "\n" ends up in the messages.
# NOTE(review): every line is assumed to look like "<label>\t<message>" with
# label "spam" or "ham" -- confirm against the data file.
originalLines = sc.textFile("SMSSpamCollection")

# creating temp RDDs: select on the label prefix only, so that a ham message
# that merely mentions "spam" (or a spam message containing "ham") is not
# put in the wrong class
tempSpam = originalLines.filter(lambda x: x.startswith("spam\t"))
tempHam = originalLines.filter(lambda x: x.startswith("ham\t"))

# eliminate non relevant features: keep only the text after the first tab
spamRdd = tempSpam.map(lambda x: x.split("\t", 1)[1])
hamRdd = tempHam.map(lambda x: x.split("\t", 1)[1])

# collecting RDDs to lists
spamMessages = spamRdd.collect()
hamMessages = hamRdd.collect()

# writing lists to files, one message per line. The `with` blocks close (and
# therefore flush) the files before Spark reads them back below.
with open("spam.txt", "w", encoding="utf-8") as f2:
    f2.writelines(message + "\n" for message in spamMessages)

with open("ham.txt", "w", encoding="utf-8") as f3:
    f3.writelines(message + "\n" for message in hamMessages)

# parallelize input files
spam = sc.textFile("spam.txt")
ham = sc.textFile("ham.txt")

# Create a HashingTF instance to map email text to vectors of 10,000 features.
tf = HashingTF(numFeatures=10000)

# Each email is split into words, and each word is mapped to one feature.
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
hamFeatures = ham.map(lambda email: tf.transform(email.split(" ")))

# Create LabeledPoint data sets for positive (spam) and negative (ham) examples.
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeExamples = hamFeatures.map(lambda features: LabeledPoint(0, features))
trainingData = positiveExamples.union(negativeExamples)

# Cache since Logistic Regression is an iterative algorithm.
trainingData.cache()

# Run Logistic Regression using the L-BFGS optimizer.
model = LogisticRegressionWithLBFGS.train(trainingData)

# Test on a positive example (spam) and a negative one (normal).
# NOTE(review): the text used for negTest reads like an advertisement rather
# than a normal message -- confirm it is really meant as the ham example.
posTest = tf.transform("Customer service annoncement."
                       " You have a New Years delivery waiting for you. Please call 07046744435 now to arrange delivery".split(" "))
negTest = tf.transform("500 New Mobiles from 2004, MUST GO! Txt: NOKIA to No:"
                       " 89545 & collect yours today!From ONLY £1 www.4-tc.biz 2optout 087187262701.50gbp/mtmsg18".split(" "))
print("Prediction for positive test example: %g" % model.predict(posTest))
print("Prediction for negative test example: %g" % model.predict(negTest))


In [None]:
# read and parallelize lines in text file
linesRdd = sc.textFile("SMSSpamCollection")
linesRdd.persist(StorageLevel.MEMORY_AND_DISK)

"""
Basic actions on rdd
"""
# show first line
print(linesRdd.first())

# filter lines by their label prefix (a substring test such as `"spam" in line`
# would also match messages that merely contain the word)
# NOTE(review): assumes each line is "<label>\t<message>" -- confirm.
spamRdd = linesRdd.filter(lambda line: line.startswith("spam\t"))
hamRdd = linesRdd.filter(lambda line: line.startswith("ham\t"))

# take the first two elements and collect them into a list
# (unitedRdd is only created further down, so preview spamRdd here)
print(spamRdd.take(2))

# eliminate non relevant features: keep only the text after the first tab
spamRdd = spamRdd.map(lambda x: x.split("\t", 1)[1])
hamRdd = hamRdd.map(lambda x: x.split("\t", 1)[1])

# flatMap() is similar to map(), but the function may return 0, 1 or more
# elements per input element.
wordsRDD = hamRdd.flatMap(lambda line: line.split(" "))

# combine all words into a single value with reduce
# (string concatenation, no separator)
combinneWords = wordsRDD.reduce(lambda x, y: x + y)

# sample words without replacement; every word is kept with probability 0.5.
# A fixed seed keeps the sample identical across re-runs.
wordsRDD = wordsRDD.sample(False, 0.5, 42)

# get distinct words
wordsRDD = wordsRDD.distinct()

# count number of words in rdd
wordCount = wordsRDD.count()
print(wordCount)

# count number of times each element occurs in rdd
# (after distinct() every word occurs exactly once)
wordFrequencies = wordsRDD.countByValue()

# collect rdd
words = wordsRDD.collect()

"""
Numeric RDD Operations
count() Number of elements in the RDD
mean() Average of the elements
sum() Total
max() Maximum value
min() Minimum value
variance() Variance of the elements
sampleVariance() Variance of the elements, computed for a sample
stdev() Standard deviation
sampleStdev() Sample standard deviation
"""

"""
Transformations on one pair RDD      {(1, 2), (3, 4), (3, 6)}
(the examples below use Scala lambda notation, x => ...)
rdd.reduceByKey((x, y) => x + y)     {(1, 2), (3, 10)}
rdd.groupByKey()                     {(1, [2]), (3, [4, 6])}
rdd.mapValues(x => x+1)              {(1, 3), (3, 5), (3, 7)}
flatMapValues(func)
rdd.flatMapValues(x => (x to 5))     {(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)}
keys()                               {1, 3, 3}
values()                             {2, 4, 6}
sortByKey()                          {(1, 2), (3, 4), (3, 6)}
"""

"""
Transformations on two pair RDDs  (rdd = {(1, 2), (3, 4), (3, 6)} other = {(3, 9)})
rdd.subtractByKey(other)    {(1, 2)}
rdd.join(other)             {(3, (4, 9)), (3, (6, 9))}
rdd.rightOuterJoin(other)   {(3, (Some(4), 9)), (3, (Some(6), 9))}
rdd.leftOuterJoin(other)    {(1, (2, None)), (3, (4, Some(9))), (3, (6, Some(9)))}
rdd.cogroup(other)          {(1, ([2], [])), (3, ([4, 6], [9]))}
"""

"""
Actions between two rdd's
"""
# union of two rdd's
unitedRdd = spamRdd.union(hamRdd)

# intersection of rdd's
intersectRdd = spamRdd.intersection(hamRdd)

# subtract rdd's
subtractRdd = spamRdd.subtract(hamRdd)

# cartesian product of rdd's
cortasianRDD = spamRdd.cartesian(hamRdd)


In [None]:
"""
Accumulator empty line count

Accumulators work as follows:
We create them in the driver by calling the SparkContext.accumulator(initialValue)
method, which produces an accumulator holding an initial value.
The return type is an org.apache.spark.Accumulator[T] object, where T is the type of initialValue.

"""
# Create an accumulator initialized to 0; tasks add to it, the driver reads it.
blankLines = sc.accumulator(0)
n = 0  # not used in this cell; kept in case other cells rely on it


def extractCallSigns(line):
    """Split ``line`` on single spaces, counting blank lines along the way.

    Args:
        line: one line of text (str).

    Returns:
        list of str: the space-separated tokens of ``line``.

    Side effects:
        Adds 1 to the ``blankLines`` accumulator when ``line`` is empty or
        contains only whitespace.
    """
    if not line.strip():
        # add() updates the accumulator in place, so the name is never
        # rebound inside the function and no `global` statement is needed.
        blankLines.add(1)
    return line.split(" ")


callSigns = linesRdd.flatMap(extractCallSigns)

# flatMap is lazy: run an action so the tasks actually execute and the
# accumulator is populated before its value is read on the driver.
callSigns.count()
print("Blank lines: %d" % blankLines.value)

In [None]:
"""
Broadcast

Broadcast variables are read-only shared variables that are cached and made available on all nodes
in a cluster so that tasks can access them. Instead of sending this data along with every task,
PySpark distributes broadcast variables to the workers using efficient broadcast algorithms to reduce
communication costs.
"""

spark = SparkSession.builder.appName('My app').getOrCreate()

# Lookup table from state code to full state name, shared with the tasks as a
# broadcast variable.
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

columns = ["firstname","lastname","country","state"]
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)


def state_convert(code):
    """Return the full state name for ``code`` from the broadcast lookup table.

    Raises KeyError when ``code`` is not a key of the broadcast dictionary.
    """
    lookup = broadcastStates.value
    return lookup[code]


def _expand_state(row):
    """Copy the first three fields of ``row`` and expand the fourth (state code)."""
    first, last, country, state_code = row
    return (first, last, country, state_convert(state_code))


result = df.rdd.map(_expand_state).toDF(columns)

# show() exists only on Spark DataFrames;
# truncate=False prints the full content of every column
result.show(truncate=False)

In [None]:
"""
Piping to External Programs

Spark provides a pipe() method on RDDs. Spark’s pipe() lets us write parts of jobs using any
language we want as long as it can read and write to Unix standard streams.
"""

# Compute the distance of each call using an external R program
distScript = "./src/R/finddistance.R"
distScriptName = "finddistance.R"

# Register the script with the SparkContext so that SparkFiles.get() can
# resolve its local path by file name.
sc.addFile(distScript)

# TODO: replace this placeholder with the RDD whose elements should be piped
# to the script's standard input (one element per line). An empty RDD keeps the
# cell runnable until the real input is wired in.
pipeInputs = sc.emptyRDD()
distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
print(distances.collect())