## Question

You are receiving a stream of 2D points and you need to keep a count of how many points fall in each quadrant.

- 2D points: (x, y)
- 4 quadrants
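Ignoring the streaming part for a moment, the task is a count per key. A minimal batch sketch over an in-memory list of points is shown below. It assumes the same labels as the notebook, and it classifies a point by looking up the signs of x and y (an alternative to an if/elif chain); `sign`, `LABELS` and `count_quadrants` are hypothetical names used only for illustration.

```python
from collections import Counter

def sign(v):
    """Return 1, -1 or 0 depending on the sign of v."""
    return (v > 0) - (v < 0)

# Label for each (sign(x), sign(y)) combination; a zero sign means the point lies on an axis.
LABELS = {
    (1, 1): "First quadrant", (-1, 1): "Second quadrant",
    (-1, -1): "Third quadrant", (1, -1): "Fourth quadrant",
    (0, 1): "Lies on Y axis", (0, -1): "Lies on Y axis",
    (1, 0): "Lies on X axis", (-1, 0): "Lies on X axis",
    (0, 0): "Origin",
}

def count_quadrants(points):
    """Count a finite list of (x, y) points per quadrant label."""
    return Counter(LABELS[(sign(x), sign(y))] for x, y in points)

print(count_quadrants([(1, 2), (-3, 4), (-1, -1), (2, -5), (3, 3), (0, 7)]))
```

The streaming version below has to do the same thing incrementally, because the points arrive over time instead of being available as one list.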

In [1]:
# Importing the required modules
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [2]:
# Initializing the Spark context
sc = SparkContext(appName="Lab assignment")

In [3]:
# Creating the streaming context with a batch interval (batchDuration) of 5 seconds
ssc = StreamingContext(sc, 5)
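With a batch interval of 5 seconds, Spark Streaming cuts the incoming stream into consecutive 5-second micro-batches and runs the transformations once per batch; that is why the output at the end of the notebook is stamped every 5 seconds. The pure-Python sketch below only illustrates that grouping idea on hypothetical `(arrival_time, line)` pairs; `group_into_batches` is an illustrative name, and this is not how Spark assigns records to batches internally.

```python
from collections import defaultdict

def group_into_batches(records, batch_duration=5.0):
    """Group (arrival_time_seconds, line) pairs into fixed-length time buckets.

    Returns a dict mapping each bucket's end time to the lines that arrived
    in the half-open interval [end - batch_duration, end).
    """
    batches = defaultdict(list)
    for arrival, line in records:
        # Index of the interval the record falls into, then that interval's end time.
        index = int(arrival // batch_duration)
        batches[(index + 1) * batch_duration].append(line)
    return dict(sorted(batches.items()))

records = [(0.4, "1 2"), (3.9, "0 5"), (5.0, "-1 -1"), (9.99, "2 2"), (12.1, "3 -4")]
for end, lines in group_into_batches(records).items():
    print(f"batch ending at t={end:g}s: {lines}")
```

One difference worth noting: Spark still runs a batch when nothing arrived (the first, empty output block below is such a batch), whereas this sketch simply omits empty buckets.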

In [4]:
# Since I'm using a stateful transformation (updateStateByKey), I need to configure a checkpoint directory
ssc.checkpoint("checkpoint")

In [5]:
# Connecting to the netcat server
sreaming = ssc.socketTextStream("172.31.60.179", 9999)
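The Spark Streaming programming guide feeds a socket stream by running netcat as a data server (`nc -lk 9999`) and typing lines into it. Typing points by hand gets tedious, so the hypothetical helper below prints random integer points in the `x y` format that `whichQuadrant` expects. Its output could be piped into such a listener, e.g. `python gen_points.py | nc -lk 9999`; the script name, the number of points and the coordinate range are assumptions, and how piped input is delivered can differ between netcat implementations.

```python
import random

def random_point_lines(n, low=-10, high=10, seed=None):
    """Yield n lines of the form 'x y' with integer coordinates in [low, high]."""
    rng = random.Random(seed)
    for _ in range(n):
        # Integer coordinates make zeros (axis points and the origin) reasonably likely.
        yield f"{rng.randint(low, high)} {rng.randint(low, high)}"

if __name__ == "__main__":
    # Print to stdout so the output can be piped into a netcat listener.
    for line in random_point_lines(20):
        print(line, flush=True)
```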

In [6]:
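# Function to map each input line to a (quadrant, 1) pair

def whichQuadrant(line):
    try:
        # Expect exactly two numbers separated by whitespace, e.g. "3 -4".
        # float() raises ValueError for non-numeric tokens, and the unpacking
        # raises ValueError if the line has too few or too many tokens.
        (x, y) = [float(v) for v in line.split()]
    except ValueError:
        # This runs inside a Spark task, so the message goes to the worker's
        # stdout/logs, not necessarily to the notebook output.
        print("Invalid input, please pass the input in 'x y' format")
        return ("Invalid input", 1)
    # Check which quadrant the point belongs to
    if x > 0 and y > 0:
        quadrant = "First quadrant"
    elif x < 0 and y > 0:
        quadrant = "Second quadrant"
    elif x < 0 and y < 0:
        quadrant = "Third quadrant"
    elif x > 0 and y < 0:
        quadrant = "Fourth quadrant"
    elif x == 0 and y != 0:
        quadrant = "Lies on Y axis"
    elif x != 0 and y == 0:
        quadrant = "Lies on X axis"
    else:
        quadrant = "Origin"

    # Return a (quadrant, 1) pair so the counts can be summed per key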
    return (quadrant, 1)
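The parsing step in `whichQuadrant` relies on the fact that both ways a line can be malformed, a token that is not a number and a line with the wrong number of tokens, raise `ValueError`. The sketch below isolates that step so it can be checked without Spark; `parse_point` is a hypothetical name. Catching `ValueError` specifically, rather than using a bare `except`, avoids also swallowing unrelated exceptions such as `KeyboardInterrupt`.

```python
def parse_point(line):
    """Parse an 'x y' line into a (float, float) tuple, or return None if malformed."""
    try:
        # float() raises ValueError for non-numeric tokens; tuple unpacking
        # raises ValueError when the line has too few or too many tokens.
        x, y = [float(token) for token in line.split()]
    except ValueError:
        return None
    return (x, y)

for sample in ["3 4", "-1.5 2e3", "abc 1", "1", "1 2 3", ""]:
    print(repr(sample), "->", parse_point(sample))
```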

In [7]:
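# Update function for updateStateByKey: newValues holds this batch's values for a key,
# prevSum is the previous running total (None the first time the key is seen)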
def updateFunc(newValues, prevSum):
    return sum(newValues) + (prevSum or 0)
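`updateStateByKey` calls the update function once per key per batch with two arguments: the list of new values that arrived for that key in the current batch, and the previous state, which is `None` the first time a key is seen (hence `prevSum or 0`). According to the Spark Streaming programming guide, the function is also called for keys that received no new data (with an empty list), and returning `None` removes the key. The pure-Python sketch below mimics those semantics on in-memory batches; `simulate_update_state_by_key` is a hypothetical name, not a Spark API.

```python
from collections import defaultdict

def updateFunc(newValues, prevSum):
    # Same running-sum update function as in the notebook.
    return sum(newValues) + (prevSum or 0)

def simulate_update_state_by_key(batches, update):
    """Apply `update` to keyed batches the way updateStateByKey does, yielding the state after each batch."""
    state = {}
    for batch in batches:
        # Group this batch's values by key.
        new_values = defaultdict(list)
        for key, value in batch:
            new_values[key].append(value)
        # Every key already in the state or new in this batch is updated, even without new data.
        next_state = {}
        for key in set(state) | set(new_values):
            result = update(new_values.get(key, []), state.get(key))
            if result is not None:  # returning None drops the key
                next_state[key] = result
        state = next_state
        yield dict(state)

batches = [
    [("First quadrant", 1)],
    [("Lies on Y axis", 1), ("First quadrant", 1)],
    [],
]
for snapshot in simulate_update_state_by_key(batches, updateFunc):
    print(sorted(snapshot.items()))
```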

In [8]:
# Mapping each line to a (quadrant, 1) pair and keeping a running total per quadrant
count = sreaming.map(whichQuadrant).updateStateByKey(updateFunc)

count.pprint() # To print the output
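Using `updateStateByKey` is what makes the printed counts cumulative: a key keeps being reported with its running total in later batches even when no new point for it arrives. A stateless alternative such as `reduceByKey(lambda a, b: a + b)` on the same pairs would instead report only the points that arrived in the current 5-second batch, and would not need a checkpoint directory. The plain-Python sketch below contrasts the two views on hypothetical batches of quadrant labels; it does not use Spark, and `per_batch_and_running` is an illustrative name.

```python
from collections import Counter

def per_batch_and_running(batches):
    """Return (per-batch counts, running counts) for each batch of quadrant labels."""
    running = Counter()
    rows = []
    for batch in batches:
        per_batch = Counter(batch)   # corresponds to a per-batch reduceByKey
        running.update(per_batch)    # corresponds to the state kept by updateStateByKey
        rows.append((dict(per_batch), dict(running)))
    return rows

batches = [["First quadrant"], ["Lies on Y axis", "First quadrant", "First quadrant"], []]
for i, (per_batch, running) in enumerate(per_batch_and_running(batches), start=1):
    print(f"batch {i}: per-batch={per_batch} running={running}")
```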

In [9]:
# Starting the Spark streaming context (nothing is received or processed before this call)
ssc.start()

In [10]:
# Waiting for the streaming computation to terminate (stopped here with a manual interrupt)
ssc.awaitTermination()

-------------------------------------------
Time: 2017-09-22 06:39:15
-------------------------------------------

-------------------------------------------
Time: 2017-09-22 06:39:20
-------------------------------------------
('First quadrant', 1)

-------------------------------------------
Time: 2017-09-22 06:39:25
-------------------------------------------
('First quadrant', 1)
('Lies on Y axis', 1)

-------------------------------------------
Time: 2017-09-22 06:39:30
-------------------------------------------
('First quadrant', 1)
('Origin', 1)
('Invalid input', 1)
('Lies on X axis', 1)
('Lies on Y axis', 1)

-------------------------------------------
Time: 2017-09-22 06:39:35
-------------------------------------------
('First quadrant', 1)
('Origin', 1)
('Invalid input', 1)
('Lies on X axis', 1)
('Lies on Y axis', 1)
('Third quadrant', 1)

-------------------------------------------
Time: 2017-09-22 06:39:40
-------------------------------------------
('First quadrant', 1)

KeyboardInterrupt: 