

*   SparkContext is the main entry point to PySpark features.
*   A SparkContext represents the connection to a Spark cluster.
*   Only one SparkContext may be active per JVM.
*   You must call stop() on the active SparkContext before creating a new one.

In [1]:
#install PySpark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=9570823f46194d6198450eb30172a5381a35c0e8e413a6513344899c64471940
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
# Import SparkContext and SparkConf
from pyspark import SparkContext, SparkConf

## One way to create a SparkContext



*   SparkContext - used to access PySpark features.
*   SparkConf    - used to define the configuration (e.g. the application name). This configuration is passed in when creating the SparkContext.



In [3]:
conf = SparkConf().setAppName("PysparkBasics").setMaster("local")     # define the configuration
sc = SparkContext(conf=conf)        # create the SparkContext from that configuration

In [4]:
sc.getConf().getAll()   #show all the configurations that are relevant for this SparkContext

[('spark.master', 'local'),
 ('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.executor.id', 'driver'),
 ('spark.app.submitTime', '1673148787641'),
 ('spark.app.startTime', '1673148787862'),
 ('spark.app.id', 'local-1673148789173'),
 ('spark.rdd.compress', 'True'),
 ('spark.

In [5]:
# sc.stop()     #used to stop active SparkContext

## RDD - Resilient Distributed Dataset


*   An RDD is a distributed collection of elements, much like a dataset.
*   We can create an RDD from HDFS storage, local storage, a .csv file, or other sources.
*   An RDD can be divided into multiple partitions, which can be processed in parallel, so the data can be accessed faster.






Before applying an ML algorithm to an RDD, we need to preprocess the data. In PySpark this is done with "operations". There are two kinds of operations on an RDD:
1.   Transformations

        *   map()
        *   flatMap()
        *   filter()
        *   distinct()
        *   reduceByKey()
        *   groupByKey()
        *   mapValues()
        *   flatMapValues()
        *   sortByKey()

2.   Actions

        *   collect()
        *   count()
        *   countByValue()
        *   take()
        *   top()
        *   reduce()
        *   fold()
        *   foreach()
        *   saveAsTextFile()

---





*   Transformations : In PySpark, transformations are lazily evaluated. That means applying a transformation returns a new RDD but does not compute anything yet.
  *   When we apply a transformation to an RDD, Spark records it in a **DAG** (Directed Acyclic Graph).



*   Action : When we apply an action to an RDD, it typically returns a particular value (integer, string, list, etc.) to the driver.
  *   Applying an action executes all of the transformations that were applied to the RDD.
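
The difference between a lazy transformation and an action can be illustrated without Spark. The sketch below is a toy model only: `LazyDataset`, `double`, and `calls` are hypothetical names invented for this illustration, and the class is not how Spark implements RDDs. It only shows the idea that transformations are recorded first and run when an action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)   # recorded chain of transformations

    def map(self, fn):
        # Transformation: return a new object with one more recorded step.
        return LazyDataset(self._data, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: also only recorded, not executed.
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    def collect(self):
        # Action: only now is the recorded chain actually executed.
        items = iter(self._data)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)


calls = []   # records every time the mapped function really runs

def double(x):
    calls.append(x)
    return x * 2

base = LazyDataset([1, 2, 3, 4, 5])
pending = base.map(double).filter(lambda x: x > 4)

calls_before_action = len(calls)   # 0: nothing has run yet
result = pending.collect()         # the action triggers the whole chain
print(calls_before_action, result, len(calls))   # 0 [6, 8, 10] 5
```

In this model `double` is never called until `collect()` runs, which mirrors the behaviour described above for transformations and actions.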




## Create RDD and their Basic Actions

***parallelize function - used to distribute a local collection across the cluster as an RDD***

In [6]:
names = sc.parallelize(['Adam','Cray','Shaun','Brain','Mark','Christ','Frans','Mark','Adam','Adam','Bill'])
# here, 'names' is the RDD

In [7]:
type(names)

pyspark.rdd.RDD

In [8]:
names.countByValue()

defaultdict(int,
            {'Adam': 3,
             'Cray': 1,
             'Shaun': 1,
             'Brain': 1,
             'Mark': 2,
             'Christ': 1,
             'Frans': 1,
             'Bill': 1})

In [9]:
names.collect()     # collect() brings all the data into the driver's memory (RAM)

['Adam',
 'Cray',
 'Shaun',
 'Brain',
 'Mark',
 'Christ',
 'Frans',
 'Mark',
 'Adam',
 'Adam',
 'Bill']

In [10]:
names.take(5)         # take(n) loads only the requested elements into RAM
# this returns the first 5 values

['Adam', 'Cray', 'Shaun', 'Brain', 'Mark']

## foreach
*   foreach() is an action.
*   It takes each element and applies a function to it.
*   It does not return any value.
*   It is useful for performing a calculation on each element of an RDD and logging the result somewhere else.





In [11]:
def f(x): print(x)      # function applied to every element
a = sc.parallelize([1,2,3,4,5]).foreach(f)   # foreach returns None, so 'a' is None

In [12]:
type(a)

NoneType

In [13]:
employees = sc.textFile("/content/drive/MyDrive/PySpark/names.txt.txt")
# here the RDD is built from a text file; each line of the file becomes one element

In [14]:
type(employees)

pyspark.rdd.RDD

In [15]:
employees.collect()

["'Adam'",
 "'Cray'",
 "'Shaun'",
 "'Mark'",
 "'Christ'",
 "'Frans'",
 "'Mark'",
 "'Adam'",
 "'Adam'",
 "'Bill'",
 "'Adam'",
 "'Cray'",
 "'Shaun'",
 "'Brain'",
 "'Mark'",
 "'Christ'",
 "'Frans'",
 "'Mark'",
 "'Adam'",
 "'Adam'",
 "'Bill'"]

In [16]:
employees.first()

"'Adam'"

In [17]:
employees.count()

21

In [18]:
employees.top(5)

["'Shaun'", "'Shaun'", "'Mark'", "'Mark'", "'Mark'"]

In [19]:
employees.distinct().count()

8
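
As a rough guide to what these actions compute, the sketch below applies plain-Python equivalents to the `names` list from In [6]. It is not PySpark code, the variable names are chosen for this illustration, and it ignores partitioning entirely. It only shows the values the corresponding actions are expected to return for a small in-memory list.

```python
from collections import Counter
import heapq

names = ['Adam', 'Cray', 'Shaun', 'Brain', 'Mark', 'Christ',
         'Frans', 'Mark', 'Adam', 'Adam', 'Bill']

first = names[0]                    # like first(): the first element
count = len(names)                  # like count(): number of elements
top3 = heapq.nlargest(3, names)     # like top(3): largest values, in descending order
distinct_count = len(set(names))    # like distinct().count()
by_value = Counter(names)           # like countByValue()

print(first, count, top3, distinct_count, by_value['Adam'])
# Adam 11 ['Shaun', 'Mark', 'Mark'] 8 3
```

Strings are compared lexicographically, which is why `top` on names returns the values closest to the end of the alphabet.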

## A numeric example for better clarity

In [20]:
num = sc.parallelize([5,5,4,3,2,9,2],3)

In [21]:
num.countByValue()      # returns how many times each value occurs

defaultdict(int, {5: 2, 4: 1, 3: 1, 2: 2, 9: 1})

In [22]:
type(num)

pyspark.rdd.RDD

## glom(): an RDD of partition lists

*   Spark does not normally let you refer to a specific partition of an RDD directly.
*   This is because the data of each RDD is spread across several partitions.
*   glom() turns each partition into a single list, so the result is an RDD with one list per partition.


In [23]:
num.glom().collect()     # num = sc.parallelize([5,5,4,3,2,9,2],3) ->   [[5, 5], [4, 3], [2, 9, 2]]
                         # we passed numSlices = 3, so there are 3 partitions and glom() gives 3 lists

[[5, 5], [4, 3], [2, 9, 2]]

In [24]:
num.glom().collect()[1]

[4, 3]

In [25]:
type(num.glom())

pyspark.rdd.PipelinedRDD
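
To make the partition layout concrete, here is a minimal pure-Python sketch that splits a list into contiguous chunks. The helper `split_into_partitions` is hypothetical, and its even-split rule is an assumption that happens to reproduce the `glom()` output shown above. Spark's own slicing logic may differ for other inputs.

```python
def split_into_partitions(items, num_slices):
    """Split a list into num_slices contiguous chunks (assumed even-split rule)."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

partitions = split_into_partitions([5, 5, 4, 3, 2, 9, 2], 3)
print(partitions)       # [[5, 5], [4, 3], [2, 9, 2]]
print(partitions[1])    # [4, 3]
```

Indexing the result with `[1]` picks out the second chunk, just as `num.glom().collect()[1]` does.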

In [26]:
num.max()

9

In [27]:
num.min()

2

In [28]:
num.mean()

4.285714285714286

## Reduce() Action

---




In [29]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [30]:
num.reduce(lambda a,b: a+b)

30

In [31]:
num.reduce(lambda a,b: a*b)

10800

In [32]:
num.reduce(lambda x,y : x if x > y else y)

9

In [33]:
num.reduce(lambda x,y : x if x < y else y)

2

In [34]:
def myfun(a,b):      # myfun is a user-defined function (not associative, so the result depends on partitioning)
  return a*2 + b*2

In [35]:
num.reduce(myfun)

232
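
The value 232 depends on how the data is partitioned, because `myfun` is neither associative nor commutative, while the PySpark documentation describes `reduce` as taking a commutative and associative operator. The sketch below is a pure-Python model, assuming `reduce` is applied inside each partition first and the partial results are then reduced in partition order. It uses the partition layout shown by `glom()` earlier, and the variable names are chosen for this illustration.

```python
from functools import reduce

def myfun(a, b):
    return a * 2 + b * 2

partitions = [[5, 5], [4, 3], [2, 9, 2]]   # layout shown by glom() earlier

# Step 1: reduce inside each partition. Step 2: reduce the partial results.
partials = [reduce(myfun, part) for part in partitions]
partitioned_result = reduce(myfun, partials)

# The same function applied strictly left to right over the flat list.
flat = [x for part in partitions for x in part]
sequential_result = reduce(myfun, flat)

print(partials)             # [20, 14, 48]
print(partitioned_result)   # 232
print(sequential_result)    # 872
```

With a different number of partitions the same call could return a different number, so functions like `myfun` are best avoided in `reduce`. Addition, multiplication, max, and min do not have this problem.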

In [36]:
num.takeOrdered(3)   # returns the 3 smallest elements in ascending order

[2, 2, 3]

## Fold() Action

---



In [37]:
num = sc.parallelize([5,5,4,3,2,9,2],3)
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [38]:
num.glom().collect()

[[5, 5], [4, 3], [2, 9, 2]]

In [39]:
num.reduce(lambda a,b:a+b)    # returns the sum of all the numbers

30

In [40]:
num.reduce(lambda a,b:a*b)    # returns the product of all the numbers

10800

In [41]:
num.fold(0,lambda a,b:a+b)

30

In [42]:
num.fold(0,lambda a,b:a*b)    # 0 is not the identity for multiplication, so the result is 0

0

In [43]:
num.fold(1,lambda a,b:a+b)  # num.glom().collect() -> [[5, 5], [4, 3], [2, 9, 2]], so there are 3 partitions
                            # the zero value (1) is added once per partition (3 times)
                            # and once more when the partition results are combined: 4 times in total
                            # result = sum of all the numbers (30) + 4 = 34

34

In [44]:
from operator import add
b=sc.parallelize([5,5,4,3,2,9,2],3)
b.fold(0,add)

30

In [45]:
from operator import add
b=sc.parallelize([5,5,4,3,2,9,2],3)
b.fold(1,add)

34
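
The results above can be reproduced with a small pure-Python model of `fold`. It is a sketch, assuming the zero value seeds the fold of every partition and then seeds the final merge of the partial results. `fold_like` is a hypothetical helper written for this illustration, not a PySpark function.

```python
from functools import reduce
from operator import add, mul

def fold_like(partitions, zero_value, op):
    """Model of fold: zero_value seeds every partition and the final merge."""
    partials = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, partials, zero_value)

partitions = [[5, 5], [4, 3], [2, 9, 2]]   # layout shown by glom() earlier

print(fold_like(partitions, 0, add))   # 30
print(fold_like(partitions, 1, add))   # 34  (30 + 1 per partition + 1 for the merge)
print(fold_like(partitions, 0, mul))   # 0
print(fold_like(partitions, 1, mul))   # 10800
```

This is why the zero value should be the identity of the operation: 0 for addition and 1 for multiplication. Any other value is applied once per partition plus once more, so the result changes with the number of partitions.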

In [53]:
from operator import add,mul
num3=sc.parallelize([5,5,4,3,2,9,2]).fold(1,mul)
num3

10800

## Create an RDD using the range function

In [50]:
b = sc.parallelize(range(1,10))

In [51]:
b.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]