In [None]:
!pip install pyspark



In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
# Build the Spark configuration and obtain a SparkContext.
# getOrCreate() returns the already-active context if one exists (e.g. when this
# cell is re-run) instead of failing because a second SparkContext was requested;
# in that case `conf` is not re-applied to the existing context.
conf = SparkConf().setAppName("Employee Join Example")
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Load both input files; each RDD element is one line of text, parsed in the next cell.
rdd1, rdd2 = (sc.textFile(path) for path in ("emp1.txt", "emp2.txt"))

In [None]:
def to_key_value_rdd(lines_rdd):
    """Turn an RDD of comma-separated text lines into (key, rest) pairs.

    Each line is split on its FIRST comma only, so the key is the leading field
    and the value is the remainder of the line as a single string, e.g.
    "4,Charlie,Human Resources" -> ("4", "Charlie,Human Resources").

    Lines that contain no comma (e.g. blank lines) are dropped. Splitting them
    would produce a 1-tuple, which the pair-RDD operations used later
    (join / leftOuterJoin) cannot unpack, and the job would fail.
    """
    return (
        lines_rdd
        .filter(lambda line: "," in line)
        .map(lambda line: tuple(line.split(",", 1)))
    )


rdd1_kv = to_key_value_rdd(rdd1)
rdd2_kv = to_key_value_rdd(rdd2)

In [None]:
# Remove the header row (the first parsed pair of each file) from both keyed RDDs.
# first() is an action, so each header lookup runs a small Spark job.
# NOTE(review): this cell is not idempotent. It rebinds rdd1_kv / rdd2_kv, so
# re-running it on its own picks the first *data* row as the "header" and
# corrupts the filtering. Re-run from the parsing cell instead.
header_rdd1, header_rdd2 = rdd1_kv.first(), rdd2_kv.first()
rdd1_kv, rdd2_kv = (
    rdd1_kv.filter(lambda pair: pair != header_rdd1),
    rdd2_kv.filter(lambda pair: pair != header_rdd2),
)

In [None]:
# Inner join on the leading key field (presumably the employee id): only keys
# present in both RDDs survive, as (key, (rdd1 value, rdd2 value)) pairs.
inner_join_rdd = rdd1_kv.join(other=rdd2_kv)

In [None]:
print("Inner Join Result:")
# collect() is the action that actually runs the join; every pair is returned to
# the driver, so keep this to small inputs.
inner_join_rows = inner_join_rdd.collect()
print(inner_join_rows)

