In [1]:
import findspark

In [2]:
# Must run before `import pyspark` in the next cell: findspark locates the local
# Spark installation (presumably via SPARK_HOME — confirm for your setup) and
# makes the bundled pyspark package importable.
findspark.init()

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Reuse the SparkSession already active in this kernel, or start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [6]:
# Smoke test for the session: a one-row DataFrame built from a literal SQL query.
hello_sql = '''select 'spark' as hello '''
df = spark.sql(hello_sql)
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [7]:
# Bind the SparkContext here rather than relying on a later cell to create `sc`,
# so this cell also works on a fresh kernel (Restart & Run All).
sc = spark.sparkContext
data = [1, 2, 3, 4, 5]
# Create an RDD from a local Python list.
distData = sc.parallelize(data)

In [8]:
# An RDD's string form is only a description (class, id, creation site),
# not a listing of its elements.
print(str(distData))

ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:262


In [9]:
# Bind `sc` in this cell so it does not depend on execution order:
# on a fresh kernel no earlier cell of the original notebook defines it.
sc = spark.sparkContext
words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
])

In [10]:
# Shows the RDD's description, not the eight strings it holds.
print(str(words))

ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:262


# Counting the number of elements in an RDD

In [11]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
words = sc.parallelize([
    "scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark",
])
# count() is an action: it evaluates the RDD and returns how many elements it has.
counts = words.count()
print("Number of elements in RDD", counts)

Number of elements in RDD 8


In [12]:
# collect() is an action that returns every element to the driver as a Python list,
# so it should only be used on RDDs small enough to fit in driver memory.
coll = words.collect()
print("Elements in RDD", coll, sep=" ")

Elements in RDD ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [13]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# Recreate the same eight-word RDD so the next example does not depend on earlier cells.
word_list = [
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
]
words = sc.parallelize(word_list)


In [14]:
def f(x):
    """Return `x` unchanged; foreach() discards whatever this returns."""
    return x


# foreach() is an action run only for its side effects; it always returns None,
# so `fore` is None and that is what gets printed.
fore = words.foreach(f)
print(fore)

None


In [15]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
])
# filter() is a lazy transformation keeping elements that contain "spark";
# collect() is the action that actually runs it.
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD ->", filtered)

Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [16]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka",
                        "spark vs hadoop", "pyspark", "pyspark and spark"])
# map() turns each word into a (word, 1) key-value pair.
words_map = words.map(lambda word: (word, 1))
mapping = words_map.collect()
print("Key value pair", mapping)

Key value pair [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]


In [17]:
from operator import add

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
nums = sc.parallelize([1, 2, 3, 4, 5])
# reduce() folds elements pairwise with `add`; the operator must be associative and
# commutative because partial results from partitions are combined in no fixed order.
adding = nums.reduce(add)
print("Adding all the elements", adding)

Adding all the elements 15


In [18]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
# join() pairs values that share a key: (key, (value_from_x, value_from_y)).
# The collected order need not match the input order.
joined = x.join(y)
final = joined.collect()
print(f"Join RDD -> {final}")

Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]


In [19]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
])
# cache() is persist() at the default storage level and sets `is_cached`, so a
# second persist() call is unnecessary. The data is materialized on the first action.
words.cache()
caching = words.is_cached
print("Words got cached > %s" % (caching))

Words got chached > True


Apache Spark supports two types of shared variables:
- Broadcast
- Accumulator

Spark uses shared variables to share data between the driver program and the tasks that run in parallel across the cluster.

# Broadcast


Broadcast variables keep a read-only copy of data on every node. The value is cached once on each machine instead of being shipped along with every task. The following code block shows how to create and read a broadcast variable in PySpark.

A Broadcast variable has an attribute called `value`, which holds the broadcast data and returns it when read.

In [20]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# Broadcast a read-only list; tasks and the driver read it through `.value`.
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print("Stored data -> %s" % (data))
# The broadcast value is a plain Python list (not an RDD), so index it directly.
elem = data[2]
print("Printing a particular element of the broadcast list -> %s" % (elem))

Stored data -> ['scala', 'java', 'hadoop', 'spark', 'akka']
Printing a particular element in RDD -> hadoop


To run this example as a standalone script (saved as `broadcast.py`), submit it with:

`$SPARK_HOME/bin/spark-submit broadcast.py`

# Accumulator

Accumulator variables aggregate information through associative and commutative operations. For example, you can use an accumulator for a sum or for counters (as in MapReduce). The signature of PySpark's Accumulator class is:

`class pyspark.Accumulator(aid, value, accum_param)`

The following example shows how to use an Accumulator variable. Like a broadcast variable, an Accumulator has an attribute called `value` that holds its data and returns the accumulated value; unlike a broadcast variable, it can be read only in the driver program.

In this example, multiple worker tasks add to the same accumulator, and the driver then reads the accumulated value.

In [23]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# Accumulator with initial value 10: tasks add to it, only the driver reads `.value`.
num = sc.accumulator(10)


def f(x):
    """Add one RDD element to the `num` accumulator (called by foreach on the workers).

    Accumulator.add() updates the accumulator in place, so no `global`
    declaration is needed (unlike `num += x`, which rebinds the name).
    """
    num.add(x)


rdd = sc.parallelize([20, 30, 40, 50])
# foreach() is an action: it applies f to every element and returns None.
rdd.foreach(f)
final = num.value  # 10 + 20 + 30 + 40 + 50
print("Accumulated value is -> %i" % (final))

Accumulated value is -> 150


To run this example as a standalone script (saved as `accumulator.py`), submit it with:

`$SPARK_HOME/bin/spark-submit accumulator.py`