# Basics of Transformations Demo

In Spark Streaming, DStreams are treated very similarly to the RDDs that make them up. As with RDDs, a wide variety of data transformations is available.

Here are some of the transformations from the Spark documentation that might be useful for your purposes:

| Transformation | Meaning |
| :------------- | :------ |
| **map**(func) | Return a new DStream by passing each element of the source DStream through a function func. |
| **flatMap**(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
| **filter**(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
| **repartition**(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
| **union**(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherStream. |
| **count**() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
| **reduce**(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel. |
| **countByValue**() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
| **reduceByKey**(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
| **join**(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
| **cogroup**(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |


If you look at the Spark Streaming documentation, you will also find `transform(func)` and `updateStateByKey(func)`. We will discuss these later in the course.
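
Several of the transformations in the table are applied independently to each batch (each RDD) of a DStream. The following is a minimal plain-Python sketch of what `count`, `countByValue`, and `join` compute for a single batch. It does not use Spark at all: the lists `batch`, `left`, and `right` are hypothetical stand-ins for one batch of data, and the code only imitates the semantics described in the table.

```python
from collections import Counter, defaultdict

# Hypothetical data: one batch of a stream, represented as plain lists.
batch = ["a", "b", "a", "c", "a"]
left = [("k1", 1), ("k2", 2), ("k1", 3)]
right = [("k1", "x"), ("k3", "y")]

# count(): a single number for the batch.
count_result = len(batch)

# countByValue(): (value, frequency) pairs for the batch.
count_by_value = sorted(Counter(batch).items())

# join(): every (V, W) combination for keys that appear on both sides.
right_by_key = defaultdict(list)
for key, w in right:
    right_by_key[key].append(w)
joined = [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]

print(count_result)    # 5
print(count_by_value)  # [('a', 3), ('b', 1), ('c', 1)]
print(joined)          # [('k1', (1, 'x')), ('k1', (3, 'x'))]
```

Keys that appear on only one side (`k2` and `k3` here) are dropped by the join, which matches the "all pairs of elements for each key" wording in the table.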


### Demo (Part 1)

We're going to demo the `map` and `flatMap` functions with respect to DStreams. One important question is: what is the difference between the two?

`map`: Returns a new RDD by applying a function to each element of the RDD. The function passed to `map` returns exactly one item per input element. Works with DStreams as well as RDDs.

`flatMap`: Similar to `map`, it returns a new RDD by applying a function to each element of the RDD, but the output is flattened.
The function passed to `flatMap` can return a list of 0 or more elements. Works with DStreams as well as RDDs.

Here's an example:

In [1]:
!pip install pyspark 


In [None]:
'''
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark
'''

In [None]:
'''
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"
'''

In [None]:
'''
import findspark
findspark.init()
'''

In [2]:
# The first step is to import the required libraries 

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [3]:
# Create a SparkContext 

sc = SparkContext(appName="PythonStreamingTransformationDemo")

# Usage of map() function. Parallelize and collect sample data
sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()

[range(1, 3), range(1, 4), range(1, 5)]

In [4]:
# Usage of flatMap() function. Parallelize and collect sample data
sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()

[1, 2, 1, 2, 3, 1, 2, 3, 4]

Notice that the output is flattened into a single list.

Here's another example:

In [5]:
sc.parallelize([3,4,5]).map(lambda x: [x,  x*x]).collect() 

[[3, 9], [4, 16], [5, 25]]

In [6]:
sc.parallelize([3,4,5]).flatMap(lambda x: [x, x*x]).collect() 

[3, 9, 4, 16, 5, 25]

Notice that the list is flattened in the latter version.
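
The relationship between the two can also be sketched without Spark: `flatMap` behaves like `map` followed by one level of flattening. The helpers `local_map` and `local_flat_map` below are hypothetical names for illustration only; they operate on ordinary Python lists, not on RDDs or DStreams.

```python
from itertools import chain


def local_map(func, items):
    """Apply func to each item and keep one result per input element."""
    return [func(x) for x in items]


def local_flat_map(func, items):
    """Apply func to each item, then flatten the results by one level."""
    return list(chain.from_iterable(func(x) for x in items))


data = [3, 4, 5]

mapped = local_map(lambda x: [x, x * x], data)
flat = local_flat_map(lambda x: [x, x * x], data)
# Returning an empty list drops the element, so 0 outputs per input is allowed.
odd_only = local_flat_map(lambda x: [x] if x % 2 else [], data)

print(mapped)    # [[3, 9], [4, 16], [5, 25]]
print(flat)      # [3, 9, 4, 16, 5, 25]
print(odd_only)  # [3, 5]
```

The last call shows the "0 or more" part of the definition: a function passed to `flatMap` can act as a filter by returning an empty list.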

Here's another example, this time reading from a file, which is often useful for debugging code that will later run on full DStreams.

There is a text file `Greetings.txt` with the following lines:
```
Good Morning
Good Evening
Good Day
Happy Birthday
Happy New Year
```

In [7]:
lines=sc.textFile("Greetings.txt")

In [8]:
lines.map(lambda line:line.split(" ")).collect()

[['Good', 'Morning'],
 ['Good', 'Evening'],
 ['Good', 'Day'],
 ['Happy', 'Birthday'],
 ['Happy', 'New', 'Year']]

In [9]:
lines.flatMap(lambda line:line.split(" ")).collect()

['Good',
 'Morning',
 'Good',
 'Evening',
 'Good',
 'Day',
 'Happy',
 'Birthday',
 'Happy',
 'New',
 'Year']

### Demo (Part 2)

Last time we went over the `map` and `flatMap` functions. Now we'll explore a few other options.

Suppose we have this example text from Dr. Seuss's _The Cat in the Hat_.

In [1]:
!pip install pyspark


In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pprint import pprint

In [8]:
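# Assumes a SparkContext `sc` already exists; uncomment the next line to create one
#sc = SparkContext(appName="DrSeussExample")
# Create a StreamingContext with a 10-second batch interval
scc = StreamingContext(sc, 10)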

In [19]:
myFile = scc.sparkContext.textFile("DrSeuss.text")

In [20]:
wordspair=myFile.flatMap(lambda row:row.split(' ')).map(lambda x:(x,1))

In [21]:
wordspair.take(5)

[('The', 1), ('Cat', 1), ('in', 1), ('the', 1), ('Hat', 1)]

In [22]:
oldwordcount=wordspair.reduceByKey(lambda x,y:x+y)

In [23]:
oldwordcount.take(5)

[('The', 23), ('Cat', 11), ('in', 117), ('', 196), ('sun', 2)]

In [24]:
oldwordcountfilter=oldwordcount.filter(lambda x:x[1]>100)

In [25]:
oldwordcount.count()

1717

In [26]:
oldwordcountfilter.collect()

[('in', 117), ('', 196), ('the', 220), ('to', 102), ('I', 184), ('a', 198)]

In [29]:
oldwordcountfilterlimit=oldwordcountfilter.collect()

In [30]:
oldwordcountfilterlimit

[('in', 117), ('', 196), ('the', 220), ('to', 102), ('I', 184), ('a', 198)]

In [31]:
lines=sc.parallelize(["cat","rat","mat"])

In [32]:
unionRDD=oldwordcountfilter.union(lines)

In [33]:
unionRDD.collect()

[('in', 117),
 ('', 196),
 ('the', 220),
 ('to', 102),
 ('I', 184),
 ('a', 198),
 'cat',
 'rat',
 'mat']

In [None]:
#---------------------------

In [10]:

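# Build (word, 1) pairs from the file, then sum the counts per word
wordspair = myFile.flatMap(lambda row: row.split(" ")).map(lambda x: (x, 1))
oldwordcount = wordspair.reduceByKey(lambda x, y: x + y)
# Create a DStream that reads lines of text from a TCP socket on localhost:9999
lines = scc.socketTextStream("localhost", 9999)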

In [11]:
print(lines)

<pyspark.streaming.dstream.DStream object at 0x7f872b4c9220>


Suppose that we want word counts for this stream. We can use the `map` function from before. `map` returns a new DStream whose values are created by applying the supplied function to each value in the original DStream. Here we use a lambda function that replaces some common punctuation characters with spaces and converts the text to lower case:

In [12]:
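wordcounts1 = lines.map(lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower())
# transform() must return an RDD, so wrap the list returned by take() back into an RDD
wordcounts1top = wordcounts1.transform(lambda rdd: rdd.context.parallelize(rdd.take(10)))
wordcounts1top.pprint()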

The `flatMap` function applies a function to each input value and returns a new, flattened DStream. In this case, the lines are split into words, and each word becomes a separate value in the output:

In [13]:
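wordcounts2 = wordcounts1.flatMap(lambda x: x.split())
# transform() must return an RDD, so wrap the list returned by take() back into an RDD
wordcounts2top = wordcounts2.transform(lambda rdd: rdd.context.parallelize(rdd.take(10)))
wordcounts2top.pprint()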
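
The cleaning and splitting steps can be tried on a single line in plain Python before running them on a stream. This is only a sketch: `normalize` is a hypothetical helper that repeats the replacements from the lambda, and `sample` is a made-up input line.

```python
def normalize(line):
    """Replace common punctuation with spaces and lower-case the line."""
    return line.replace(',', ' ').replace('.', ' ').replace('-', ' ').lower()


# Hypothetical input line, not taken from the demo data.
sample = "Hello, World. Spark-Streaming demo"

cleaned = normalize(sample)
words = cleaned.split()             # splits on any run of whitespace
words_with_gaps = cleaned.split(" ")  # splits on every single space

print(repr(cleaned))     # 'hello  world  spark streaming demo'
print(words)             # ['hello', 'world', 'spark', 'streaming', 'demo']
print(words_with_gaps)   # ['hello', '', 'world', '', 'spark', 'streaming', 'demo']
```

Replacing punctuation with spaces leaves runs of two spaces behind. Calling `split()` with no argument collapses them, whereas `split(" ")` produces empty strings, which is a likely explanation for the `''` key that shows up in the file-based counts earlier in this demo.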

Next we turn each word into a `(word, 1)` tuple and call `reduceByKey`, which expects input tuples of the form (key, value). It creates a new DStream containing one tuple for each unique key in the input, where the value in the second position is produced by applying the supplied lambda function to the values with the matching key. Here the key is the word, and the lambda function sums the counts for each word. The output consists of a single tuple for each unique word in each batch, with the word in the first position and the word count in the second.

In [15]:
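wordcounts3 = wordcounts2.map(lambda x: (x, 1))
# transform() must return an RDD, so wrap the list returned by take() back into an RDD
wordcounts3top = wordcounts3.transform(lambda rdd: rdd.context.parallelize(rdd.take(20)))
wordcounts3top.pprint(20)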

In [16]:
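wordcounts4 = wordcounts3.reduceByKey(lambda x,y:x+y)
# transform() must return an RDD, so wrap the list returned by take() back into an RDD
wordcounts4top = wordcounts4.transform(lambda rdd: rdd.context.parallelize(rdd.take(20)))
wordcounts4top.pprint(20)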
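
What `reduceByKey` does to one batch can be sketched in plain Python. The helper `local_reduce_by_key` is a hypothetical name, and the word list is made-up sample data. Spark performs the same aggregation in parallel across partitions, which this single-process sketch does not attempt to model.

```python
def local_reduce_by_key(func, pairs):
    """Combine the values of each key with func, one pair at a time."""
    grouped = {}
    for key, value in pairs:
        if key in grouped:
            grouped[key] = func(grouped[key], value)
        else:
            grouped[key] = value
    return list(grouped.items())


# Hypothetical batch of words.
words = ["the", "cat", "in", "the", "hat", "the"]

pairs = [(w, 1) for w in words]
counts = local_reduce_by_key(lambda x, y: x + y, pairs)

print(counts)  # [('the', 3), ('cat', 1), ('in', 1), ('hat', 1)]
```

Because the combining function is applied pairwise in whatever order the values arrive, it should be associative and commutative, as the table at the top notes for `reduce`.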

Next we map a lambda function over the data that swaps the first and second values in each tuple, so that the word count appears in the first position and the word in the second.

In [17]:
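wordcounts5 = wordcounts4.map(lambda x:(x[1],x[0]))
# transform() must return an RDD, so wrap the list returned by take() back into an RDD
wordcounts5top = wordcounts5.transform(lambda rdd: rdd.context.parallelize(rdd.take(20)))
wordcounts5top.pprint(20)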

Finally, we sort by the key (i.e., the value at the first position in each tuple). In this example the first position stores the word count, so the most frequently occurring words come first. The `ascending=False` parameter results in a descending sort order.

In [37]:
# DStreams have no sortByKey(), so sort each batch's RDD inside transform()
wordcounts6 = wordcounts5.transform(lambda rdd: rdd.sortByKey(ascending=False))
wordcounts6.pprint(20)
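
The swap-and-sort steps can be checked on a small in-memory example in plain Python. The `counts` list is hypothetical sample data, and `sorted` stands in for Spark's sort here.

```python
# Hypothetical (word, count) pairs for one batch.
counts = [("the", 3), ("cat", 1), ("in", 2), ("hat", 1)]

# Swap each tuple so that the count becomes the key.
swapped = [(count, word) for word, count in counts]

# Sort by the count in descending order, like ascending=False.
ranked = sorted(swapped, key=lambda pair: pair[0], reverse=True)

print(ranked)  # [(3, 'the'), (2, 'in'), (1, 'cat'), (1, 'hat')]
```

Python's `sorted` keeps tied entries in their original order. A distributed sort gives no such guarantee, so the relative order of words with equal counts should not be relied on.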

# References
1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams
