# UDF

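User-defined functions (UDFs): plain Python functions wrapped so that Spark can apply them to DataFrame columns or call them from SQL.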

In [0]:
# Sample employee rows. salary and bonus are string literals here, which is
# why the functions below convert them with int()/float().
data1 = [
    (1, 'Nilesh', '55000', '3'),
    (2, 'Nita', '90000', '5'),
]
columns = ['id', 'name', 'salary', 'bonus']
df = spark.createDataFrame(data1, schema=columns)
df.show()

+---+------+------+-----+
| id|  name|salary|bonus|
+---+------+------+-----+
|  1|Nilesh| 55000|    3|
|  2|  Nita| 90000|    5|
+---+------+------+-----+



In [0]:
# Scratch check: int() on a value that is already an int returns it unchanged.
a = 10
a_as_int = int(a)
a_as_int

Out[7]: 10

In [0]:
import sys
def total_sal(sal, com):
    """Return the salary plus a percentage commission.

    Parameters
    ----------
    sal : str or number
        Salary; anything ``float()`` accepts (e.g. ``'55000'`` or ``'55000.5'``).
    com : str or number
        Commission percentage; ``'3'`` means 3 %.

    Returns
    -------
    float or None
        ``sal + sal * com / 100``, or ``None`` when either input cannot be
        converted to a number (e.g. ``None`` or ``'abc'``). When this function
        runs as a Spark UDF, ``None`` becomes SQL NULL.
    """
    try:
        # float() rather than int(): int('55000.5') would raise for a decimal salary.
        salary = float(sal)
        rate = float(com) / 100
    except (TypeError, ValueError):
        # Previously the exception object itself was returned. That is not a
        # float, so it is not a valid FloatType UDF result; return NULL instead.
        return None
    return salary + salary * rate

In [0]:
# Plain Python call (no Spark involved): 3000 plus 3 % commission.
total_sal(sal='3000', com='3')

Out[10]: 3090.0

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, FloatType

# Wrap total_sal directly. The former lambda wrapper added nothing, and passing
# the function itself gives the UDF the readable name 'total_sal' (instead of
# '<lambda>') in auto-generated column names.
commission_calc = udf(total_sal, FloatType())

In [0]:
# Apply the UDF row by row; COM = salary plus the bonus-percent commission.
df.withColumn('COM', commission_calc(col('salary'), col('bonus'))).show()

+---+------+------+-----+-------+
| id|  name|salary|bonus|    COM|
+---+------+------+-----+-------+
|  1|Nilesh| 55000|    3|56650.0|
|  2|  Nita| 90000|    5|94500.0|
+---+------+------+-----+-------+



In [0]:
## Instead of wrapping the function with udf(...) after defining it, the @udf decorator turns it into a UDF directly
import sys
@udf(returnType = FloatType())
def total_sal1(sal, com):
    """Spark UDF: salary plus a percentage commission, as FloatType.

    Parameters
    ----------
    sal : str or number
        Salary; anything ``float()`` accepts (e.g. ``'55000'`` or ``'55000.5'``).
    com : str or number
        Commission percentage; ``'3'`` means 3 %.

    Returns
    -------
    float or None
        ``sal + sal * com / 100``, or ``None`` (SQL NULL) when either input
        cannot be converted to a number.
    """
    try:
        # float() rather than int(): int('55000.5') would raise for a decimal salary.
        salary = float(sal)
        rate = float(com) / 100
    except (TypeError, ValueError):
        # Returning the exception object (as before) is not a valid FloatType
        # value; NULL is the explicit "could not compute" result.
        return None
    return salary + salary * rate

In [0]:
from pyspark.sql.functions import lit
# Calling a UDF on Columns computes nothing yet; it only builds an unevaluated
# Column expression, as the displayed repr shows.
salary_col, bonus_col = lit(3000), lit(3)
total_sal1(salary_col, bonus_col)

Out[17]: Column<'total_sal1(3000, 3)'>

In [0]:
# Keep every existing column and append the UDF result as 'commission'.
commission_col = total_sal1(col('salary'), col('bonus')).alias('commission')
df.select('*', commission_col).show()

+---+------+------+-----+----------+
| id|  name|salary|bonus|commission|
+---+------+------+-----+----------+
|  1|Nilesh| 55000|    3|   56650.0|
|  2|  Nita| 90000|    5|   94500.0|
+---+------+------+-----+----------+



In [0]:
# Expose df to Spark SQL as the temp view dummy_emp_tbl, so that a UDF
# registered with spark.udf.register can be called from a SQL query.
view_name = 'dummy_emp_tbl'
df.createOrReplaceTempView(view_name)

In [0]:
def total_sal2(sal, com):
    """Return the salary plus a percentage commission (registered for SQL use).

    Parameters
    ----------
    sal : str or number
        Salary; anything ``float()`` accepts (e.g. ``'55000'`` or ``'55000.5'``).
    com : str or number
        Commission percentage; ``'3'`` means 3 %.

    Returns
    -------
    float or None
        ``sal + sal * com / 100``, or ``None`` (SQL NULL when run as a UDF)
        when either input cannot be converted to a number.
    """
    try:
        # float() rather than int(): int('55000.5') would raise for a decimal salary.
        salary = float(sal)
        rate = float(com) / 100
    except (TypeError, ValueError):
        # Do not return the exception object: it is not a valid FloatType value.
        return None
    return salary + salary * rate

In [0]:
# Make total_sal2 callable from Spark SQL as cal_com(...), returning FloatType.
spark.udf.register('cal_com', total_sal2, FloatType())

Out[38]: <function __main__.total_sal2(sal, com)>

In [0]:
# 'cal_com' is already registered in the previous cell. Calling
# spark.udf.register again with identical arguments was redundant.

In [0]:
%sql

-- Whole view; the output below shows only the last statement's result.
select * from dummy_emp_tbl;

-- Call the UDF with the exact name it was registered under (cal_com).
select salary, bonus, cal_com(salary, bonus)
from dummy_emp_tbl;

salary,bonus,"cal_com(salary, bonus)"
55000,3,56650.0
90000,5,94500.0


## RDD

Resilient Distributed Dataset

An RDD is immutable, and its data is processed in memory.

In [0]:
# Build an RDD directly from a local Python list of tuples.
data1 = [
    (1, 'Nilesh', '55000', '3'),
    (2, 'Nita', '90000', '5'),
]
rdd = spark.sparkContext.parallelize(data1)
print(rdd)

ParallelCollectionRDD[10] at readRDDFromInputStream at PythonRDD.scala:435


In [0]:
## Confirm that parallelize() returned a pyspark RDD object (not a DataFrame)
print(type(rdd))

<class 'pyspark.rdd.RDD'>


In [0]:
# collect() brings every element from all partitions back to the driver as a
# Python list: fine for this tiny RDD, risky for large ones.
collected = rdd.collect()
print(collected)

[(1, 'Nilesh', '55000', '3'), (2, 'Nita', '90000', '5')]


In [0]:
### Convert the RDD of tuples into a DataFrame, supplying only the column names
df1 = rdd.toDF(['id','name','salary','interest'])
df1.show()

+---+------+------+--------+
| id|  name|salary|interest|
+---+------+------+--------+
|  1|Nilesh| 55000|       3|
|  2|  Nita| 90000|       5|
+---+------+------+--------+



In [0]:
# Same result as rdd.toDF(...): build the DataFrame from the RDD plus column names.
interest_columns = ['id', 'name', 'salary', 'interest']
df2 = spark.createDataFrame(rdd, interest_columns)
df2.show()

+---+------+------+--------+
| id|  name|salary|interest|
+---+------+------+--------+
|  1|Nilesh| 55000|       3|
|  2|  Nita| 90000|       5|
+---+------+------+--------+



In [0]:
# A second RDD of (first name, last name) pairs.
data1 = [
    ('Nilesh', 'Chavan'),
    ('Nita', 'Kale'),
]
rdd1 = spark.sparkContext.parallelize(data1)
print(rdd1)

ParallelCollectionRDD[12] at readRDDFromInputStream at PythonRDD.scala:435


In [0]:
# Two-column DataFrame built from the name-pair RDD.
df3 = spark.createDataFrame(rdd1, schema=['col1', 'col2'])
df3.show()

+------+------+
|  col1|  col2|
+------+------+
|Nilesh|Chavan|
|  Nita|  Kale|
+------+------+



In [0]:
# collect() returns a plain Python list of tuples; show it as the cell output.
tobj = rdd1.collect()
tobj

Out[32]: [('Nilesh', 'Chavan'), ('Nita', 'Kale')]

In [0]:
# Append a derived full_name field to each (first, last) tuple.
rdd2 = rdd1.map(lambda row: (*row, row[0] + ' ' + row[1]))
print(rdd2.collect())
df3 = rdd2.toDF(['f_name', 'l_name', 'full_name'])
df3.show()

[('Nilesh', 'Chavan', 'Nilesh Chavan'), ('Nita', 'Kale', 'Nita Kale')]
+------+------+-------------+
|f_name|l_name|    full_name|
+------+------+-------------+
|Nilesh|Chavan|Nilesh Chavan|
|  Nita|  Kale|    Nita Kale|
+------+------+-------------+



In [0]:
# Each element is a single string: a title followed by a name.
data123 = ['Mr. Anay', 'Miss Anita']
rdd = spark.sparkContext.parallelize(data123)

for titled_name in rdd.collect():
    print(titled_name)


Mr. Anay
Miss Anita


In [0]:
# map() is one-to-one: each string becomes one list of words.
rdd123 = rdd.map(lambda text: text.split(' '))
for words in rdd123.collect():
    print(words)

['Mr.', 'Anay']
['Miss', 'Anita']


In [0]:
# flatMap() flattens the per-string lists, so every word becomes its own element.
rdd123 = rdd.flatMap(lambda text: text.split(' '))
for word in rdd123.collect():
    print(word)

Mr.
Anay
Miss
Anita


## partitionBy()

In [0]:
# Read the advertising CSV; the first row holds column names, and column types
# are inferred from the data.
adv_df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('dbfs:/FileStore/Advertising.csv')
)
adv_df.show()

+------+-----+-----+---------+-----+
| Month|   TV|radio|newspaper|sales|
+------+-----+-----+---------+-----+
|Jan-00|230.1| 37.8|     69.2| 22.1|
|Feb-00| 44.5| 39.3|     45.1| 10.4|
|Mar-00| 17.2| 45.9|     69.3|  9.3|
|Apr-00|151.5| 41.3|     58.5| 18.5|
|May-00|180.8| 10.8|     58.4| 12.9|
|Jun-00|  8.7| 48.9|     75.0|  7.2|
|Jul-00| 57.5| 32.8|     23.5| 11.8|
|Aug-00|120.2| 19.6|     11.6| 13.2|
|Sep-00|  8.6|  2.1|      1.0|  4.8|
|Oct-00|199.8|  2.6|     21.2| 10.6|
|Nov-00| 66.1|  5.8|     24.2|  8.6|
|Dec-00|214.7| 24.0|      4.0| 17.4|
|01-Jan| 23.8| 35.1|     65.9|  9.2|
|01-Feb| 97.5|  7.6|      7.2|  9.7|
|01-Mar|204.1| 32.9|     46.0| 19.0|
|01-Apr|195.4| 47.7|     52.9| 22.4|
|01-May| 67.8| 36.6|    114.0| 12.5|
|01-Jun|281.4| 39.6|     55.8| 24.4|
|01-Jul| 69.2| 20.5|     18.3| 11.3|
|01-Aug|147.3| 23.9|     19.1| 14.6|
+------+-----+-----+---------+-----+
only showing top 20 rows



In [0]:
# NOTE(review): TV is a continuous numeric column, so partitioning on it creates
# one directory per distinct value (e.g. TV=120.5). That is fine for a demo, but
# real partition columns should be low-cardinality.
adv_df.write.mode('overwrite').partitionBy('TV').parquet('dbfs:/temps/work/')

In [0]:
# Reading the root path discovers every TV=<value> sub-directory and adds TV back
# as a column; it appears last in the schema.
work_df = spark.read.parquet('dbfs:/temps/work/')
work_df.show()

+------+-----+---------+-----+-----+
| Month|radio|newspaper|sales|   TV|
+------+-----+---------+-----+-----+
|Oct-00|  2.6|     21.2| 10.6|199.8|
|04-Mar|  3.1|     34.6| 11.4|199.8|
|08-Feb| 21.0|     22.0| 15.5|184.9|
|11-Aug| 43.9|      1.7| 20.7|184.9|
|08-May|  4.3|     49.8| 11.7|222.4|
|14-Jul|  3.4|     13.1| 11.5|222.4|
|01-Oct|  5.1|     23.5| 12.5|237.4|
|05-Sep| 27.5|     11.0| 18.9|237.4|
|05-Dec| 14.3|     31.7| 12.4|109.8|
|07-Jun| 47.8|     51.4| 16.7|109.8|
|08-Jan|  3.5|      5.9| 11.7|197.6|
|12-Sep| 23.3|     14.2| 16.6|197.6|
|03-Jun| 33.4|     38.7| 17.1|177.0|
|16-Jun|  9.3|      6.4| 12.8|177.0|
|06-Sep| 26.7|     22.3| 11.8| 76.4|
|09-Oct|  0.8|     14.8|  9.4| 76.4|
|Mar-00| 45.9|     69.3|  9.3| 17.2|
|16-Jan|  4.1|     31.6|  5.9| 17.2|
|02-Apr| 16.7|     22.9| 15.9|240.1|
|12-Mar|  7.3|      8.7| 13.2|240.1|
+------+-----+---------+-----+-----+
only showing top 20 rows



In [0]:
# Reading one partition directory directly returns only its rows; the TV column
# itself is not in the result because its value lives in the directory name.
tv_partition_path = 'dbfs:/temps/work/TV=120.5'
spark.read.parquet(tv_partition_path).show()

+------+-----+---------+-----+
| Month|radio|newspaper|sales|
+------+-----+---------+-----+
|06-Jun| 28.5|     14.2| 14.2|
+------+-----+---------+-----+



In [0]:
# Nested partitioning: TV=<value>/radio=<value>/ directories.
# NOTE(review): this overwrites the TV-only layout written to the same path
# earlier; use a separate path if both layouts should be kept.
adv_df.write.mode('overwrite').partitionBy('TV', 'radio').parquet('dbfs:/temps/work/')

## Json type

In [0]:
# games holds a JSON-like string with single-quoted keys and values, not a map yet.
data1 = [
    ('Sunita', "{'g1': 'football', 'g2': 'Archary'}"),
    ('Fatima', "{'g1': 'Baseball', 'g2': 'chess'}"),
]
columns = ['name', 'games']

df = spark.createDataFrame(data1, columns)
df.show(truncate=False)
df.printSchema()

+------+-----------------------------------+
|name  |games                              |
+------+-----------------------------------+
|Sunita|{'g1': 'football', 'g2': 'Archary'}|
|Fatima|{'g1': 'Baseball', 'g2': 'chess'}  |
+------+-----------------------------------+

root
 |-- name: string (nullable = true)
 |-- games: string (nullable = true)



Convert the JSON string to a map type with `from_json`.

`to_json` does the reverse: it converts any map or struct column into a JSON string.

In [0]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StringType

# Target type: a map whose keys and values are both plain strings.
MapTypeStructure = MapType(StringType(), StringType())

# Parse the games string into a map column. The single-quoted input is accepted,
# as the dict_map output shows.
df1 = df.withColumn('dict_map', from_json(col('games'), MapTypeStructure))
df1.show(truncate=False)
df1.printSchema()

+------+-----------------------------------+-------------------------------+
|name  |games                              |dict_map                       |
+------+-----------------------------------+-------------------------------+
|Sunita|{'g1': 'football', 'g2': 'Archary'}|{g1 -> football, g2 -> Archary}|
|Fatima|{'g1': 'Baseball', 'g2': 'chess'}  |{g1 -> Baseball, g2 -> chess}  |
+------+-----------------------------------+-------------------------------+

root
 |-- name: string (nullable = true)
 |-- games: string (nullable = true)
 |-- dict_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [0]:
# Pull individual keys out of the map column into their own columns.
df2 = (
    df1
    .withColumn('g1', df1.dict_map['g1'])
    .withColumn('g2', df1.dict_map['g2'])
)

In [0]:
# Show full cell contents (no truncation), including the extracted g1/g2 columns.
df2.show(truncate= False)

+------+-----------------------------------+-------------------------------+--------+-------+
|name  |games                              |dict_map                       |g1      |g2     |
+------+-----------------------------------+-------------------------------+--------+-------+
|Sunita|{'g1': 'football', 'g2': 'Archary'}|{g1 -> football, g2 -> Archary}|football|Archary|
|Fatima|{'g1': 'Baseball', 'g2': 'chess'}  |{g1 -> Baseball, g2 -> chess}  |Baseball|chess  |
+------+-----------------------------------+-------------------------------+--------+-------+



## Struct Type

In [0]:
# At this point games in df is still a plain string column.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- games: string (nullable = true)



In [0]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField , StringType

# Struct schema: one string field per expected key.
schema1 = StructType([
    StructField('g1', StringType()),
    StructField('g2', StringType()),
])

# Unlike a map, the struct keeps only the declared fields, in declared order.
df1 = df.withColumn('game_struct', from_json(df.games, schema1))
df1.show(truncate=False)
df1.printSchema()

+------+-----------------------------------+-------------------+
|name  |games                              |game_struct        |
+------+-----------------------------------+-------------------+
|Sunita|{'g1': 'football', 'g2': 'Archary'}|{football, Archary}|
|Fatima|{'g1': 'Baseball', 'g2': 'chess'}  |{Baseball, chess}  |
+------+-----------------------------------+-------------------+

root
 |-- name: string (nullable = true)
 |-- games: string (nullable = true)
 |-- game_struct: struct (nullable = true)
 |    |-- g1: string (nullable = true)
 |    |-- g2: string (nullable = true)



In [0]:
# getField selects one named field from the struct column.
df2 = df1.withColumn('game1', df1.game_struct.getField('g1'))
df2.show()

+------+--------------------+-------------------+--------+
|  name|               games|        game_struct|   game1|
+------+--------------------+-------------------+--------+
|Sunita|{'g1': 'football'...|{football, Archary}|football|
|Fatima|{'g1': 'Baseball'...|  {Baseball, chess}|Baseball|
+------+--------------------+-------------------+--------+



In [0]:
# Here games is a real Python dict, so Spark infers a map<string,string> column.
data1 = [
    ('Sunita', {'g1': 'football', 'g2': 'Archary'}),
    ('Fatima', {'g1': 'Baseball', 'g2': 'chess'}),
]
columns = ['name', 'games']

df = spark.createDataFrame(data1, columns)
df.show(truncate=False)
df.printSchema()

+------+-------------------------------+
|name  |games                          |
+------+-------------------------------+
|Sunita|{g1 -> football, g2 -> Archary}|
|Fatima|{g1 -> Baseball, g2 -> chess}  |
+------+-------------------------------+

root
 |-- name: string (nullable = true)
 |-- games: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [0]:
from pyspark.sql.functions import to_json
# to_json serialises the map column back into a double-quoted JSON string.
df1 = df.withColumn('game_dict', to_json(col('games')))
df1.show()
df1.printSchema()

+------+--------------------+--------------------+
|  name|               games|           game_dict|
+------+--------------------+--------------------+
|Sunita|{g1 -> football, ...|{"g1":"football",...|
|Fatima|{g1 -> Baseball, ...|{"g1":"Baseball",...|
+------+--------------------+--------------------+

root
 |-- name: string (nullable = true)
 |-- games: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- game_dict: string (nullable = true)



In [0]:
# Display the Python-side source rows (plain dicts) for comparison with the map column.
data1

Out[10]: [('Sunita', {'g1': 'football', 'g2': 'Archary'}),
 ('Fatima', {'g1': 'Baseball', 'g2': 'chess'})]

In [0]:
# Re-display df untruncated; games is a map column here, not a string.
df.show(truncate=False)
df.printSchema()

+------+-------------------------------+
|name  |games                          |
+------+-------------------------------+
|Sunita|{g1 -> football, g2 -> Archary}|
|Fatima|{g1 -> Baseball, g2 -> chess}  |
+------+-------------------------------+

root
 |-- name: string (nullable = true)
 |-- games: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



## json_tuple
`json_tuple` extracts a few keys from a JSON string column, returning each key as its own column.

In [0]:
from pyspark.sql.functions import json_tuple
# Start again from a JSON-like string column.
data1 = [
    ('Sunita', "{'g1': 'football', 'g2': 'Archary'}"),
    ('Fatima', "{'g1': 'Baseball', 'g2': 'chess'}"),
]
columns = ['name', 'games']

df = spark.createDataFrame(data1, columns)
df.show(truncate=False)
df.printSchema()

+------+-----------------------------------+
|name  |games                              |
+------+-----------------------------------+
|Sunita|{'g1': 'football', 'g2': 'Archary'}|
|Fatima|{'g1': 'Baseball', 'g2': 'chess'}  |
+------+-----------------------------------+

root
 |-- name: string (nullable = true)
 |-- games: string (nullable = true)



In [0]:
# json_tuple returns one column per requested key; alias() names those columns.
game_keys = ['g1', 'g2']
df2 = df.select(df.name, json_tuple(df.games, *game_keys).alias(*game_keys))
df2.show()

+------+--------+-------+
|  name|      g1|     g2|
+------+--------+-------+
|Sunita|football|Archary|
|Fatima|Baseball|  chess|
+------+--------+-------+



## get_json_object

`get_json_object` extracts a value from a nested JSON string using a JSONPath expression (for example `$.address.city`).

In [0]:
# pdetails is a JSON-like string containing a nested address object.
data = [
    ('EMP100', "{'name': 'Anil', 'address':{'city':'Solapur', 'landmark':'Navi Peth'}, 'gender': 'M'}"),
    ('EMP101', "{'name':'Sunil', 'address':{'city':'Kolhapur', 'landmark':'Juni Peth'}, 'gender': 'M'}"),
    ('EMP102', "{'name': 'Neel', 'address':{'city':'Nagpur', 'landmark':'Peth'}, 'gender': 'M'}"),
]

schema = ['Emp_id', 'pdetails']

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)
df.printSchema()


+------+--------------------------------------------------------------------------------------+
|Emp_id|pdetails                                                                              |
+------+--------------------------------------------------------------------------------------+
|EMP100|{'name': 'Anil', 'address':{'city':'Solapur', 'landmark':'Navi Peth'}, 'gender': 'M'} |
|EMP101|{'name':'Sunil', 'address':{'city':'Kolhapur', 'landmark':'Juni Peth'}, 'gender': 'M'}|
|EMP102|{'name': 'Neel', 'address':{'city':'Nagpur', 'landmark':'Peth'}, 'gender': 'M'}       |
+------+--------------------------------------------------------------------------------------+

root
 |-- Emp_id: string (nullable = true)
 |-- pdetails: string (nullable = true)



In [0]:
from pyspark.sql.functions import get_json_object
# The JSONPath $.address.city reaches into the nested address object.
city_col = get_json_object(col('pdetails'), '$.address.city').alias('city')
df1 = df.select('Emp_id', city_col)
df1.show()
df1.printSchema()

+------+--------+
|Emp_id|    city|
+------+--------+
|EMP100| Solapur|
|EMP101|Kolhapur|
|EMP102|  Nagpur|
+------+--------+

root
 |-- Emp_id: string (nullable = true)
 |-- city: string (nullable = true)

