<a href="https://colab.research.google.com/github/darwinyusef/pyspark/blob/master/Transformations_and_Actions_in_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install Java (OpenJDK 8, headless)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Download Spark 2.4.0 (prebuilt for Hadoop 2.7)
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz

In [None]:
# Extract the archive
!tar xf spark-2.4.0-bin-hadoop2.7.tgz

In [None]:
# Install findspark, which locates the Spark installation so that pyspark can be imported
!pip install -q findspark

In [None]:
# Set the locations where Java and Spark are installed so that Colab knows where to find them
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
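If the download or extraction step failed, the two paths above point at nothing and later cells fail when Spark starts. A minimal sketch of a sanity check, using only the standard library; the helper name `missing_dirs` is hypothetical and not part of Spark or findspark:

```python
import os

def missing_dirs(var_names, env=os.environ):
    """Return the variable names that are unset or do not point to an existing directory."""
    return [name for name in var_names
            if not os.path.isdir(env.get(name, ""))]

# Check the two variables this notebook relies on
problems = missing_dirs(["JAVA_HOME", "SPARK_HOME"])
if problems:
    print("Not found:", ", ".join(problems))
else:
    print("JAVA_HOME and SPARK_HOME both point to existing directories")
```

An empty result means both directories exist; it does not verify that their contents are a working Java or Spark installation.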

In [None]:

#Create a spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Create a SparkContext. It is the connection to a Spark cluster (here, a local one).
from pyspark import SparkConf
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [None]:
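# map: applies the function to each element and returns exactly one output per input (here, one range object each)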
sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()

[range(1, 3), range(1, 4), range(1, 5)]

In [None]:
# flatMap: applies a function that returns an iterable for each element, then flattens all the results into one list
sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()

[1, 2, 1, 2, 3, 1, 2, 3, 4]

In [None]:
# Another flatMap example: the function is applied as in map, but the per-element lists are flattened into a single list
sc.parallelize([3,4,5]).flatMap(lambda x: [x, x*x]).collect()

[3, 9, 4, 16, 5, 25]
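The difference between the two transformations can be reproduced without Spark. The following is a plain-Python analogue, not the Spark implementation: a list comprehension stands in for `map`, and `itertools.chain.from_iterable` stands in for the flattening step of `flatMap`.

```python
from itertools import chain

data = [3, 4, 5]

# map analogue: one output per input; each output is itself a list
mapped = [list(range(1, x)) for x in data]

# flatMap analogue: same function, but the per-element results are concatenated
flat_mapped = list(chain.from_iterable(range(1, x) for x in data))

print(mapped)
print(flat_mapped)
```

`mapped` keeps three nested results, while `flat_mapped` holds the same nine numbers in one flat list, matching the `flatMap` output shown above.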

**Actions:**

In [None]:
# reduce(func): aggregate the elements of the dataset using a function func (which takes two arguments and returns one).
# The function should be commutative and associative so that it can be computed correctly in parallel.
numbers = [1, 4, 6, 2, 9, 10]
# Define a new function, combine.
# It converts x and y to strings and returns one string formatted like a pair: "(x, y)"
def combine(x,y):
  return "(" + str(x) + ", " + str(y) + ")"
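`combine` is neither commutative nor associative, which makes it useful for seeing why `reduce` asks for both properties: the result records how the elements were grouped. The sketch below uses Python's `functools.reduce` rather than Spark, and the two-way split is an assumed stand-in for two partitions; Spark's actual partitioning of a small list may differ.

```python
from functools import reduce

def combine(x, y):
    # Build a string that records which two values were paired
    return "(" + str(x) + ", " + str(y) + ")"

numbers = [1, 4, 6, 2]

# Left-to-right fold, as a single partition would evaluate it
left_fold = reduce(combine, numbers)

# Fold each half separately, then merge the two partial results
halves = combine(reduce(combine, numbers[:2]), reduce(combine, numbers[2:]))

print(left_fold)
print(halves)
```

The two strings differ, so the answer depends on how the data was split. With an associative and commutative function such as addition, both groupings give the same result.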


In [None]:
# Use reduce to sum the elements of an RDD
numbers = sc.parallelize([1,7,8,9,5,77,48])
print(numbers)
# Named total rather than sum to avoid shadowing Python's built-in sum()
total = numbers.reduce(lambda a,b:a+b)
print(type(total))



ParallelCollectionRDD[8] at parallelize at PythonRDD.scala:195
<class 'int'>


**collect():** Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

In [None]:
#Collect Example:
sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()

[range(1, 3), range(1, 4), range(1, 5)]

**count():** Return the number of elements in the dataset.


In [None]:
!ls -ltr

total 222804
drwxr-xr-x 13 1000 1000      4096 Oct 29 06:36 spark-2.4.0-bin-hadoop2.7
-rw-r--r--  1 root root 227893062 Oct 29 07:10 spark-2.4.0-bin-hadoop2.7.tgz
drwxr-xr-x  2 root root      4096 Dec  5 17:39 sample_data
-rw-r--r--  1 root root    241176 Dec 10 08:48 siddhartha.txt


In [None]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

In [None]:
# count() example
# Word count example
text_file = sc.textFile('/content/siddhartha.txt')
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
# saveAsTextFile writes a directory of part files at this path and fails if the path already exists
counts.saveAsTextFile('/content/count_siddhartha.txt')


In [None]:
# Total number of lines in text_file
print(text_file.count())

4319


In [None]:
# Total number of records in the variable counts (one record per distinct word)
print(counts.count())

6902
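The two counts measure different things: `text_file.count()` counts lines, while `counts.count()` counts distinct keys left after `reduceByKey`. A plain-Python analogue of the pipeline on two made-up lines, with a dictionary standing in for `reduceByKey`; this is a sketch of the semantics, not of how Spark executes it:

```python
lines = ["to be or", "not to be"]

# flatMap analogue: split every line and flatten into one list of words
words = [word for line in lines for word in line.split(" ")]

# map + reduceByKey analogue: pair each word with 1, then add the values per key
counts = {}
for word, one in ((w, 1) for w in words):
    counts[word] = counts.get(word, 0) + one

print(len(lines))    # analogue of text_file.count()
print(len(counts))   # analogue of counts.count()
print(counts)
```

Six words collapse into four keys here, just as the book's lines collapse into one record per distinct word.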


In [None]:
# Create a Spark DataFrame from counts; with no schema given, the columns are named _1 and _2
df = spark.createDataFrame(counts)
df.columns

['_1', '_2']

In [None]:
df.take(10)

[Row(_1='The', _2=78),
 Row(_1='Project', _2=78),
 Row(_1='EBook', _2=2),
 Row(_1='of', _2=1084),
 Row(_1='', _2=2197),
 Row(_1='is', _2=325),
 Row(_1='use', _2=13),
 Row(_1='anyone', _2=4),
 Row(_1='anywhere', _2=2),
 Row(_1='at', _2=147)]
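The `Row(_1='', _2=2197)` entry is what you would expect from `line.split(" ")`: splitting on a single space returns an empty string for a blank line and for each extra space in a run. A plain-Python sketch of the difference from `split()` with no argument, which splits on any run of whitespace and drops empty strings; the sample lines are made up for illustration:

```python
samples = ["", "two  spaces", " leading space"]

# Show both splitting rules side by side for each sample line
for line in samples:
    print(repr(line), line.split(" "), line.split())

# Tokens a word count would see with each splitting rule
with_empty = [w for line in samples for w in line.split(" ")]
without_empty = [w for line in samples for w in line.split()]

print(with_empty.count(""))
print(without_empty.count(""))
```

In the word count pipeline, the equivalent change would be `flatMap(lambda line: line.split())`, which would remove the empty-string key and change the record totals shown earlier.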

In [None]:
#Total number of rows in dataframe
print(df.count())

6902


In [None]:
df.schema

StructType(List(StructField(_1,StringType,true),StructField(_2,LongType,true)))

In [None]:
# Physical plan (df.explain(True) also prints the logical plans)
df.explain()

== Physical Plan ==
Scan ExistingRDD[_1#24,_2#25L]
