# Initialization

In [1]:
# findspark.init() must run BEFORE pyspark is imported: it puts the Spark
# installation under /opt/spark on sys.path, so the import that follows
# resolves to that installation.
import findspark
findspark.init('/opt/spark')

from pyspark import SparkContext, SparkConf, SQLContext

In [2]:
# Configure the application and obtain the SparkContext.
# getOrCreate() returns the already-running context when this cell is
# re-executed, instead of failing because a second context cannot be started.
conf = SparkConf().setAppName("pysparkApp")
sc = SparkContext.getOrCreate(conf=conf)

# Use Case I: Word Count
First, let's list what is currently stored in the Hadoop file system (HDFS).

In [3]:
# List the contents of the HDFS directory /user/root on node-master:9000.
! hadoop fs -ls hdfs://node-master:9000/user/root

Found 1 items
drwxr-xr-x   - root supergroup          0 2019-12-08 22:42 hdfs://node-master:9000/user/root/.sparkStaging


Next, we add "Alice in Wonderland" to the Hadoop file system.

In [3]:
! hadoop fs -put datasets/alice_in_wonderland.txt

The file is now listed in HDFS:

In [4]:
# List /user/root again to confirm that the uploaded file is there.
! hadoop fs -ls hdfs://node-master:9000/user/root

Found 2 items
drwxr-xr-x   - root supergroup          0 2019-12-11 20:06 hdfs://node-master:9000/user/root/.sparkStaging
-rw-r--r--   2 root supergroup     148574 2019-12-11 20:10 hdfs://node-master:9000/user/root/alice_in_wonderland.txt


In [6]:
# Tokenize the book: read it from HDFS line by line, split each line on
# single spaces, and drop the empty tokens the split leaves behind.
words = (
    sc.textFile("hdfs://node-master:9000/user/root/alice_in_wonderland.txt")
    .flatMap(lambda text_line: text_line.split(" "))
    .filter(lambda token: len(token) > 0)
)

