# Playing with Spark Streaming

In [None]:
import findspark
findspark.init()

### Making Dstreams

In [32]:
import pyspark
import pyspark.sql as sql
ss = sql.SparkSession.builder.appName('streaming').getOrCreate()
sc = ss.sparkContext

In [33]:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc,1)

In [34]:
import time

if __name__ == "__main__":
    rddQueue = []
    for i in range(3):
        rddQueue += [sc.parallelize([j for j in range(0,1001)])]
    
    inputStream = ssc.queueStream(rddQueue)
    mappedStream = inputStream.map(lambda x: (x*5))
    mappedStream.pprint()
    
    ssc.start()
    time.sleep(2)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-27 14:38:59
-------------------------------------------
0
5
10
15
20
25
30
35
40
45
...

-------------------------------------------
Time: 2019-05-27 14:39:00
-------------------------------------------
0
5
10
15
20
25
30
35
40
45
...

-------------------------------------------
Time: 2019-05-27 14:39:01
-------------------------------------------
0
5
10
15
20
25
30
35
40
45
...

-------------------------------------------
Time: 2019-05-27 14:39:02
-------------------------------------------



## Transformations on Dstreams

In [None]:
import findspark
findspark.init()

In [35]:
import pyspark
import pyspark.sql as sql
from pyspark.streaming import StreamingContext
ss = sql.SparkSession.builder.appName('streaming').getOrCreate()
sc = ss.sparkContext
ssc = StreamingContext(sc,1)

In [36]:
stream = ssc.queueStream([sc.parallelize([(1,"a"), (2,"b"),(1,"c"),(2,"d"),(1,"e"),(3,"f")])])

In [37]:
# stream.pprint()
largest = stream.reduce(max)
largest.pprint()

ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-27 14:39:18
-------------------------------------------
(3, 'f')

-------------------------------------------
Time: 2019-05-27 14:39:19
-------------------------------------------



### transform()

In [None]:
ON HOLD

### Window()

process the streaming data within a window and specific time

In [1]:
import findspark
findspark.init()

import pyspark
import pyspark.sql as sql
from pyspark.streaming import StreamingContext
ss = sql.SparkSession.builder.appName('streaming').getOrCreate()
sc = ss.sparkContext
ssc = StreamingContext(sc,1)

In [2]:
lines = ssc.socketTextStream(hostname='localhost',port=6666)

In [3]:
words = lines.map(lambda x:(x,1))
words.window(windowDuration=20, slideDuration=10).pprint()
ssc.start()

-------------------------------------------
Time: 2019-05-27 14:59:18
-------------------------------------------
('hello', 1)
('garvit', 1)

-------------------------------------------
Time: 2019-05-27 14:59:28
-------------------------------------------
('hello', 1)
('garvit', 1)
('how are you', 1)



In [4]:
ssc.stop()
sc.stop()

### CountByWindow()

count the size of window

similar to window(30,20).count()

### ReduceByKeyAndWindow()

same as of "ReduceByKey()" but applied on "window()"

In [18]:
import findspark
findspark.init()

import pyspark
import pyspark.sql as sql
from pyspark.streaming import StreamingContext
ss = sql.SparkSession.builder.appName('streaming').getOrCreate()
sc = ss.sparkContext
ssc = StreamingContext(sc,1)
ssc.checkpoint("checkpoint")

In [19]:
lines = ssc.socketTextStream('localhost',7777)

In [20]:
words = lines.map(lambda x:x.replace('?','').replace(',','')).flatMap(lambda x: x.split(" ")).map(lambda x:(x,1))
words.reduceByKeyAndWindow(lambda x,y:(x+y), 40,20).pprint()
# lines.reduceByKeyAndWindow(lambda x,y:(x+y),30,10).pprint()
ssc.start()

-------------------------------------------
Time: 2019-05-27 15:03:32
-------------------------------------------

-------------------------------------------
Time: 2019-05-27 15:03:33
-------------------------------------------

-------------------------------------------
Time: 2019-05-27 15:03:34
-------------------------------------------
('hello', 1)

-------------------------------------------
Time: 2019-05-27 15:03:35
-------------------------------------------
('hello', 1)

-------------------------------------------
Time: 2019-05-27 15:03:36
-------------------------------------------
('garvit', 1)
('hello', 1)

-------------------------------------------
Time: 2019-05-27 15:03:37
-------------------------------------------
('garvit', 2)
('hello', 1)

-------------------------------------------
Time: 2019-05-27 15:03:38
-------------------------------------------
('garvit', 2)
('hello', 1)



In [21]:
sc.stop()
ssc.stop()

### countByValueAndWindow()

same as "countByValue()" but applied on "window"

In [5]:
import findspark
findspark.init()

import pyspark
import pyspark.sql as sql
from pyspark.streaming import StreamingContext
ss = sql.SparkSession.builder.appName('streaming').getOrCreate()
sc = ss.sparkContext
ssc = StreamingContext(sc,1)
ssc.checkpoint("checkpoint")

In [6]:
lines = ssc.socketTextStream('localhost',5551)

In [7]:
count = lines.map(lambda x:(x,1))
count.countByValueAndWindow(30,20).pprint()
ssc.start()

-------------------------------------------
Time: 2019-05-27 22:07:32
-------------------------------------------
(('hello', 1), 1)
(('garvit', 1), 2)
(('aman', 1), 1)
(('mohan', 1), 1)



In [8]:
ssc.stop()

### foreachRDD()

In [47]:
ON HOLD

## Testing Area

In [10]:
import findspark
findspark.init()

import pyspark
import pyspark.sql as sql
ss = sql.SparkSession.builder.appName('RDD Practice').getOrCreate()
sc = ss.sparkContext

In [23]:
data = sc.textFile('a.txt')
data.collect()

['hello garvit', 'how are you?', 'I am fine', 'hello Ankit, How are you?']

In [24]:
RDD1 = data.map(lambda x:x.replace('?','').replace(',','')).flatMap(lambda x: x.split(" ")).map(lambda x:(x,1))
RDD1.collect()

[('hello', 1),
 ('garvit', 1),
 ('how', 1),
 ('are', 1),
 ('you', 1),
 ('I', 1),
 ('am', 1),
 ('fine', 1),
 ('hello', 1),
 ('Ankit', 1),
 ('How', 1),
 ('are', 1),
 ('you', 1)]

In [29]:
RDD2 = RDD1.countByValue()
RDD2

defaultdict(int,
            {('hello', 1): 2,
             ('garvit', 1): 1,
             ('how', 1): 1,
             ('are', 1): 2,
             ('you', 1): 2,
             ('I', 1): 1,
             ('am', 1): 1,
             ('fine', 1): 1,
             ('Ankit', 1): 1,
             ('How', 1): 1})

In [21]:
x = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)

In [22]:
x.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
summ = x.reduce()