In [1]:
# findspark
# Finds the Spark installation on this system using SPARK_HOME
# (for example /opt/spark3.1.2...), then automatically sets the path and
# environment needed for pyspark and loads the Spark libraries.
# Good for single-machine development and learning.
import findspark
findspark.init()

In [2]:
# Create the SparkContext. The SparkContext is SPARK CORE: it helps to create
# RDDs, the DAG, jobs and tasks, and to execute those tasks.
# This code is called the Spark application, or the Spark driver.
# Every Spark driver shall have ONLY ONE SparkContext.
from pyspark import SparkConf, SparkContext
# "local" is the execution mode: the Spark driver and the Spark executor run
# in the same JVM on the same machine (not distributed).
# Good for development and learning, not for production.
# "SparkBasic" is an application name of your choice.
# getOrCreate returns the already-running context when this cell is re-run;
# calling the SparkContext constructor a second time in the same kernel
# raises ValueError because only one context may be active per driver.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("SparkBasic")
)

22/05/05 00:01:16 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.174.129 instead (on interface ens33)
22/05/05 00:01:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/05/05 00:01:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/05 00:01:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Create an RDD with the parallelize method, from data that is hardcoded in
# the Spark driver (this notebook).
# Tip (intellisense): type sc.<TAB> and the editor/notebook brings up all the
# available functions as you type.
# An RDD has one or more partitions, and they belong to the Executor process;
# the hardcoded data is what gets loaded into those partitions.
# Lazy evaluation: nothing is loaded into the partitions at this point.
# The data is loaded only once an action is applied to the RDD.
# parallelize creates and returns the RDD.
rdd = sc.parallelize(list(range(10)))  # the integers 0 through 9

In [4]:
# filter is a TRANSFORMATION: code (a task) applied to the partitioned data.
# It is a higher-order function, i.e. it accepts a function as its input.
# Each element of a partition is passed to the supplied predicate,
# lambda value: value % 2 == 1, which returns either True or False.
# filter keeps the numbers for which the predicate returns True: 1, 3, 5, 7, 9.
# LAZY evaluation: no partition, no data and no code is loaded into the
# Executor until an ACTION is applied to the RDD.
# The Spark driver sends the predicate to the executor as a task,
# and the executor is what executes it.
oddRdd = rdd.filter(lambda value: value % 2 == 1)


In [5]:
# collect is an ACTION method.
# Every ACTION method creates a JOB.
# A JOB is split into STAGES.
# Each STAGE has TASKs.
# TASKs run on an Executor, against a PARTITION.
# Finally, collect brings the output back to the DRIVER from the EXECUTORs.
# The action is what creates and distributes the partitions and runs the tasks on the executors.
results = oddRdd.collect()
print(results)

[Stage 0:>                                                          (0 + 1) / 1]

[1, 3, 5, 7, 9]


                                                                                

In [6]:
# min is an ACTION: it creates a job with its stages, DAG and tasks,
# which execute on the cluster independently.
r = oddRdd.min()
print(r)  # smallest of the odd numbers

1


In [7]:
# max is an ACTION: it returns the largest element of oddRdd to the driver
r2 = oddRdd.max()
print(r2)

9


In [8]:
# mean is an ACTION: it returns the average of the elements of oddRdd as a float
r3 = oddRdd.mean()
print(r3)

5.0


In [9]:
# sum is an ACTION: it returns the total of the elements of oddRdd
r4 = oddRdd.sum()
print(r4)

25
