This tutorial demonstrates the basics of Spark by creating RDDs and performing basic operations on them. You will also learn to work with Spark DataFrames and to query them using SparkSQL. Finally, two examples that use Spark MLlib are included.

## RDD Operations: wordcount

In this example, we count how many times each word appears in a file called README.md. The first step is to create an RDD from the data file. We then run some simple operations on the RDD, such as count, take, and collect, and finally use transformations such as filter, flatMap, and map to get the word count.

In [None]:
data = sc.textFile("/users/PZS0645/support/workshop/Bigdata/README.md")

Once an RDD is created, we can run operations on it.
For example, count the number of lines in the RDD:

In [None]:
data.count()

In [None]:
#See what’s in the RDD
data.take(3)

In [None]:
data.collect()

The first command shows the first three lines of the RDD, while the second shows the entire file. (Under Python 2, each line is preceded by the letter u, which marks a unicode string.) Be cautious with collect() when the data is large: it brings the entire dataset back to the driver node, which must have enough memory to hold it.

In [None]:
#Check the data type
type(data)

Next we’ll do a simple transformation: filter all the lines with “Spark” in them and count such lines.

In [None]:
linesWithSpark = data.filter(lambda line: "Spark" in line)

In [None]:
linesWithSpark.count()

The “filter” function finds the lines with “Spark” in them and saves the result to the variable “linesWithSpark”.
We can then use the function “count” to display the number of such lines. Here *filter* is a transformation and *count* is an action: transformations are evaluated lazily, so no data is read until an action runs.
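The difference between a transformation and an action can be sketched without Spark. The following is only an analogy in plain Python, using made-up lines and a hypothetical helper `contains_spark`: a generator expression plays the role of the lazy transformation, and consuming it plays the role of the action.

```python
# Plain-Python analogy (no Spark needed). The data and helper names
# below are made up for illustration.
lines = ["Apache Spark", "is a fast engine", "Spark runs on clusters"]

inspected = []  # records which lines the predicate has actually looked at

def contains_spark(line):
    inspected.append(line)
    return "Spark" in line

# "Transformation": only a recipe is built; no line is inspected yet.
lines_with_spark = (line for line in lines if contains_spark(line))
inspected_before_action = len(inspected)

# "Action": consuming the generator forces the predicate to run on every line.
spark_line_count = sum(1 for _ in lines_with_spark)

print(inspected_before_action, spark_line_count, len(inspected))
```

The analogy is loose in one respect: a generator can be consumed only once, whereas an RDD can be reused by several actions, each of which recomputes the transformations unless the RDD is cached.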

Next, we’ll combine those two commands into one:

In [None]:
data.filter(lambda line: "Spark" in line).count()

In our final interactive example, we’ll show how to count the number of times each word appears in the file:

In [None]:
from operator import add
wordCounts = data.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(add)

In [None]:
wordCounts.collect()

The result is a list of (word, count) pairs showing each word in the file and how many times it appeared.
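To see what each stage of the word count contributes, here is a minimal plain-Python sketch of the same three steps on two made-up lines. It does not use Spark; the list comprehensions and the dictionary loop are stand-ins for `flatMap`, `map`, and `reduceByKey(add)`, chosen only to make the intermediate values visible.

```python
from operator import add

lines = ["Spark is fast", "Spark is general"]  # made-up stand-in for README.md

# flatMap: split every line into words and flatten into one list
words = [word for line in lines for word in line.split()]

# map: pair each word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey(add): merge the values of identical keys using `add`
word_counts = {}
for word, n in pairs:
    word_counts[word] = add(word_counts[word], n) if word in word_counts else n

print(sorted(word_counts.items()))
```

In Spark the same merge happens per partition first and then across partitions, which is why the function passed to `reduceByKey` should be associative and commutative.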


## Spark DataFrames


For these examples, we just need to import two pyspark.sql modules:


In [None]:
from pyspark.sql.types import *  # Necessary for creating schemas
from pyspark.sql.functions import * # Importing PySpark functions

### Creating DataFrames

Making a Simple DataFrame from a Tuple List.

In [None]:
# Make a tuple list
a_list = [('a', 1), ('b', 2), ('c', 3)]

In [None]:
# Create a Spark DataFrame, without supplying a schema value
df_from_list_no_schema = \
sqlContext.createDataFrame(a_list)

In [None]:
# Print the DF object
print (df_from_list_no_schema)


In [None]:
# Print a collected list of Row objects
print (df_from_list_no_schema.collect())

In [None]:
# Show the DataFrame
df_from_list_no_schema.show()

Making a Simple DataFrame from a Tuple List and a Schema

In [None]:
# Create a Spark DataFrame, this time with schema
nice_df = sqlContext.createDataFrame(a_list, ['letters', 'numbers']) # this simple schema contains just column names

In [None]:
# Show the DataFrame
nice_df.show()

In [None]:
# Show the DataFrame's schema
nice_df.printSchema()

### Simple Inspection Functions

Now that we have nice_df, here are some useful functions for inspecting the DataFrame.

In [None]:
# `columns`: return all column names as a list
nice_df.columns

In [None]:
# `dtypes`: get the datatypes for all columns
nice_df.dtypes

In [None]:
# `printSchema()`: prints the schema of the supplied DF
nice_df.printSchema()

In [None]:
# `first()` returns the first row as a single Row, while
# `head(n)` and `take(n)` return a list of `n` Row objects
print (nice_df.first()) # takes no argument; never a list
print (nice_df.head(2)) # argument is optional: without it, a single Row;
                      # with it, a list
print (nice_df.take(2)) # expects a value; always a list

In [None]:
# `count()`: returns a count of all rows in DF
nice_df.count()

In [None]:
# `describe()`: print out stats for numerical columns
nice_df.describe().show() # can optionally supply a list of column names

### Relatively Simple DataFrame Manipulation Functions

Let's use these functions:

- `unionAll()`: combine two DataFrames together (`union()` is the equivalent name since Spark 2.0)
- `orderBy()`: perform sorting of DataFrame columns
- `select()`: select which DataFrame columns to retain
- `drop()`: select a single DataFrame column to remove
- `filter()`: retain DataFrame rows that match a condition

In [None]:
# Take the DataFrame and add it to itself
(nice_df
 .unionAll(nice_df)
 .show())

# Add it to itself twice
(nice_df
 .unionAll(nice_df)
 .unionAll(nice_df)
 .show())

# Columns are matched by position, not by name,
# so coercion will occur if schemas don't align
(nice_df
 .select(['numbers', 'letters'])
 .unionAll(nice_df)
 .show())

(nice_df
 .select(['numbers', 'letters'])
 .unionAll(nice_df)
 .printSchema())

In [None]:

# Sorting the DataFrame by the `numbers` column
(nice_df
 .unionAll(nice_df)
 .unionAll(nice_df)
 .orderBy('numbers')
 .show())

# Sort the same column in reverse order
(nice_df
 .unionAll(nice_df)
 .unionAll(nice_df)
 .orderBy('numbers',
          ascending = False)
 .show())

In [None]:
# `select()` takes column names (one by one or as a list);
# `drop()` takes the name of the column to remove

# Select only the first column of the DF
(nice_df
 .select('letters')
 .show())

# Re-order columns in the DF using `select()`
(nice_df
 .select(['numbers', 'letters'])
 .show())

# Drop the first column (`letters`) of the DF
(nice_df
 .drop('letters')
 .show())

In [None]:
# The `filter()` function performs filtering of DF rows

# Here is some numeric filtering with comparison operators
# (>, <, >=, <=, ==, != all work)

# Keep rows where the value in `numbers` is > 1
(nice_df
 .filter(nice_df.numbers > 1)
 .show())

# Perform two filter operations
(nice_df
 .filter(nice_df.numbers > 1)
 .filter(nice_df.numbers < 3)
 .show())

# Not just numbers! Use the `filter()` + `isin()`
# combo to filter on string columns with a set of values
(nice_df
 .filter(nice_df.letters
         .isin(['a', 'b']))
 .show())


## Structured Data

For this example, we will load data from a CSV file, create a DataFrame, and then perform operations on it. You can read more about the data here: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99

In [None]:
data = spark.read.csv("/users/PZS0645/support/workshop/Bigdata/data.csv", header=True)

In [None]:
data.show()

In [None]:
#Check the schema of the data
data.printSchema()

In [None]:
# Look at the first 3 records as Row objects
data.take(3)

In [None]:
# Count the number of records
data.count()

Check out the first 5 records of two selected fields as a table:

In [None]:
# Select two fields and show only those
data.select("dst_bytes","flag").show(5)

In [None]:
# Keep only the rows whose flag is not "SF"
data.filter(data.flag!="SF").show(5)

In [None]:
# Group data by a column and run an operation on the grouped data
data.select("protocal_type", "duration", "dst_bytes").groupBy("protocal_type").count().show()

In [None]:
#filter and groupBy
data.select("protocal_type", "duration", "dst_bytes").filter(data.duration>1000).filter(data.dst_bytes==0).groupBy("protocal_type").count().show()
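What the filter-then-groupBy chain computes can be checked by hand on a few rows. The sketch below uses plain Python and made-up records that only borrow the column names from the tutorial's CSV; it is not Spark code, and the values are hypothetical.

```python
from collections import Counter

# Made-up rows that reuse the tutorial's column names.
rows = [
    {"protocal_type": "tcp", "duration": 1500, "dst_bytes": 0},
    {"protocal_type": "tcp", "duration": 20, "dst_bytes": 0},
    {"protocal_type": "udp", "duration": 2000, "dst_bytes": 0},
    {"protocal_type": "tcp", "duration": 3000, "dst_bytes": 512},
    {"protocal_type": "tcp", "duration": 1200, "dst_bytes": 0},
]

# filter(duration > 1000) followed by filter(dst_bytes == 0)
kept = [r for r in rows if r["duration"] > 1000 and r["dst_bytes"] == 0]

# groupBy("protocal_type").count()
counts = Counter(r["protocal_type"] for r in kept)

print(dict(counts))
```

Chaining two `filter` calls, as the cell above does, keeps the rows that satisfy both conditions, which is why a single `and` reproduces it here.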

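### SparkSQL queries

In order to run SparkSQL queries, we have to register the DataFrame as a table. (`registerTempTable` is deprecated since Spark 2.0; `createOrReplaceTempView` is its replacement.)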

In [None]:
data.registerTempTable("interactions")

Now we can query the table called *interactions* based on conditions. For example, select the tcp network interactions that lasted more than 1000 seconds and transferred no bytes from the destination:

In [None]:
tcp = sqlContext.sql(" SELECT duration, dst_bytes FROM interactions WHERE protocal_type ='tcp' AND duration>1000 AND dst_bytes=0")


In [None]:
tcp.show(5)
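The SQL text itself is ordinary SQL, so its behavior can be tried outside Spark. As a sketch under that assumption, the snippet below runs the same WHERE clause against an in-memory SQLite table filled with made-up rows; `sqlite3` is used here only as a stand-in engine and is not part of the workshop setup.

```python
import sqlite3

# In-memory SQLite table standing in for the registered Spark table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE interactions "
    "(protocal_type TEXT, duration INTEGER, dst_bytes INTEGER)"
)
conn.executemany(
    "INSERT INTO interactions VALUES (?, ?, ?)",
    [  # made-up rows
        ("tcp", 1500, 0),
        ("tcp", 20, 0),
        ("udp", 2000, 0),
        ("tcp", 3000, 512),
        ("tcp", 1200, 0),
    ],
)

# Same conditions as the SparkSQL query; ORDER BY is added only to make
# the output order deterministic.
result = conn.execute(
    "SELECT duration, dst_bytes FROM interactions "
    "WHERE protocal_type = 'tcp' AND duration > 1000 AND dst_bytes = 0 "
    "ORDER BY duration"
).fetchall()
conn.close()

print(result)
```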

## Spark MLlib examples

### Logistic Regression

In this example we will use Spark's machine learning library MLlib to build a Logistic Regression classifier for network attack detection. We will train on the 10% subset of the KDD Cup 1999 data and test on the separate "corrected" test set (http://kdd.ics.uci.edu/databases/kddcup99/kddcup99) in order to test Spark's capabilities with larger datasets.



In [None]:
#import training data
import urllib.request
f = urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data.gz")

In [None]:
#create a RDD for training data
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print ("Train data size is {}".format(raw_data.count()))

In [None]:
raw_data.take(4)

In [None]:
#Import test data
ft = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")


test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print ("Test data size is {}".format(test_raw_data.count()))

In [None]:
#Preparing training data 
from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,41]
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = raw_data.map(parse_interaction)

In [None]:
training_data.take(3)

In [None]:
#preparing test data
test_data = test_raw_data.map(parse_interaction)
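The slicing inside `parse_interaction` is easier to follow on a single record. The sketch below repeats the same indexing in plain Python, returning a `(label, features)` tuple instead of a `LabeledPoint` so that it runs without Spark. The helper name `parse_fields` and the synthetic 42-field record are assumptions made for illustration, not real KDD data.

```python
def parse_fields(line):
    """Same slicing as parse_interaction, minus the LabeledPoint wrapper."""
    parts = line.split(",")
    # Keep field 0 and fields 4..40; skip the three categorical fields (1-3).
    features = [float(x) for x in parts[0:1] + parts[4:41]]
    # Field 41 holds the label: "normal." means no attack.
    label = 0.0 if parts[41] == "normal." else 1.0
    return label, features

# Synthetic record: duration, 3 categorical fields, 37 numeric fields, label.
numeric_fields = [str(i) for i in range(37)]
normal_line = ",".join(["0", "tcp", "http", "SF"] + numeric_fields + ["normal."])
attack_line = ",".join(["0", "tcp", "http", "SF"] + numeric_fields + ["smurf."])

normal_label, normal_features = parse_fields(normal_line)
attack_label, attack_features = parse_fields(attack_line)

print(normal_label, attack_label, len(normal_features))
```

Every label other than `normal.` is mapped to 1.0, so the classifier only learns attack versus no attack, not the attack type.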

Logistic regression is widely used to predict a binary response. Spark implements two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. L-BFGS is recommended over mini-batch gradient descent for faster convergence.



In [None]:
#Training a classifier
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Build the model
logit_model = LogisticRegressionWithLBFGS.train(training_data)
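For intuition about what a trained binary logistic regression model does with one feature vector, here is a minimal plain-Python sketch: a weighted sum, the logistic function, and a 0.5 threshold. The weights, intercept, and inputs are made up, and the function `predict` below is a hypothetical helper, not the MLlib implementation.

```python
import math

def predict(weights, intercept, features, threshold=0.5):
    """Return (predicted class, probability of class 1) for one feature vector."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    # Numerically stable logistic function
    if margin >= 0:
        probability = 1.0 / (1.0 + math.exp(-margin))
    else:
        e = math.exp(margin)
        probability = e / (1.0 + e)
    return (1 if probability > threshold else 0), probability

# Hypothetical model parameters and inputs
weights, intercept = [2.0, -1.0], -0.5

attack_class, attack_prob = predict(weights, intercept, [1.0, 0.5])
normal_class, normal_prob = predict(weights, intercept, [0.0, 1.0])

print(attack_class, round(attack_prob, 3))
print(normal_class, round(normal_prob, 3))
```

Training, whether by L-BFGS or gradient descent, is the search for the weights and intercept; prediction afterwards is only this arithmetic.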

### Evaluating the model on new data
In order to measure the classification accuracy on our test data, we map over the test_data RDD and use the model to predict the class of each test point.

In [None]:
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

In [None]:
labels_and_preds.take(5)

Classification results are returned in pairs, with the actual test label and the predicted one. These pairs are used to calculate the classification accuracy with filter and count as follows.

In [None]:

test_accuracy = labels_and_preds.filter(lambda vp:vp[0] == vp[1]).count() / float(test_data.count())


print ("Test accuracy is {}".format(test_accuracy))
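The accuracy calculation can be verified on a handful of pairs. This sketch uses a made-up list of (label, prediction) tuples in plain Python and also derives the error rate and the number of missed attacks, which the single accuracy figure hides.

```python
# Made-up (actual label, predicted label) pairs
labels_and_preds = [(0.0, 0), (1.0, 1), (1.0, 0), (0.0, 0), (1.0, 1)]

# Same idea as filter(label == prediction).count() / total
correct = sum(1 for label, pred in labels_and_preds if label == pred)
accuracy = correct / float(len(labels_and_preds))
error_rate = 1.0 - accuracy

# Attacks (label 1.0) that the model predicted as normal (0)
missed_attacks = sum(1 for label, pred in labels_and_preds
                     if label == 1.0 and pred == 0)

print(accuracy, missed_attacks)
```

For attack detection, missed attacks are usually the costlier mistake, so it can be worth counting them separately from the overall accuracy.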

### k-means clustering

k-means is one of the most commonly used clustering algorithms that clusters the data points into a predefined number of clusters. 

In [None]:
from pyspark.ml.clustering import KMeans

# Loads data.
dataset = spark.read.format("libsvm").load("/usr/local/spark/2.3.0/spark-2.3.0-bin-hadoop2.7/data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

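# Evaluate clustering by computing Within Set Sum of Squared Errors.
# Note: computeCost() exists in Spark 2.x (this workshop uses 2.3.0);
# it was removed in Spark 3.0.
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))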

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
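The Within Set Sum of Squared Errors is the sum, over all points, of the squared distance from each point to its nearest cluster center. A minimal plain-Python sketch of that definition follows; the six points and two centers are hypothetical values chosen so the arithmetic is easy to check by hand.

```python
# Hypothetical 3-D points forming two tight groups
points = [
    (0.0, 0.0, 0.0), (0.1, 0.1, 0.1), (0.2, 0.2, 0.2),
    (9.0, 9.0, 9.0), (9.1, 9.1, 9.1), (9.2, 9.2, 9.2),
]
# Hypothetical cluster centers (the mean of each group)
centers = [(0.1, 0.1, 0.1), (9.1, 9.1, 9.1)]

def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

# Each point contributes its squared distance to the closest center.
wssse = sum(min(squared_distance(p, c) for c in centers) for p in points)

print(round(wssse, 6))
```

A lower value means tighter clusters, but the value always shrinks as k grows, so it is mainly useful for comparing runs with the same k or for spotting an elbow across several values of k.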

That's the end of our workshop! You can save your work by clicking on *File* and then *Save*, and come back to it later. Once you are done with the tutorial, you can click on *Logout* in the top right corner.



## References

http://spark.apache.org

The main reference for PySpark is: http://spark.apache.org/docs/latest/api/python/index.html

These examples are available at: https://github.com/rich-iannone/so-many-pyspark-examples

Information on the Parquet file format can be found at its project page: http://parquet.apache.org

The GitHub project page for spark-csv package; contains usage documentation: https://github.com/databricks/spark-csv

https://www.codementor.io/jadianes/spark-mllib-logistic-regression-du107neto



Print the Python version:

In [1]:
import sys
print(sys.version)

3.6.3 |Anaconda custom (64-bit)| (default, Oct 13 2017, 12:02:49) 
[GCC 7.2.0]


Get the number of Spark nodes:

In [4]:
sc._jsc.sc().getExecutorMemoryStatus().size()

1

In [5]:
sc._jsc.sc().getExecutorMemoryStatus().keySet().size()

1