Inner Join Result:
[('4', ('Charlie,Human Resources', '55000,1')), ('10', ('Emily,Software Development', '70000,5')), ('12', ('Benjamin,Digital Marketing', '45000,1')), ('16', ('Lucas,Finance', '80000,3')), ('20', ('Scarlett,Software Development', '70000,3')), ('24', ('Chloe,Data Science', '70000,2')), ('26', ('Samuel,Finance', '75000,4')), ('40', ('Sofia,Human Resources', '70000,2')), ('44', ('Luke,Software Development', '70000,4')), ('50', ('Emma,Software Development', '60000,2')), ('3', ('Bob,Data Science', '70000,3')), ('6', ('Michael,Software Development', '80000,6')), ('7', ('Olivia,Data Science', '90000,3')), ('15', ('Isabella,Human Resources', '70000,2')), ('18', ('Grace,Data Science', '65000,4')), ('23', ('David,Software Development', '65000,3')), ('25', ('Sebastian,Human Resources', '60000,3')), ('30', ('Harper,Software Development', '70000,4')), ('31', ('Luna,Data Science', '55000,1')), ('32', ('William,Human Resources', '60000,3')), ('36', ('Mia,Human Resources', '60000,2')

In [None]:
# Keep every key from rdd1_kv, paired with its rdd2_kv value when one exists.
# leftOuterJoin yields (key, (left, right)) with right = None for keys absent from
# rdd2_kv; that None is shown as the string "None". The check is `is not None`, so
# a legitimately empty right-hand value ("") is kept instead of being mislabelled
# as missing.
left_outer_join_rdd = rdd1_kv.leftOuterJoin(rdd2_kv).mapValues(
    lambda pair: (pair[0], pair[1] if pair[1] is not None else "None")
)

In [None]:
print("\nLeft Outer Join Result:")
# Every rdd1_kv key appears here, whether or not it matched in rdd2_kv.
left_outer_rows = left_outer_join_rdd.collect()
print(left_outer_rows)


Left Outer Join Result:
[('4', ('Charlie,Human Resources', '55000,1')), ('10', ('Emily,Software Development', '70000,5')), ('12', ('Benjamin,Digital Marketing', '45000,1')), ('16', ('Lucas,Finance', '80000,3')), ('20', ('Scarlett,Software Development', '70000,3')), ('24', ('Chloe,Data Science', '70000,2')), ('26', ('Samuel,Finance', '75000,4')), ('40', ('Sofia,Human Resources', '70000,2')), ('44', ('Luke,Software Development', '70000,4')), ('50', ('Emma,Software Development', '60000,2')), ('3', ('Bob,Data Science', '70000,3')), ('6', ('Michael,Software Development', '80000,6')), ('7', ('Olivia,Data Science', '90000,3')), ('15', ('Isabella,Human Resources', '70000,2')), ('18', ('Grace,Data Science', '65000,4')), ('23', ('David,Software Development', '65000,3')), ('25', ('Sebastian,Human Resources', '60000,3')), ('30', ('Harper,Software Development', '70000,4')), ('31', ('Luna,Data Science', '55000,1')), ('32', ('William,Human Resources', '60000,3')), ('36', ('Mia,Human Resources', '600

In [None]:
# Parse each rdd2 value string (e.g. "55000,1") into a tuple of ints, e.g. (55000, 1).
rdd2_kv_int = rdd2_kv.mapValues(
    lambda raw: tuple(int(field) for field in raw.split(","))
)

In [None]:
# For each key present in both RDDs, append the product of the two numeric rdd2
# fields to the rdd1 "name,department" string as a new comma-separated field,
# e.g. "Charlie,Human Resources,55000". The comma keeps the number a separate
# field instead of gluing it onto the department name.
# NOTE(review): the meaning of the second numeric field (years? months?) is not
# visible here; confirm before labelling the product a "total salary".
total_salary_rdd = rdd1_kv.join(rdd2_kv_int).mapValues(
    lambda pair: pair[0] + "," + str(pair[1][0] * pair[1][1])
)

In [None]:
print("\nTotal Salary of Employees:")
# Materialise the joined/derived records on the driver for display.
total_salary_rows = total_salary_rdd.collect()
print(total_salary_rows)


Total Salary of Employees:
[('4', 'Charlie,Human Resources55000'), ('10', 'Emily,Software Development350000'), ('12', 'Benjamin,Digital Marketing45000'), ('16', 'Lucas,Finance240000'), ('20', 'Scarlett,Software Development210000'), ('24', 'Chloe,Data Science140000'), ('26', 'Samuel,Finance300000'), ('40', 'Sofia,Human Resources140000'), ('44', 'Luke,Software Development280000'), ('50', 'Emma,Software Development120000'), ('3', 'Bob,Data Science210000'), ('6', 'Michael,Software Development480000'), ('7', 'Olivia,Data Science270000'), ('15', 'Isabella,Human Resources140000'), ('18', 'Grace,Data Science260000'), ('23', 'David,Software Development195000'), ('25', 'Sebastian,Human Resources180000'), ('30', 'Harper,Software Development280000'), ('31', 'Luna,Data Science55000'), ('32', 'William,Human Resources180000'), ('36', 'Mia,Human Resources120000'), ('42', 'Ella,Data Science240000'), ('43', 'Daniel,Finance130000'), ('47', 'Julia,Software Development320000'), ('49', 'John,Data Science21

In [None]:
# Two tiny in-memory pair RDDs sharing the keys 1..3, used for the broadcast-join demo.
rdd1_pair, rdd2_pair = (
    sc.parallelize([(1, "John"), (2, "Alice"), (3, "Bob")]),
    sc.parallelize([(1, 30), (2, 35), (3, 40)]),
)

In [None]:
# Pull the small RDD onto the driver as a {key: value} dict, then broadcast it so
# each task can do a local lookup instead of a shuffle join.
broadcast_rdd2 = sc.broadcast(rdd2_pair.collectAsMap())

In [None]:
def _attach_broadcast_value(pair):
    """Map-side join step: (key, left) -> (key, (left, broadcast value for key or None))."""
    key, left = pair[0], pair[1]
    return key, (left, broadcast_rdd2.value.get(key))


result = rdd1_pair.map(_attach_broadcast_value)

In [None]:
print("\nBroadcast Join Result:")
broadcast_join_rows = result.collect()  # action: runs the map-side lookups
print(broadcast_join_rows)


Broadcast Join Result:
[(1, ('John', 30)), (2, ('Alice', 35)), (3, ('Bob', 40))]


In [None]:
# The integers 1..10 inclusive; their sum (55) is what the accumulator should report.
rdd = sc.parallelize(range(1, 10 + 1))

In [None]:
# Integer accumulator starting at 0: tasks add to it via update_acc, and the
# driver reads the total through .value.
accumulator = sc.accumulator(value=0)
def update_acc(x):
    """Add one RDD element to the module-level `accumulator` (called inside Spark tasks).

    Uses Accumulator.add() rather than `global accumulator; accumulator += x`.
    The effect on the accumulator is the same, but the function no longer rebinds
    a module-level name, so no `global` declaration is needed.
    """
    accumulator.add(x)
# foreach() is an action: it calls update_acc on every element purely for its side effect.
rdd.foreach(update_acc)
accumulated_total = accumulator.value  # read back on the driver after the action finishes
print("\nSum of Values using Accumulator:", accumulated_total)
# Shut down the SparkContext; nothing after this point in the visible notebook uses `sc`.
sc.stop()



Sum of Values using Accumulator: 55
