In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/110 kB 13%] [Waiting for headers] [Connecting to ppa.l                                                                                                    Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [1 InRelease 14.2 kB/110 kB 13%] [2 InRelease 3,626 B/3,626 B 100%] [Connec0% [Waiting for headers] [1 InRelease 43.1 kB/110 kB 39%] [Connecting to ppa.launchpadcontent.net (1                                                                                                    Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
                                                                                                    Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:5 http://archive.ubuntu.com/ubuntu jammy-upd

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
# Initialize findspark
import findspark
findspark.init()

(1) User defined function (udf) to add and return values

In [None]:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, Row, StringType

spark = SparkSession.builder.appName("Using udf").getOrCreate()

#--Dataframe with 4 columns
df = spark.createDataFrame([(1, 10000, 400, 100),
                            (2, 14000 , 500, 200),
                            (3, 12000 , 800, 300)],
                   ['Roll_Number', 'Fees', 'Fine', 'Discount'])
df.show()

+-----------+-----+----+--------+
|Roll_Number| Fees|Fine|Discount|
+-----------+-----+----+--------+
|          1|10000| 400|     100|
|          2|14000| 500|     200|
|          3|12000| 800|     300|
+-----------+-----+----+--------+



Declaring a user defined function and registration as udf

In [None]:
def std_fn(fees, fine, discount):
  return Row('o1','o2')(fees + fine, fees - discount)

#--Assignment of structure and and name of "to be newly" added columns
schema = StructType([StructField("Fees + Fine", IntegerType(), False),
                     StructField("Fees - Discount", IntegerType(), False)])

df_udf = f.udf(std_fn, schema)

Add new columns without SQL

In [None]:
upd_df1 = df.withColumn('r', df_udf(df['Fees'], df['Fine'], df['Discount']))
upd_df1.select("Roll_number", "Fees", "Fine", "Discount", "r.*").show()

+-----------+-----+----+--------+-----------+---------------+
|Roll_number| Fees|Fine|Discount|Fees + Fine|Fees - Discount|
+-----------+-----+----+--------+-----------+---------------+
|          1|10000| 400|     100|      10400|           9900|
|          2|14000| 500|     200|      14500|          13800|
|          3|12000| 800|     300|      12800|          11700|
+-----------+-----+----+--------+-----------+---------------+



Using SQL

In [None]:
upd_df1.createOrReplaceTempView("stds")
query1 = "select roll_number, fees, fine, discount, r.* from stds"
spark.sql(query1).show()

query2 = 'select * from stds'
spark.sql(query2).show()

+-----------+-----+----+--------+-----------+---------------+
|roll_number| fees|fine|discount|Fees + Fine|Fees - Discount|
+-----------+-----+----+--------+-----------+---------------+
|          1|10000| 400|     100|      10400|           9900|
|          2|14000| 500|     200|      14500|          13800|
|          3|12000| 800|     300|      12800|          11700|
+-----------+-----+----+--------+-----------+---------------+

+-----------+-----+----+--------+--------------+
|Roll_Number| Fees|Fine|Discount|             r|
+-----------+-----+----+--------+--------------+
|          1|10000| 400|     100| {10400, 9900}|
|          2|14000| 500|     200|{14500, 13800}|
|          3|12000| 800|     300|{12800, 11700}|
+-----------+-----+----+--------+--------------+



(2) Applying custom function with udf on pyspark columns

In [37]:
from pyspark.sql import SparkSession, functions as f, types as t
spark = SparkSession.builder.appName("Custom functions").getOrCreate()

In [38]:
#--DataFrame with column names
data = [('Arun',1,2,3), ('Aniket',4,5,6), ('Ishita',7,8,9)]
columns = ['name','maths_marks','science_marks', 'english_marks']
df = spark.createDataFrame(data, columns)
df.show()

+------+-----------+-------------+-------------+
|  name|maths_marks|science_marks|english_marks|
+------+-----------+-------------+-------------+
|  Arun|          1|            2|            3|
|Aniket|          4|            5|            6|
|Ishita|          7|            8|            9|
+------+-----------+-------------+-------------+



In [40]:
#--udf lambda function
m_udf = f.udf(lambda a,b,c : a + b + c, t.IntegerType())

In [41]:
#--adding column to df and displaying
df.withColumn("Sum of marks", m_udf(df['maths_marks'],df['science_marks'], df['english_marks']))
df.createOrReplaceTempView("marks")

q1 = "select * from marks"
spark.sql(q1).show()

spark.udf.register("m_udf", m_udf)
q2 = 'select *, m_udf(maths_marks, science_marks, english_marks) as sum_marks from marks'
spark.sql(q2).show()

+------+-----------+-------------+-------------+
|  name|maths_marks|science_marks|english_marks|
+------+-----------+-------------+-------------+
|  Arun|          1|            2|            3|
|Aniket|          4|            5|            6|
|Ishita|          7|            8|            9|
+------+-----------+-------------+-------------+

+------+-----------+-------------+-------------+---------+
|  name|maths_marks|science_marks|english_marks|sum_marks|
+------+-----------+-------------+-------------+---------+
|  Arun|          1|            2|            3|        6|
|Aniket|          4|            5|            6|       15|
|Ishita|          7|            8|            9|       24|
+------+-----------+-------------+-------------+---------+

