### USER-DEFINED FUNCTIONS(UDF)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import *
#import pyspark.sql.functions as F

In [0]:
data = [("Alice", 10), ("Bob", 20), ("Charlie", 30)]

df = spark.createDataFrame(data, ["name", "age"])

display(df)

name,age
Alice,10
Bob,20
Charlie,30


In [0]:
try:
    add_five_udf = udf(lambda x: x + 5, IntegerType())
    df1 = df.withColumn("new_age", add_five_udf(df["age"]))
    display(df1)

except Exception as e:
    print(f"ERROR!:{e}")

name,age,new_age
Alice,10,15
Bob,20,25
Charlie,30,35


In [0]:
@udf(StringType())
def upper_case(name):
    return name.upper() if name else None


data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])


df = df.withColumn("name_upper", upper_case(col("name")))
display(df)

id,name,name_upper
1,Alice,ALICE
2,Bob,BOB
3,Charlie,CHARLIE


In [0]:
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Height_cm", FloatType(), True),
    StructField("Weight_kg", FloatType(), True)
])

data = [
    ("Alice", 165.0, 55.0),
    ("Bob", 175.5, 75.0),
    ("Charlie", 180.2, 82.5),
    ("David", 160.0, 60.0)
]


df = spark.createDataFrame(data, schema=schema)

@udf(FloatType())
def calculate_bmi(weight, height):
    return weight / (height ** 2) if height else None

df2 = df.withColumn("BMI", calculate_bmi(col("Weight_kg"), col("Height_cm")))
display(df2)

Name,Height_cm,Weight_kg,BMI
Alice,165.0,55.0,0.002020202
Bob,175.5,75.0,0.002435045
Charlie,180.2,82.5,0.0025406473
David,160.0,60.0,0.00234375


In [0]:
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Height_cm", FloatType(), True),
    StructField("Weight_kg", FloatType(), True)
])


data = [
    ("Alice", 165.0, 55.0),
    ("Bob", 175.5, 75.0),
    ("Charlie", 180.2, 82.5),
    ("David", 160.0, 60.0)
]


df = spark.createDataFrame(data, schema=schema)


def calculate_bmi(weight, height_cm):
    height_m = height_cm / 100 
    return weight / (height_m ** 2) if height_m else None  


spark.udf.register("calculate_bmi", calculate_bmi, FloatType())


df2 = df.withColumn("BMI", expr("calculate_bmi(Weight_kg, Height_cm)"))
display(df2)


Name,Height_cm,Weight_kg,BMI
Alice,165.0,55.0,20.20202
Bob,175.5,75.0,24.350452
Charlie,180.2,82.5,25.406473
David,160.0,60.0,23.4375


### RESILIENT DISTRIBUTED DATASETS(RDD)

In [0]:
from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([("Alice", 25), ("Bob", 30)])
df = rdd.map(lambda x: Row(name=x[0], age=x[1])).toDF()
df.show()


+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+



In [0]:
print(type(rdd))

<class 'pyspark.rdd.RDD'>


In [0]:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
sum_rdd = rdd.reduce(lambda x, y: x + y)
print(sum_rdd)  

15


>parallelize - divides data into small block assigned to multiple node to process data at the same time.

In [0]:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(rdd)

ParallelCollectionRDD[84] at readRDDFromInputStream at PythonRDD.scala:435


In [0]:
df11 = rdd.map(lambda x: (x,)).toDF(["Numbers"])
display(df11)

Numbers
1
2
3
4
5


>union

In [0]:
rdd1 = spark.sparkContext.parallelize([1, 2, 3])
rdd2 = spark.sparkContext.parallelize(['A','B','C'])

merged_rdd = rdd1.union(rdd2)
print(merged_rdd.collect()) 

[1, 2, 3, 'A', 'B', 'C']


>intersection

In [0]:
rdd1 = spark.sparkContext.parallelize([1, 2, 3])
rdd2 = spark.sparkContext.parallelize([4, 5, 6,2,3])

common_rdd = rdd1.intersection(rdd2)
print(common_rdd.collect())

[2, 3]


>subtract

In [0]:
rdd1 = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd2 = spark.sparkContext.parallelize([2, 4])

diff_rdd = rdd1.subtract(rdd2)
print(diff_rdd.collect()) 


[1, 3, 5]


>zip

In [0]:
rdd1 = spark.sparkContext.parallelize(["Alice", "Bob", "Charlie"])
rdd2 = spark.sparkContext.parallelize([25, 30])

zipped_rdd = rdd1.zip(rdd2) 
display(zipped_rdd)

org.apache.spark.api.java.JavaPairRDD@7d6028b0

>Map()

In [0]:
#without_Map

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
new_rdd = []
for num in rdd.collect():
    new_rdd.append(num * num)
print(new_rdd) 

[1, 4, 9, 16, 25]


In [0]:
#Same above example with map function
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
squared_rdd = rdd.map(lambda x: x * x)
print(squared_rdd.collect()) 

[1, 4, 9, 16, 25]


>Map() vs FlatMap()

In [0]:
rdd = spark.sparkContext.parallelize(["Hello World", "PySpark is fun"])
flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))

for i in flat_mapped_rdd.collect():
    print(i)


Hello
World
PySpark
is
fun


In [0]:
rdd = spark.sparkContext.parallelize(["Hello World", "PySpark is fun"])
mapped_rdd = rdd.map(lambda x: x.split(" "))

for i in mapped_rdd.collect():
    print(i)


['Hello', 'World']
['PySpark', 'is', 'fun']


>partitionBy


In [0]:
data=spark.read.csv('dbfs:/FileStore/Pokemon.csv',header=True,inferSchema=True)
data.write.mode("overwrite").partitionBy("Generation").parquet("partitioned_data")
print("Partitioned Data Successfully Saved!")

Partitioned Data Successfully Saved!


In [0]:
data1=spark.read.parquet('dbfs:/partitioned_data/Generation=1',header=True,inferrSchema=True)
display(data1)

#,Name,Type 1,Type 2,Total,HP,Attack,Defense,Sp. Atk,Sp. Def,Speed,Legendary
1,Bulbasaur,Grass,Poison,318,45,49,49,65,65,45,False
2,Ivysaur,Grass,Poison,405,60,62,63,80,80,60,False
3,Venusaur,Grass,Poison,525,80,82,83,100,100,80,False
3,VenusaurMega Venusaur,Grass,Poison,625,80,100,123,122,120,80,False
4,Charmander,Fire,,309,39,52,43,60,50,65,False
5,Charmeleon,Fire,,405,58,64,58,80,65,80,False
6,Charizard,Fire,Flying,534,78,84,78,109,85,100,False
6,CharizardMega Charizard X,Fire,Dragon,634,78,130,111,130,85,100,False
6,CharizardMega Charizard Y,Fire,Flying,634,78,104,78,159,115,100,False
7,Squirtle,Water,,314,44,48,65,50,64,43,False
