# Reduce the dataset and count words

- After splitting the lines of the file into a long list of words with the `flatMap()` transformation, the next step is to remove stop words from the data. Stop words are common words that are usually uninteresting, such as "I", "the", and "a". You can remove many obvious stop words with a list of your own.
- Next, you'll create a pair RDD in which each element is a tuple `(k, v)`, where `k` is the key and `v` is the value. In this example, the pair RDD consists of `(w, 1)` tuples, where `w` is a word from the RDD and `1` is its count. Finally, you'll combine the values that share the same key using the `reduceByKey()` operation.

- Remember that a `SparkContext` (`sc`) and `splitRDD` are already available in your workspace.
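The steps above can be sketched as plain Python functions that plug into an RDD chain. This is a minimal sketch under assumptions: the `STOP_WORDS` set and the helper names `tokenize`, `keep_word`, `to_pair`, `add_counts` and `count_words_locally` are hypothetical, not part of the lab. `count_words_locally` only mimics `flatMap()`, then `filter()`, then `map()`, then `reduceByKey()` on an in-memory list, so the logic can be checked without a cluster.

```python
from collections import defaultdict

# Hypothetical stop-word list; extend it with your own words.
STOP_WORDS = {"i", "the", "a", "an", "and", "of", "to", "in"}


def tokenize(line):
    """Split a line into words on single spaces (same rule as the notebook)."""
    return line.split(" ")


def keep_word(word):
    """Drop empty tokens and stop words (compared case-insensitively)."""
    return word != "" and word.lower() not in STOP_WORDS


def to_pair(word):
    """Build the (k, v) pair (w, 1) for one word."""
    return (word, 1)


def add_counts(x, y):
    """Combine two values that share a key, as reduceByKey() does."""
    return x + y


def count_words_locally(lines):
    """Local stand-in for flatMap -> filter -> map -> reduceByKey."""
    totals = defaultdict(int)
    for line in lines:
        for word in filter(keep_word, tokenize(line)):
            key, value = to_pair(word)
            totals[key] = add_counts(totals[key], value)
    return dict(totals)


if __name__ == "__main__":
    sample = ["We the People of the United States", "the People"]
    print(count_words_locally(sample))
    # {'We': 1, 'People': 2, 'United': 1, 'States': 1}
```

On the cluster, the same functions could slot into the chain from cell `In [3]`, for example `sc.textFile(path).flatMap(tokenize).filter(keep_word).map(to_pair).reduceByKey(add_counts)`, where `path` stands for the constitution file used there.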


## Instructions
Please perform the following activities:

    1) Load the data from visits.txt on HDFS into an RDD
    2) Count the number of elements in the above RDD
    3) Verify this count against the number of lines in the file
    4) Select only those records from the RDD where the 19th field matches POTUS, and then
        a) Capture the 0th field (this datapoint is lname in the dataset)
        b) Capture the 1st field (this datapoint is fname in the dataset)
        c) Capture the 6th field (this datapoint is time_of_arrival in the dataset)
        d) Capture the 11th field (this datapoint is appt_scheduled_time in the dataset)
        e) Capture the 21st field (this datapoint is location in the dataset)
        f) Capture the 25th field (this datapoint is comment in the dataset)
    5) Count the number of records in the final RDD

    • Dataset
        ◦ dataset/visits.txt
    • Timeline
        ◦ 45 minutes
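Because the fields in step 4 are counted from zero ("0th field"), the "19th field" means index 19 of the comma-split record. The sketch below keeps the index-to-name mapping from the list above in one place. `FILTER_INDEX`, `POTUS_FIELDS`, `MAX_INDEX` and `project_potus` are hypothetical names, and skipping rows that are too short is a design choice of this sketch, not something the lab requires.

```python
# Zero-based field positions taken from step 4 of the instructions.
FILTER_INDEX = 19  # a record is kept only when this field equals "POTUS"
POTUS_FIELDS = [
    ("lname", 0),
    ("fname", 1),
    ("time_of_arrival", 6),
    ("appt_scheduled_time", 11),
    ("location", 21),
    ("comment", 25),
]
# Highest index the projection touches; shorter rows cannot be projected.
MAX_INDEX = max([FILTER_INDEX] + [index for _, index in POTUS_FIELDS])


def project_potus(fields):
    """Return the selected fields as a tuple, or None for non-POTUS records.

    Rows too short to contain every index are skipped (None) rather than
    raising IndexError.
    """
    if len(fields) <= MAX_INDEX or fields[FILTER_INDEX] != "POTUS":
        return None
    return tuple(fields[index] for _, index in POTUS_FIELDS)


if __name__ == "__main__":
    # Synthetic 26-field record, for illustration only.
    record = [""] * 26
    record[0], record[1], record[19], record[21] = "DOE", "JANE", "POTUS", "WH"
    print(project_potus(record))  # ('DOE', 'JANE', '', '', 'WH', '')
```

In Spark, `split_RDD.map(project_potus).filter(lambda row: row is not None)` could stand in for the filter-and-map pair; unlike a bare `x[25]`, a malformed short row would be dropped instead of failing the job.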


In [1]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List whichever packages you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
# Entry point (Spark 2.x)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# To run on YARN, specify .master("yarn"):
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

sc = spark.sparkContext

In [3]:
# Word count: split each line into words, pair each word with 1, then sum the 1s per word
sc \
.textFile("file:///home/talentum/test-jupyter/P2/M1/sm3/day3/p2/Dataset/constitution.txt") \
.flatMap(lambda line : line.split(" ")) \
.map(lambda w: (w, 1)) \
.reduceByKey(lambda x, y: x + y).take(10)

[('We', 2),
 ('the', 662),
 ('People', 2),
 ('of', 493),
 ('United', 85),
 ('States,', 55),
 ('in', 137),
 ('Order', 1),
 ('to', 183),
 ('form', 1)]

In [4]:
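# File path for the "visits" dataset
file_path = "file:///home/talentum/test-jupyter/P2/M1/sm3/day3/Eval_Labs/dataset/visits.txt"

# Create the base RDD; textFile() yields one element per line
baseRDD = sc.textFile(file_path)

print(baseRDD.count())

# Each element is already a single line, so splitting on '\n' leaves the count
# unchanged; matching counts confirm that one element corresponds to one line.
splitRDD = baseRDD.flatMap(lambda line: line.split('\n'))

print(splitRDD.count())
#splitRDD.collect()  # avoid: this pulls every line to the driver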

447598
447598
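Step 3 asks for the RDD count to be checked against the number of lines in the file. Because `file_path` uses the `file://` scheme, one hedged way to do that is to count the lines with plain Python on the driver. `count_lines` is a hypothetical helper, and the sketch assumes the file is readable from the machine running the driver.

```python
import os
import tempfile
from urllib.parse import urlparse


def count_lines(uri):
    """Count the lines of a local file given a plain path or a file:// URI."""
    path = urlparse(uri).path if uri.startswith("file://") else uri
    # Iterating a text-mode file yields one item per line; a last line without
    # a trailing newline still counts as one line.
    with open(path, encoding="utf-8", errors="replace") as handle:
        return sum(1 for _ in handle)


if __name__ == "__main__":
    # Self-check on a throwaway file with three lines and no trailing newline.
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
        tmp.write("a,b\nc,d\ne,f")
    try:
        print(count_lines("file://" + tmp.name))  # 3
    finally:
        os.remove(tmp.name)
```

In the notebook, the check would then be `baseRDD.count() == count_lines(file_path)`.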


In [50]:
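# Split each record into its comma-separated fields.
# Note: split(",") assumes no field value contains an embedded comma.
split_RDD = splitRDD.map(lambda x: x.split(","))

# Keep only the records whose field at index 19 is "POTUS".
split_RDD_potus = split_RDD.filter(lambda x: x[19] == "POTUS")

# Project lname, fname, time_of_arrival, appt_scheduled_time, location, comment.
split_RDD_potus_filter = split_RDD_potus.map(lambda x: (x[0], x[1], x[6], x[11], x[21], x[25]))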
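
`split(",")` treats every comma as a delimiter, so it would shift the field positions if any value contains a quoted comma (for example in a free-text comment). If that turns out to matter for this file, one option, swapped in here rather than taken from the lab, is Python's `csv` module applied per partition. `parse_csv_partition` is a hypothetical helper name.

```python
import csv


def parse_csv_partition(lines):
    """Parse an iterable of CSV lines into lists of fields.

    csv.reader honours double-quoted fields, so "ANN, JR" stays one field.
    Shaped for RDD.mapPartitions(), which passes an iterator of lines and
    accepts any iterable back.
    """
    return csv.reader(lines)


if __name__ == "__main__":
    rows = list(parse_csv_partition(['SMITH,"ANN, JR",POTUS', "DOE,JOHN,WH"]))
    print(rows)  # [['SMITH', 'ANN, JR', 'POTUS'], ['DOE', 'JOHN', 'WH']]
```

In the notebook this would replace only the splitting step, e.g. `split_RDD = splitRDD.mapPartitions(parse_csv_partition)`, leaving the filter and projection as they are.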

In [43]:
split_RDD_potus_filter.count()

21819

In [52]:
split_RDD_potus_filter.take(3)

[('BUCKLEY', 'SUMMER', '10/12/2010 14:48', '10/12/2010 14:45', 'WH', ''),
 ('CLOONEY', 'GEORGE', '10/12/2010 14:47', '10/12/2010 14:45', 'WH', ''),
 ('PRENDERGAST', 'JOHN', '10/12/2010 14:48', '10/12/2010 14:45', 'WH', '')]