In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('demo spark')\
        .master("local[*]")\
        .getOrCreate()

In [5]:
spark.stop()

In [6]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("MyApp").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [18]:
rdd1 = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd2 = spark.sparkContext.parallelize([4, 5, 6, 7])


In [13]:
# union(other)
# A union will get all the elements from both the data sets.

rdd_union =rdd1.union(rdd2)
rdd_union.collect()

[1, 2, 3, 4, 5, 4, 5, 6, 7]

In [14]:
rdd1.union(rdd2).distinct().collect()

[1, 2, 3, 4, 5, 6, 7]

In [17]:
rdd1.take(5)

[1, 2, 3, 4, 5]

In [19]:
rdd1.intersection(rdd2).take(4)

[4, 5]

In [21]:
rdd1.subtract(rdd2).collect()

[1, 2, 3]

In [None]:
# Example pattern to remember
rdd1 = spark.sparkContext.parallelize(["a", "b", "c", "d"])
rdd2 = spark.sparkContext.parallelize(["c", "d", "e", "f"])

rdd_intersect = rdd1.intersection(rdd2)
print(rdd_intersect.collect())  


In [None]:
rdd1.subtract(rdd2).collect()

In [12]:
# Number of customers placed order in July or Aug Month

In [22]:
ord = spark.sparkContext.textFile('c:/data/Orders')


In [19]:
julyOrd = ord.filter(lambda x : str(x.split(',')[1].split('-')[1]) == '07').map(lambda x : x.split(',')[2])
augOrd = ord.filter(lambda x : str(x.split(',')[1].split('-')[1]) == '08').map(lambda x : x.split(',')[2])
julyAugOrders = julyOrd.union(augOrd).distinct().count()


In [20]:
print(julyAugOrders)

7633


In [22]:
# Orders applied both in July and Aug datamonth.
julyAugCommonOrders=julyOrd.subtract(augOrd).count()
julyAugCommonOrders

3836

In [24]:
#sample
rdd = spark.sparkContext.parallelize(range(100), 4)
rdd.sample(fraction=0.1,withReplacement=False).collect()


[2, 7, 25, 32, 35, 37, 38, 41, 44, 45, 56, 97]

In [25]:
# take sample
rdd.takeSample(seed=10,num=5,withReplacement=True)


[13, 37, 17, 69, 86]

In [26]:
#repartition
ord = spark.sparkContext.textFile('c:/data/orders')
ord.glom().map(len).collect()



[34564, 34319]

In [27]:
ord1 =ord.repartition(5)
ord1.glom().map(len).collect()

[13783, 13770, 13770, 13770, 13790]

In [28]:
ord2 = ord1.coalesce(1)

In [29]:
ord2.getNumPartitions()

1

In [25]:
ord = spark.sparkContext.textFile('c:/data/orders',4)
ord.glom().map(len).collect()


[17414, 17150, 17165, 17154]

In [26]:
# coalesce(5)
ord = spark.sparkContext.textFile('c:/data/orders',4)
ord.getNumPartitions()

4

In [36]:
ord1.coalesce(1).getNumPartitions()

1

In [31]:
#saveAsTextFile
julyOrd = ord.filter(lambda x : str(x.split(',')[1].split('-')[1]) == '07')
augOrd = ord.filter(lambda x : str(x.split(',')[1].split('-')[1]) == '08')
julyAugOrders = julyOrd.union(augOrd).distinct()
julyAugOrders.coalesce(1).saveAsTextFile('c:/data/part3')

In [32]:
rdd = spark.sparkContext.parallelize([('a','apple'),('b','ball'),('c','cat')])


In [39]:
rdd.cache()

ParallelCollectionRDD[104] at readRDDFromFile at PythonRDD.scala:297

In [41]:
rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [44]:
rdd.saveAsSequenceFile('c:/data/seq1/')

In [45]:
# rdd persistent
rdd = spark.sparkContext.parallelize(("b", "a", "c"))
print(rdd.persist().is_cached)
rdd.getStorageLevel()  #default memory_only


True


StorageLevel(False, True, False, False, 1)

In [46]:
from pyspark import StorageLevel
rdd.unpersist()

rdd.persist(StorageLevel.MEMORY_AND_DISK_2) #ore RDD partitions in memory; if memory is not enough, spill extra partitions to disk
                                            #_2	Replicate each partition 2 times across different nodes (for fault tolerance)

rdd.getStorageLevel()


StorageLevel(True, True, False, False, 2)

In [2]:
# A broadcast variable is a read-only distributed variable cached on each executor, 
# used to speed up operations by avoiding repeated data transfer.
# When to Use?
# Use broadcast when:
#     You have a small lookup table (e.g., 10 MB)
#     You join large RDD with small RDD
#     You use common config rules, mapping dictionaries, tax slabs, country-codes, etc.

In [44]:
# small lookup table
dept_lookup = {1: "HR", 2: "Finance", 3: "IT"}
broadcast_dept = spark.sparkContext.broadcast(dept_lookup)

emp_rdd = spark.sparkContext.parallelize([
    (101, "Ravi", 1),
    (102, "Kiran", 3),
    (103, "John", 2)
])

result = emp_rdd.map(
    lambda x: (x[0], x[1], broadcast_dept.value[x[2]])
)

result.collect()

[(101, 'Ravi', 'HR'), (102, 'Kiran', 'IT'), (103, 'John', 'Finance')]

In [49]:
dept_lookup.values()
dept_lookup.items()

dict_items([(1, 'HR'), (2, 'Finance'), (3, 'IT')])

In [48]:
tax_rules = {  
    "A": 10,   
    "B": 20,   
    "C": 30    
}

salary_rdd = spark.sparkContext.parallelize([
    ("Ravi", "A", 50000),
    ("Kiran", "B", 60000),
    ("John", "C", 90000)
])

broadcast_tax = spark.sparkContext.broadcast(tax_rules)
tax_rdd = salary_rdd.map(
    lambda x: (x[0],x[2], x[2] * broadcast_tax.value[x[1]] / 100)
)

tax_rdd.collect()

[('Ravi', 50000, 5000.0), ('Kiran', 60000, 12000.0), ('John', 90000, 27000.0)]

In [10]:
# An Accumulator is a write-only shared variable used for aggregating values 
        across tasks in a distributed cluster.
#     Think of it like a global counter or global sum, where:
#     Executors (workers) can only add values
#     Driver can read the final value

In [50]:
blank_lines = spark.sparkContext.accumulator(0)

def check_blank(line):
    global blank_lines
    if line.strip() == "":
        blank_lines += 1
    return line

rdd = spark.sparkContext.parallelize(["hello", "", "world", "  ", "spark"])

rdd.map(check_blank).collect()

print("Total blank lines:", blank_lines.value)


Total blank lines: 2


In [51]:
sum_acc = spark.sparkContext.accumulator(0)

rdd = spark.sparkContext.parallelize([10, 20, 30, 40])

def add_value(x, acc):
    acc.add(x)

rdd.foreach(lambda x: add_value(x, sum_acc))

print("Total Sum:", sum_acc.value)


Total Sum: 100