# Word count: pair every word with 1 (map), then add up the ones per word (reduce).
words_count = (
    words
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [8]:
# Save the word counts to a text output in HDFS.
# saveAsTextFile() refuses to write to a path that already exists, so any
# output left over from a previous run is removed first; this keeps the cell
# re-runnable. "-f" makes the removal a no-op when nothing is there yet.
import subprocess

result_path = "result.txt"  # relative path: same location for the rm and the save
subprocess.run(["hadoop", "fs", "-rm", "-r", "-f", result_path], check=True)
words_count.saveAsTextFile(result_path)

In [8]:
# List /user/root once more; the saved result should now appear as a directory.
! hadoop fs -ls hdfs://node-master:9000/user/root

Found 3 items
drwxr-xr-x   - root supergroup          0 2019-12-08 22:42 hdfs://node-master:9000/user/root/.sparkStaging
-rw-r--r--   2 root supergroup     148574 2019-12-08 22:43 hdfs://node-master:9000/user/root/alice_in_wonderland.txt
drwxr-xr-x   - root supergroup          0 2019-12-08 22:44 hdfs://node-master:9000/user/root/result.txt


In [9]:
# Retrieve the result from HDFS and copy it to the local ("normal") file system

In [10]:
# Copy result.txt from HDFS to a local path named result.txt (relative to the
# directory the command runs in).
# NOTE(review): re-running this presumably fails or nests the output once the
# local result.txt already exists — confirm, and remove the local copy first if needed.
! hadoop fs -get hdfs://node-master:9000/user/root/result.txt result.txt

19/12/11 20:16:02 WARN hdfs.DFSClient: DFSInputStream has been closed already
19/12/11 20:16:02 WARN hdfs.DFSClient: DFSInputStream has been closed already
19/12/11 20:16:02 WARN hdfs.DFSClient: DFSInputStream has been closed already


In [11]:
# Show the 5 most common words: flip each (word, count) pair into
# (count, word), sort by count in descending order, and take the first five.
most_common_words = (
    words_count
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)
most_common_words.take(5)

[(1507, 'the'), (714, 'and'), (703, 'to'), (606, 'a'), (490, 'of')]

In [12]:
# Print every (word, count) pair.
# collect() brings the whole RDD back to the driver, so the output is long.
for word_and_count in words_count.collect():
    print(word_and_count)

("Alice's", 9)
('in', 345)
("ALICE'S", 3)
('ADVENTURES', 1)
('Lewis', 1)
('Carroll', 1)
('MILLENNIUM', 1)
('FULCRUM', 1)
('EDITION', 1)
('Down', 1)
('Rabbit-Hole', 1)
('was', 328)
('very', 126)
('of', 490)
('sitting', 10)
('her', 203)
('sister', 5)
('bank,', 2)
('do:', 1)
('once', 18)
('into', 67)
('book', 3)
('reading,', 1)
('but', 102)
('no', 64)
('pictures', 4)
('it,', 38)
('is', 63)
('use', 16)
('thought', 63)
('`without', 1)
('own', 9)
('mind', 4)
('(as', 2)
('as', 237)
('hot', 4)
('feel', 8)
('stupid),', 1)
('making', 8)
('would', 68)
('worth', 4)
('trouble', 4)
('getting', 21)
('daisies,', 1)
('when', 66)
('suddenly', 9)
('White', 22)
('eyes', 18)
('ran', 13)
('close', 12)
('VERY', 12)
('nor', 2)
('think', 37)
('out', 96)
('way', 37)
('hear', 14)
('say', 35)
('`Oh', 2)
('Oh', 5)
('afterwards,', 1)
('occurred', 2)
('ought', 13)
('have', 73)
('at', 197)
('this,', 17)
('seemed', 27)
('quite', 53)
('TOOK', 1)
('OUT', 1)
('OF', 3)
('ITS', 1)
('WAISTCOAT-', 1)
('POCKET,', 1)
('looked'

# Use Case II: Classification on the Iris Dataset

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import FloatType

In [13]:
! hadoop fs -put datasets/iris.csv

In [14]:
# SQL entry point built on the existing SparkContext; used to read DataFrames.
sqlContext = SQLContext(sparkContext=sc)

In [15]:
# Load the Iris CSV from HDFS; header=True takes the column names from the first row.
df = sqlContext.read.csv(
    "hdfs://node-master:9000/user/root/iris.csv",
    header=True,
)

In [16]:
# Preview the first five rows of the DataFrame.
df.show(n=5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|         .2| Setosa|
|         4.9|          3|         1.4|         .2| Setosa|
|         4.7|        3.2|         1.3|         .2| Setosa|
|         4.6|        3.1|         1.5|         .2| Setosa|
|           5|        3.6|         1.4|         .2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [17]:
# Print the DataFrame's schema (column names, types and nullability).
df.printSchema()

root
 |-- sepal_length: string (nullable = true)
 |-- sepal_width: string (nullable = true)
 |-- petal_length: string (nullable = true)
 |-- petal_width: string (nullable = true)
 |-- variety: string (nullable = true)



In [18]:
# The CSV reader loaded every column as a string, so cast the four measurement
# columns to float. withColumn() with an existing column name replaces that
# column in place, so no temporary column, drop, or rename is needed.
for column_name in ["sepal_length", "petal_length", "sepal_width", "petal_width"]:
    df = df.withColumn(column_name, df[column_name].cast(FloatType()))

In [19]:
# Fit a StringIndexer on the full DataFrame to map each "variety" string to a
# numeric index stored in "variety_label".
indexer = StringIndexer(
    inputCol="variety",
    outputCol="variety_label",
).fit(df)

In [20]:
# Combine the four measurement columns into one vector column named "features".
assembler = VectorAssembler(
    inputCols=["sepal_length", "petal_length", "sepal_width", "petal_width"],
    outputCol="features",
)

In [21]:
# Decision tree that predicts "variety_label" from the assembled "features" vector.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="variety_label",
)

In [22]:
# Split the data 70% / 30% into training and test sets.
# A fixed seed makes the split, and therefore the reported accuracy,
# reproducible across runs.
trainingData, testData = df.randomSplit([0.7, 0.3], seed=42)

In [23]:
# Chain the stages in execution order: label indexing, feature assembly, classifier.
pipeline = Pipeline(
    stages=[
        indexer,
        assembler,
        dt,
    ]
)

In [24]:
# Fit the whole pipeline on the training split.
model = pipeline.fit(dataset=trainingData)

In [25]:
# Apply the fitted pipeline to the held-out test split.
predictions = model.transform(dataset=testData)

In [26]:
# Compare predicted and true label indices side by side for the first five rows.
(
    predictions
    .select("prediction", "variety_label")
    .show(n=5)
)

+----------+-------------+
|prediction|variety_label|
+----------+-------------+
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          2.0|
+----------+-------------+
only showing top 5 rows



In [27]:
# Evaluator that computes accuracy by comparing "prediction" with "variety_label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="variety_label",
)

In [28]:
# Score the test-set predictions and report accuracy and its complement,
# the error rate, as percentages.
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Accuracy = {:.2%} ".format(accuracy))
print("Test Error    = {:.2%} ".format(test_error))

Test Accuracy = 95.35% 
Test Error    = 4.65% 
