##RDD functions


In [0]:
help(spark.sparkContext.textFile)

Help on method textFile in module pyspark.context:

textFile(name: str, minPartitions: Optional[int] = None, use_unicode: bool = True) -> pyspark.rdd.RDD[str] method of dbruntime.spark_connection.RemoteContext instance
    Read a text file from HDFS, a local file system (available on all
    nodes), or any Hadoop-supported file system URI, and return it as an
    RDD of Strings. The text files must be encoded as UTF-8.
    
    .. versionadded:: 0.7.0
    
    Parameters
    ----------
    name : str
        directory to the input data files, the path can be comma separated
        paths as a list of inputs
    minPartitions : int, optional
        suggested minimum number of partitions for the resulting RDD
    use_unicode : bool, default True
        If `use_unicode` is False, the strings will be kept as `str` (encoding
        as `utf-8`), which is faster and smaller than unicode.
    
        .. versionadded:: 1.2.0
    
    Returns
    -------
    :class:`RDD`
        RDD represen

In [0]:
#we can read data from file in 2 ways 
order_items=sc.textFile('dbfs:/FileStore/order_items')
print(list(order_items.take(5)))

['1,1,957,1,299.98,299.98', '2,2,1073,1,199.99,199.99', '3,2,502,5,250.0,50.0', '4,2,403,1,129.99,129.99', '5,4,897,2,49.98,24.99']


In [0]:
rdd=spark.sparkContext.textFile('dbfs:/FileStore/order_items')
for i in rdd.take(5): print(i)

1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99


In [0]:
# no of partition
rdd.getNumPartitions()

2

In [0]:
#gives no of records in each partition
rdd.glom().map(len).collect() 

[87942, 84256]

In [0]:
#create rdd from python list
lst=[1,2,3,4,5]
help(spark.sparkContext.parallelize)#takes list as input and we can give nof of partition also

Help on method parallelize in module pyspark.context:

parallelize(c: Iterable[~T], numSlices: Optional[int] = None) -> pyspark.rdd.RDD[~T] method of dbruntime.spark_connection.RemoteContext instance
    Distribute a local Python collection to form an RDD. Using range
    is recommended if the input represents a range for performance.
    
    .. versionadded:: 0.7.0
    
    Parameters
    ----------
    c : :class:`collections.abc.Iterable`
        iterable collection to distribute
    numSlices : int, optional
        the number of partitions of the new RDD
    
    Returns
    -------
    :class:`RDD`
        RDD representing distributed collection.
    
    Examples
    --------
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(range(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]
    
    Deal with a list of strings.
    
    >>> strings = ["a", "b", "c"]
    >>> sc.parallelize(strings, 2).glom().collect()
 

In [0]:
pyt_rdd=spark.sparkContext.parallelize(lst)
print(list(pyt_rdd.take(5)))#action
pyt_rdd.collect()#action

[1, 2, 3, 4, 5]


[1, 2, 3, 4, 5]

###Map function

In [0]:
ord=spark.sparkContext.textFile('dbfs:/FileStore/order')
for i in ord.take(5): print(i)                              

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE


In [0]:
# project all the order_id
ord_id=ord.map(lambda x: x.split(',')[2])
for i in ord_id.take(5):print(i)

11599
256
12111
8827
11318


In [0]:
# project all orders and status
ord_id_status=ord.map(lambda x: (x.split(',')[2],x.split(',')[3]))
for i in ord_id_status.take(5): print(i)

('11599', 'CLOSED')
('256', 'PENDING_PAYMENT')
('12111', 'COMPLETE')
('8827', 'CLOSED')
('11318', 'COMPLETE')


In [0]:
# combine order id and status with '&'
ord_id_statu=ord.map(lambda x: x.split(',')[2] + '&' +x.split(',')[3])
for i in ord_id_statu.take(5): print(i)

11599&CLOSED
256&PENDING_PAYMENT
12111&COMPLETE
8827&CLOSED
11318&COMPLETE


In [0]:
#convert the order date into yyy/mm/dd formate
order_date=ord.map(lambda x: x.split(',')[1].split(' ')[0].split('-')[0]
    +'/'+ x.split(',')[1].split(' ')[0].split('-')[1]
    +'/'+ x.split(',')[1].split(' ')[0].split('-')[2])
for i in order_date.take(5): print(i)

2013/07/25
2013/07/25
2013/07/25
2013/07/25
2013/07/25


In [0]:
#create key-value pair key=orde_id,value=whole record
ord_rdd=ord.map(lambda x: (x.split(',')[2],x))
for i in ord_rdd.take(5):print(i)

('11599', '1,2013-07-25 00:00:00.0,11599,CLOSED')
('256', '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT')
('12111', '3,2013-07-25 00:00:00.0,12111,COMPLETE')
('8827', '4,2013-07-25 00:00:00.0,8827,CLOSED')
('11318', '5,2013-07-25 00:00:00.0,11318,COMPLETE')


#flatMap

In [0]:
#flatMap will make each value within a map records as each record
fm_ord=ord.flatMap(lambda x: x.split(','))
for i in fm_ord.take(5): print(i)

1
2013-07-25 00:00:00.0
11599
CLOSED
2


In [0]:
#count the words
word_count=ord.flatMap(lambda x: x.split(',')).map(lambda w: (w,1)).reduceByKey(lambda x,y:x+y)
for i in word_count.take(5): print(i)

('1', 2)
('CLOSED', 7556)
('256', 11)
('12111', 7)
('4', 7)


##filter

In [0]:
filter_ord=ord.map(lambda x: (x.split(',')[3],x)).filter(lambda x:x[0] in ['CLOSED','COMPLETE'] and 
                                                                (x[1].split(',')[1].split('-')[0]=='2014'))
for i in filter_ord.take(5): print(i)

('COMPLETE', '25882,2014-01-01 00:00:00.0,4598,COMPLETE')
('COMPLETE', '25888,2014-01-01 00:00:00.0,6735,COMPLETE')
('COMPLETE', '25889,2014-01-01 00:00:00.0,10045,COMPLETE')
('CLOSED', '25891,2014-01-01 00:00:00.0,3037,CLOSED')
('COMPLETE', '25895,2014-01-01 00:00:00.0,1044,COMPLETE')


In [0]:
print(ord.count())
print(filter_ord.count())

68883
16831


##mapValue 
######take key,value pair as input
######do not change the key apply function log to value of the same key

In [0]:
rdd = sc.parallelize((("a", (1,2,3)), ("b", (3,4,5)),("a", (1,2,3,4,5))))
def f(x): return len(x)#length of value 
def mult(x):return tuple(i*2 for i in x) #mult 2 to each number in vlaue
rdd.mapValues(mult).collect()

[('a', (2, 4, 6)), ('b', (6, 8, 10)), ('a', (2, 4, 6, 8, 10))]

#join
#######join operation will applicable on key value pairs
######When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key

In [0]:
ord_keyvalue=ord.map(lambda x: (x.split(',')[0],x.split(',')[2]))# ord_id,cust_id
print('ord_key_value')
for i in ord_keyvalue.take(5): print(i)

print('order_items_key_value')
order_items_keyvalue=order_items.map(lambda x: (x.split(',')[1],x.split(',')[4]))# ord_id,price
for i in order_items_keyvalue.take(5): print(i)

print('join output')
ord_join_order_items=ord_keyvalue.join(order_items_keyvalue)
for i in ord_join_order_items.take(5):print(i)
print('SUBTOTAL')
subtotal=ord_join_order_items.map(lambda x: (x[0],x[1][1])).reduceByKey(lambda x,y:x+y)
for i in subtotal.take(5):print(i)

ord_key_value
('1', '11599')
('2', '256')
('3', '12111')
('4', '8827')
('5', '11318')
order_items_key_value
('1', '299.98')
('2', '199.99')
('2', '250.0')
('2', '129.99')
('4', '49.98')
join output
('4', ('8827', '49.98'))
('4', ('8827', '299.95'))
('4', ('8827', '150.0'))
('4', ('8827', '199.92'))
('10', ('5648', '199.99'))
SUBTOTAL
('4', '49.98299.95150.0199.92')
('10', '199.9999.96129.9921.99199.99')
('12', '299.98100.0149.94499.95250.0')
('16', '119.98299.95')
('20', '250.0199.92129.99299.95')


#cogroup
######When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable V, Iterable W)) tuples.

In [0]:

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
xy = x.cogroup(y)
for i,j in xy.take(2): print(i,list(map(list,j)))

a [[1], [2]]
b [[4], []]


In [0]:
#list all price corresponding to each order id
#creating two rdd from order_items data to apply cogroup
order_items.glom().map(len).collect()
order_items_1=spark.sparkContext.parallelize(order_items.take(87942))
order_items_2=spark.sparkContext.parallelize(order_items.take(84256))

#creating keyvalue pairs
order_items1_keyvalue=order_items_1.map(lambda x: (x.split(',')[1],x.split(',')[4]))
order_items2_keyvalue=order_items_2.map(lambda x: (x.split(',')[1],x.split(',')[4]))
#cogroup
order_items_cogroup=order_items1_keyvalue.cogroup(order_items2_keyvalue)
for i,j in order_items_cogroup.take(5):
    print(i,list(map(list,j)))

4 [['49.98', '299.95', '150.0', '199.92'], ['49.98', '299.95', '150.0', '199.92']]
10 [['199.99', '99.96', '129.99', '21.99', '199.99'], ['199.99', '99.96', '129.99', '21.99', '199.99']]
12 [['299.98', '100.0', '149.94', '499.95', '250.0'], ['299.98', '100.0', '149.94', '499.95', '250.0']]
16 [['119.98', '299.95'], ['119.98', '299.95']]
20 [['250.0', '199.92', '129.99', '299.95'], ['250.0', '199.92', '129.99', '299.95']]


#Cartesian(cross join)

In [0]:
rdd = sc.parallelize((1,3,2))
sorted(rdd.cartesian(rdd).collect())

[(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3), (3, 1), (3, 2), (3, 3)]

#Aggregation operations
######Total aggregations – reduce, count (Actions)
######By Key aggregations – reduceByKey, aggregrateByKey, groupByKey, countByKey (Transformations)

In [0]:
### Count the number of orders which are closed.
ord.filter(lambda x : x.split(',')[3]=='CLOSED').count()


7556

In [0]:
### Find the total quantity sold for Order ID 1-10.
Result=order_items.filter(lambda x : int(x.split(',')[1]) < 11)\
             .map(lambda x : float(x.split(',')[3]))\
             .reduce(lambda x,y : x+y)
print(Result)
from operator import add
Result=order_items.filter(lambda x : int(x.split(',')[1]) < 11).map(lambda x : float(x.split(',')[3])).reduce(add)
print(Result)

62.0
62.0


In [0]:
### For a given order 10 find the maximum subtotal out of all orders.
max_subtotal_10=order_items.filter(lambda x : int(x.split(',')[1])==10).map(lambda x : x.split(',')[4]).reduce(lambda a,b : a if 
(float(a.split(',')[0]) > float(b.split(',')[0])) else b)
print(max_subtotal_10)
max_subtotal_10=order_items.filter(lambda x : int(x.split(',')[1])==10).map(lambda x : x.split(',')[4]).reduce(max)


199.99


#Aggregation keys
######groupByKey():
######aggregrateByKey(): 
######reduceByKey():
######countByKey():


#groupByKey(numpartiton=None,partition function)
###### When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable V) pairs.
######groupByKey perform shuffling .while shuffling create another stage and more i/o operation takes palce.


In [0]:
# Find aggregated revenue for each product
aggregated_revenue = order_items.map(lambda x: (x.split(',')[2], float(x.split(',')[4]))) \
                                .groupByKey() \
                                .mapValues(list)  # Convert values to list directly

print('groupByKey output')
for i, j in aggregated_revenue.take(1):
    print(i, j)  # j is already a list, no need for further conversion

print('mapValues output')
# Sum the values for each key
print(sorted(aggregated_revenue.mapValues(sum).take(5)))

print('map output')
# Another way to sum the values for each key
aggregated_revenue.map(lambda x: (x[0], sum(x[1]))).take(5)


groupByKey output
957 [299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 299.98, 2

[('957', 4118425.4199997648),
 ('502', 3147800.0),
 ('897', 20566.769999999986),
 ('365', 4421143.020001181),
 ('134', 20025.0)]

#reduceByKey(func,numpartition=none,partitionfunction)
###### .When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. 
###### .reduceByKey() perform combiner which reduce shuffling
![shuffling_with_combiner](/dbfs/FileStore/shuffling_with_coimbiner.png)


In [0]:
revenu_1=order_items.filter(lambda x: int(x.split(',')[1])==2).map(lambda x: (x.split(',')[1], float(x.split(',')[4]))).groupByKey().mapValues(list)
revenu_1.first()


('2', [199.99, 250.0, 129.99])

In [0]:
from operator import add
#Find total revenue sold for each order.(key=id,value=revenu)
total_revenu=order_items.map(lambda x: (x.split(',')[1], float(x.split(',')[4]))).reduceByKey(add)
total_revenu=order_items.map(lambda x: (x.split(',')[1], float(x.split(',')[4]))).reduceByKey(lambda x,y:x+y)
for i in total_revenu.take(5):print(i)

('1', 299.98)
('4', 699.85)
('8', 729.8399999999999)
('9', 599.96)
('10', 651.9200000000001)


In [0]:
total_revenu=order_items.map(lambda x: (x.split(',')[1], float(x.split(',')[4]))).reduceByKey(lambda x,y:x+y).filter(lambda x: int(x[0])==2).first()
print(total_revenu)

('2', 579.98)


In [0]:
#find max revenu for each order. (key=id,value=revenu)
max_revenu=order_items.map(lambda x: (x.split(',')[1], float(x.split(',')[4]))).reduceByKey(max)##using max function
max_revenu=order_items.map(lambda x: (x.split(',')[1], float(x.split(',')[4]))).reduceByKey(lambda x,y: x if x>y else y )#using max logic
for i in max_revenu.take(5):print(i)

('1', 299.98)
('4', 299.95)
('8', 299.95)
('9', 199.99)
('10', 199.99)


In [0]:
### Find the maximum revenue for each order. (key=id,value=entire record)
total_revenu=order_items.map(lambda x : (int(x.split(',') [1]),x)).reduceByKey(lambda a,b : a if (float(a.split(',')[4]) > float(b.split(',')[4])) else 
b)
for i in total_revenu.take(5):print(i)
print(sorted(total_revenu.take(5),reverse=True))



(2, '3,2,502,5,250.0,50.0')
(4, '6,4,365,5,299.95,59.99')
(8, '18,8,365,5,299.95,59.99')
(10, '28,10,1073,1,199.99,199.99')
(12, '37,12,191,5,499.95,99.99')
[(12, '37,12,191,5,499.95,99.99'), (10, '28,10,1073,1,199.99,199.99'), (8, '18,8,365,5,299.95,59.99'), (4, '6,4,365,5,299.95,59.99'), (2, '3,2,502,5,250.0,50.0')]


#aggregateByKey(zeroValue, seqOp, combOp, (numPartitions))
###### Zero Value: Initial value to initialize the accumulator. Use 0 for integer and NULL for collections.
######SeqOp: Function used to accumulate the results of each partition, and stores the running accumulated result to U. (U,T) => U.
######CombOp: Function is used to combine results of all partitions U.


In [0]:
#Find the maximum revenue for each Order.
ordItems=sc.parallelize([
(2,"Joseph",200), (2,"Jimmy",250), (2,"Tina",130), (4,"Jimmy",50), (4,"Tina",300),
(4,"Joseph",150), (4,"Ram",200), (7,"Tina",200), (7,"Joseph",300), (7,"Jimmy",80)],2)
#Create a Paired RDD
ordPair = ordItems.map(lambda x : (x[0],(x[1],x[2])))
#Initialize Accumulator
# Zero Value: Zero value in our case will be 0 as we are finding Maximum Marks
zero_val=0
#Define Sequence Operation
# Sequence operation : Finding Maximum revenue from each partition
def seq_op(accumulator, element):
    if(accumulator > element[1]):
        return accumulator 
    else: 
        return element[1]
#Define Combiner Operation
#Combiner Operation : Finding Maximum revenue from all partitions
def comb_op(accumulator1, accumulator2):
    if(accumulator1 > accumulator2):
        return accumulator1
    else:
        return accumulator2
aggr_ordItems = ordPair.aggregateByKey(zero_val, seq_op, comb_op)
for i in aggr_ordItems.collect(): print(i) 

(2, 250)
(4, 300)
(7, 300)


In [0]:
#Find the maximum revenue for each Order. Print customer name.
ordItems=sc.parallelize([
(2,"Joseph",200), (2,"Jimmy",250), (2,"Tina",130), (4,"Jimmy",50), (4,"Tina",300),
(4,"Joseph",150), (4,"Ram",200), (7,"Tina",200), (7,"Joseph",300), (7,"Jimmy",80)],2)
#Create a Paired RDD
ordPair = ordItems.map(lambda x : (x[0],(x[1],x[2])))
#Initialize Accumulator
# Zero Value: Zero value in our case will be 0 as we are finding Maximum Marks
zero_val=('',0)
#Define Sequence Operation
# Sequence operation : Finding Maximum revenue from each partition
def seq_op(accumulator, element):
    if(accumulator[1] > element[1]):
        return accumulator 
    else: 
        return element
#Define Combiner Operation
#Combiner Operation : Finding Maximum revenue from all partitions
def comb_op(accumulator1, accumulator2):
    if(accumulator1[1] > accumulator2[1]):
        return accumulator1
    else:
        return accumulator2
aggr_ordItems = ordPair.aggregateByKey(zero_val, seq_op, comb_op)
for i in aggr_ordItems.collect(): print(i)

(2, ('Jimmy', 250))
(4, ('Tina', 300))
(7, ('Joseph', 300))


In [0]:
#sum up all revenue and number of records for each order.
ordItems=sc.parallelize([
(2,"Joseph",200), (2,"Jimmy",250), (2,"Tina",130), (4,"Jimmy",50), (4,"Tina",300),
(4,"Joseph",150), (4,"Ram",200), (7,"Tina",200), (7,"Joseph",300), (7,"Jimmy",80)],2)
#Create a Paired RDD
ordPair = ordItems.map(lambda x : (x[0],(x[1],x[2])))
#Initialize Accumulator
# Zero Value: Zero value in our case will be 0 as we are finding Maximum Marks
zero_val=(0,0)
#Define Sequence Operation
# Sequence operation : Sum up all revenue and number of records per partition.
def seq_op(accumulator, element):
    return (accumulator[0] + element[1], accumulator[1] + 1)
#Define Combiner Operation
#Combiner Operation : Sum up all revenue and number of records for all partition.
def comb_op(accumulator1, accumulator2):
    return (accumulator1[0] + accumulator2[0], accumulator1[1] + accumulator2[1])
aggr_ordItems = ordPair.aggregateByKey(zero_val, seq_op, comb_op)
for i in aggr_ordItems.collect(): print(i)
#s

(2, (580, 3))
(4, (700, 4))
(7, (580, 3))


In [0]:
#max revenue and number of records for each order.
ordItems=sc.parallelize([
(2,"Joseph",200), (2,"Jimmy",250), (2,"Tina",130), (4,"Jimmy",50), (4,"Tina",300),
(4,"Joseph",150), (4,"Ram",200), (7,"Tina",200), (7,"Joseph",300), (7,"Jimmy",80)],2)
#Create a Paired RDD
ordPair = ordItems.map(lambda x : (x[0],(x[1],x[2])))
#Initialize Accumulator
# Zero Value: Zero value in our case will be 0 as we are finding Maximum Marks
zero_val=(0,0)
#Define Sequence Operation
# Sequence operation : Sum up all revenue and number of records per partition.
def seq_op(accumulator, element):
    if(accumulator[0]>element[1]):
        return (accumulator[0] , accumulator[1] + 1)
    else:
        return (element[1] , accumulator[1] + 1)
#Define Combiner Operation
#Combiner Operation : Sum up all revenue and number of records for all partition.
def comb_op(accumulator1, accumulator2):
    if(accumulator1[0] > accumulator2[0]):
        return (accumulator1[0], accumulator1[1] + accumulator2[1])
    else:
        return (accumulator2[0], accumulator1[1] + accumulator2[1])
aggr_ordItems = ordPair.aggregateByKey(zero_val, seq_op, comb_op)
for i in aggr_ordItems.collect(): print(i)

(2, (250, 3))
(4, (300, 4))
(7, (300, 3))


#countByKey()
#######Only available on RDDs of type (K, V). Returns a (K, Int) pairs with the count of each key. 
######• Returns a Collection Dictionary.FOR OUTPUT WE NEED TO USE items(),keys(),values()
######• No shuffle.

In [0]:
ordPair = ord.map(lambda x : (x.split(',')[3],1))
countByStatus = ordPair.countByKey()
print('##ITEMS##')
for i in countByStatus.items() : print(i)
print('##KEYS##')
for i in countByStatus.keys() : print(i)
print('##VALUES##')
for i in countByStatus.values() : print(i)

##ITEMS##
('CLOSED', 7556)
('PENDING_PAYMENT', 15030)
('COMPLETE', 22899)
('PROCESSING', 8275)
('PAYMENT_REVIEW', 729)
('PENDING', 7610)
('ON_HOLD', 3798)
('CANCELED', 1428)
('SUSPECTED_FRAUD', 1558)
##KEYS##
CLOSED
PENDING_PAYMENT
COMPLETE
PROCESSING
PAYMENT_REVIEW
PENDING
ON_HOLD
CANCELED
SUSPECTED_FRAUD
##VALUES##
7556
15030
22899
8275
729
7610
3798
1428
1558


###sortByKey(ascending=true,numPartition=None,keyfun=<function<lambda>>)

In [0]:
#Sort orders using customer id
ordPair = ord.map(lambda x : (int(x.split(',')[2]),x))
ordSort = ordPair.sortByKey(ascending=False)
for i in ordSort.take(10) : print(i)


(12435, '41643,2014-04-08 00:00:00.0,12435,PENDING')
(12435, '61629,2013-12-21 00:00:00.0,12435,CANCELED')
(12434, '1868,2013-08-03 00:00:00.0,12434,CLOSED')
(12434, '4799,2013-08-23 00:00:00.0,12434,PENDING_PAYMENT')
(12434, '5303,2013-08-26 00:00:00.0,12434,PENDING')
(12434, '6160,2013-09-02 00:00:00.0,12434,COMPLETE')
(12434, '13544,2013-10-16 00:00:00.0,12434,PENDING')
(12434, '42915,2014-04-16 00:00:00.0,12434,COMPLETE')
(12434, '51800,2014-06-14 00:00:00.0,12434,ON_HOLD')
(12434, '61777,2013-12-26 00:00:00.0,12434,COMPLETE')


In [0]:
#Sort orders using customer and status
ordPair = ord.map(lambda x : ((int(x.split(',')[2]), x.split(',')[3]),x))
ordSort = ordPair.sortByKey(ascending=True)
for i in ordSort.take(10) : print(i)

((1, 'COMPLETE'), '22945,2013-12-13 00:00:00.0,1,COMPLETE')
((2, 'COMPLETE'), '33865,2014-02-18 00:00:00.0,2,COMPLETE')
((2, 'COMPLETE'), '67863,2013-11-30 00:00:00.0,2,COMPLETE')
((2, 'ON_HOLD'), '57963,2013-08-02 00:00:00.0,2,ON_HOLD')
((2, 'PENDING_PAYMENT'), '15192,2013-10-29 00:00:00.0,2,PENDING_PAYMENT')
((3, 'COMPLETE'), '22646,2013-12-11 00:00:00.0,3,COMPLETE')
((3, 'COMPLETE'), '23662,2013-12-19 00:00:00.0,3,COMPLETE')
((3, 'COMPLETE'), '35158,2014-02-26 00:00:00.0,3,COMPLETE')
((3, 'COMPLETE'), '57617,2014-07-24 00:00:00.0,3,COMPLETE')
((3, 'COMPLETE'), '61453,2013-12-14 00:00:00.0,3,COMPLETE')


###Global Ranking or Ranking per Group
######Global Ranking :
######• sortByKey and take
######• takeOrdered or top

In [0]:
global_ranking=order_items.map(lambda x:(x.split(',')[4],x.split(',')[2])).sortByKey(ascending=False)
for i in global_ranking.take(10):print(i)

('999.99', '60')
('999.99', '60')
('999.99', '60')
('999.99', '60')
('999.99', '60')
('999.99', '60')
('999.99', '60')
('999.99', '60')
('999.99', '60')
('999.99', '60')


In [0]:
global_ranking=order_items.map(lambda x:(x.split(',')[4],x.split(',')[2])).takeOrdered(2,key= lambda x:-float(x[0]))
for i in global_ranking:print(i)


('1999.99', '208')
('1999.99', '208')


In [0]:
filter_ranking=order_items.map(lambda x:(x.split(',')[4],int(x.split(',')[2]))).filter(lambda x:x[1]==208)
for i in filter_ranking.take(2):print (i)

('1999.99', 208)
('1999.99', 208)


- `sortByKey`():
- This function sorts the entire RDD based on the keys and then returns the sorted RDD.
- When you use take(5) on the sorted RDD, it retrieves the first 5 elements from the globally sorted RDD.
- `takeOrdered`():
- This function retrieves the top N elements directly from the RDD based on the specified key function.
- It does not sort the entire RDD but uses a more efficient algorithm to find the top N elements.

In [0]:
# Sample data
order_items_sample = sc.parallelize([
    "1,product1,category1,10.0,3",
    "2,product2,category2,230.0,1",
    "3,product3,category3,440.0,2",
    "4,product4,category4,550.0,2",
    "5,product5,category5,600.0,2",
    "6,product6,category6,7650.0,2"
])

# Using sortByKey
global_ranking_sort = order_items_sample.map(lambda x: (x.split(',')[3], x.split(',')[1])).sortByKey(ascending=False)
for i in global_ranking_sort.take(5):
    print(i)


('7650.0', 'product6')
('600.0', 'product5')
('550.0', 'product4')
('440.0', 'product3')
('230.0', 'product2')


In [0]:
# Using takeOrdered
global_ranking_take = order_items_sample.map(lambda x: (x.split(',')[3], x.split(',')[1])).takeOrdered(5, key=lambda x: -float(x[0]))
for i in global_ranking_take:
    print(i)

('7650.0', 'product6')
('600.0', 'product5')
('550.0', 'product4')
('440.0', 'product3')
('230.0', 'product2')


#Ranking by group

#union(),intersection(),
#subtract(other,no of partition=none),
#distinct(no of partition=none)

In [0]:
july_order=ord.filter(lambda x: x.split(',')[1].split('-')[1]=='07').map(lambda x:x.split(',')[2])
for i in july_order.take(5):print (i)
print(f'july_order_customers:{july_order.count()}')


aug_order=ord.filter(lambda x:x.split(',')[1].split('-')[1]=='08').map(lambda x:x.split(',')[2])
for i in aug_order.take(5):print(i)
print(f'august_order_customers :{aug_order.count()}')

11599
256
12111
8827
11318
july_order_customers:6001
11607
5105
7802
553
1604
august_order_customers :5680


In [0]:
july_aug_order=july_order.union(aug_order)#union will take duplicates also
print(f"union output count without distinct:{july_aug_order.count()}")
print(f"union output count without distinct:{july_aug_order.distinct().count()}")

union output count without distinct:11681
union output count without distinct:7633


In [0]:
july_aug_order=july_order.intersection(aug_order)#intersection will take duplicates out by applying distinct default
print(f"intersection output count :{july_aug_order.count()}")

intersection output count :1759


In [0]:
ex1=sc.parallelize([1,2,3,3,3])
ex2=sc.parallelize([1,3,5])
print(f"union without distinct:{ex1.union(ex2).collect()}")
print(f"union with distinct:{ex1.union(ex2).distinct().collect()}")
print(f"intersection:{ex1.intersection(ex2).collect()}")
print(f"subtract:{ex1.subtract(ex2).collect()}")
print(f"subtract:{ex2.subtract(ex1).collect()}")

union without distinct:[1, 2, 3, 3, 3, 1, 3, 5]
union with distinct:[1, 2, 3, 5]
intersection:[1, 3]
subtract:[2]
subtract:[5]


###sample(withReplacement=False,fraction,seed=none)-->transformation
####fraction value between 0 to 1
####0.1--10%,0.7-->70%
####takesample(withReplacement=False,num,seed=none)-->action

In [0]:
list1=sc.parallelize(range(100),4)
for i in list1.take(5):print(i)
output1=list1.sample(withReplacement=False,fraction=0.1,seed=30)
print(output1.take(5))
output1=list1.sample(withReplacement=True,fraction=0.1,seed=30)#value will repeat. if withReplacement is true
print(output1.take(5))

0
1
2
3
4
[0, 3, 17, 33, 34]
[2, 3, 3, 17, 18]


In [0]:
output2=list1.takeSample(withReplacement=True,num=10,seed=30)#duplicate value in output
print(output2)
output2=list1.takeSample(withReplacement=False,num=10,seed=30)#no duplicate values in output
print(output2)

[94, 41, 81, 87, 58, 41, 86, 56, 93, 92]
[92, 72, 24, 65, 42, 69, 53, 97, 18, 68]


##Repartition and coalesce

In [0]:
## Create some Dump data for testing
df = spark.range(1000000)
df = df.select(df.id,df.id*2,df.id*3)
df = df.union(df)
df = df.union(df)
df = df.union(df)
df = df.union(df)
df = df.union(df)
### Convert DataFrame to RDD. 
RDD = df.rdd.map(lambda x : str(x[0]) + ',' + str(x[1]) + ',' + str(x[2]))
### Save the file at a DBFS Path
RDD.coalesce(1).saveAsTextFile('/dbfs/user/table/test_data')


In [0]:
RDD=sc.textFile('/dbfs/user/table/test_data')
for i in RDD.take(5):
    print(i)
print(f"RDD count:{RDD.count()}")   
print(f'RDD no of partaition:{RDD.getNumPartitions()}')

rdd_filter=RDD.filter(lambda x: x.split(',')[0]=='1')
print(f"rdd_filter count:{rdd_filter.count()}")
print(f'rdd_filter no of partaition:{rdd_filter.getNumPartitions()}')

rdd_filter=rdd_filter.coalesce(1)
print(f'rdd_filter no of partaition:{rdd_filter.getNumPartitions()}')

0,0,0
1,2,3
2,4,6
3,6,9
4,8,12
RDD count:32000000
RDD no of partaition:3
rdd_filter count:32
rdd_filter no of partaition:3
rdd_filter no of partaition:1


In [0]:
#repartition
print(ord.getNumPartitions())
print(ord.glom().map(len).collect())
re_partiton_ord=ord.repartition(5)
print(re_partiton_ord.getNumPartitions())
print(re_partiton_ord.glom().map(len).collect())

2
[34564, 34319]
5
[13783, 13770, 13770, 13770, 13790]


In [0]:
#coalesce
re_partiton_ord.coalesce(10)#coalesce do not increase shuffling but it can decrease shuffling and it will not distribute uniform no of records also
print(re_partiton_ord.getNumPartitions())
coal_partiton_ord=re_partiton_ord.coalesce(10,shuffle=True)#It will work smilar to repartion
print(coal_partiton_ord.getNumPartitions())

5
10


###repartitionAndSortWithinPartitions(no of partition=none,function,ascending=False)

In [0]:
rdd = sc.parallelize(((9, ('a','z')), (3, ('x','f')), (6, ('j','b')), (4, ('a','b')), (8, ('s','b')), (1, ('a','b'))),2)
rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x : x % 2, True)
rdd2.glom().collect()

[[(4, ('a', 'b')), (6, ('j', 'b')), (8, ('s', 'b'))],
 [(1, ('a', 'b')), (3, ('x', 'f')), (9, ('a', 'z'))]]

###persist() -->used to store intermediate result while processing
####storagLevel(disk,memory,offheap,deseralization,replication=1) -->disk,memory
##### if we try to change storage level for persist once persist is done it will through a error.So first we need to unprisest and we need to apply new storage level
###unpersist()

In [0]:
from pyspark import StorageLevel
df = spark.range(10)
print(df.rdd.is_cached)
df.rdd.persist()#using persist which take default memory_only
print(df.rdd.is_cached)
df.rdd.getStorageLevel()#checking type of storage


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:444)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1268)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:985)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:573)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:669)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:687)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
	at com.databricks.logging.Usag

In [0]:
df.rdd.unpersist().getStorageLevel()#unpersist and knowning stoagrlevel to apply new storage level
df.rdd.persist(StorageLevel.MEMORY_AND_DISK_2). getStorageLevel()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:444)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1268)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:985)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:573)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:669)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:687)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
	at com.databricks.logging.Usag

In [0]:
from pyspark.sql import Row
person=Row('name','age')
person1=person('Alice',10)
person1.name

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:444)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1268)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:985)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:573)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:669)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:687)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
	at com.databricks.logging.Usag

##User define functions(UDF)
####@udf(returnType=)
####udf() function
####spark.udf.register()-->sql


In [0]:
import string
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType,IntegerType

#one way of registering udf using @udf(returnType) -->decorator 
@udf(returnType=StringType())
def initCap(str):
    finalStr=""
    ar = str.split(" ")
    for word in ar:
        finalStr= finalStr + word[0:1].upper() + word[1:len(word)] + " " 
    return finalStr.strip()

#DataFrame:
emp=Row('emp_id','emp_name')
df=spark.createDataFrame([emp(1,'alice jony'),emp(2,'bob smith')],schema='emp_id int,emp_name string')
#usinf udf function
df.select(df.emp_name, initCap(df.emp_name).alias('new_name')).show()

#Spark Sql:
#by registering udf for sql use case
spark.udf.register("initcap1", initCap)
spark.sql('use default')
df.createOrReplaceTempView('emp')#create tempview from df
spark.sql(""" select emp_name, initcap1(emp_name) new_name from emp """).show()


+----------+----------+
|  emp_name|  new_name|
+----------+----------+
|alice jony|Alice Jony|
| bob smith| Bob Smith|
+----------+----------+

+----------+----------+
|  emp_name|  new_name|
+----------+----------+
|alice jony|Alice Jony|
| bob smith| Bob Smith|
+----------+----------+



In [0]:
# second way of registering upython  function  as udf using udf()
import string
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType,IntegerType
#python function
def initCap(str):
    finalStr=""
    ar = str.split(" ")
    for word in ar:
        finalStr= finalStr + word[0:1].upper() + word[1:len(word)] + " " 
    return finalStr.strip()




In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType,IntegerType
#registering udf(python_method_name,return_type)
initcap_method=udf(initCap,StringType())
df.select(df.emp_name, initcap_method(df.emp_name).alias('new_name')).show()

#Spark Sql:
#by registering udf for sql use case
spark.udf.register("initcap1", initCap)
spark.sql('use default')
df.createOrReplaceTempView('emp')#create tempview from df
spark.sql(""" select emp_name, initcap1(emp_name) new_name from emp """).show()

+----------+----------+
|  emp_name|  new_name|
+----------+----------+
|alice jony|Alice Jony|
| bob smith| Bob Smith|
+----------+----------+

+----------+----------+
|  emp_name|  new_name|
+----------+----------+
|alice jony|Alice Jony|
| bob smith| Bob Smith|
+----------+----------+



In [0]:
#using Python Lambda Function and Use it in Spark Sql
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())

#dataframe
df.select(df.emp_name, slen(df.emp_name).alias('length')).show()

#spark sql
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()

+----------+------+
|  emp_name|length|
+----------+------+
|alice jony|    10|
| bob smith|     9|
+----------+------+



[Row(slen(test)=4)]

###types of file reads

In [0]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, NoneType] = None, maxCharsPerColumn: Union[str, int, NoneType] = None, maxMalformedLogPerPartition: Union[str, int, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] 

###By using catalog we can  deal with HiveMetastore
####Database Functions:
- currentDatabase
- listDatabases
- setCurrentDatabase 

In [0]:
spark.catalog.currentDatabase()
#creat database
spark.sql('create database if not exists test')

#Setcurrentdatabase
spark.catalog.setCurrentDatabase('test')
print(spark.catalog.currentDatabase())
spark.catalog.setCurrentDatabase('default')
print(spark.catalog.currentDatabase())

test
default


In [0]:
#It will list all database so to find specifi database wether is present are not
spark.catalog.listDatabases()
print(type(spark.catalog.listDatabases()))
print(type(spark.catalog.listDatabases()[0]))
print(spark.catalog.listDatabases()[0])
print(spark.catalog.listDatabases()[0].name)#default

print('test' in [ i .name for i in spark.catalog.listDatabases()])#true 

[ print(f'{i.name} is present ')  for i in spark.catalog.listDatabases() if i.name=='test']

[ print(f'{i.name} is present ') if i.name=='test' else print(f'{i.name} is present not test ')  for i in spark.catalog.listDatabases() ]

<class 'list'>
<class 'pyspark.sql.catalog.Database'>
Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='dbfs:/user/hive/warehouse')
default
True
test is present 
default is present not test 
test is present 


[None, None]

####Table Functions:
- listColumns
- listTables
- cacheTable
- isCached
- uncacheTable
- clearCache
- recoverPartitions
- refreshTable
- refreshByPath


In [0]:
help(spark.catalog.listColumns)
spark.catalog.listColumns('manage_productdetails','default')

Help on method listColumns in module pyspark.sql.catalog:

listColumns(tableName: str, dbName: Optional[str] = None) -> List[pyspark.sql.catalog.Column] method of pyspark.sql.catalog.Catalog instance
    Returns a list of columns for the given table/view in the specified database.
    
    .. versionadded:: 2.0.0
    
    Parameters
    ----------
    tableName : str
        name of the table to list columns.
    
        .. versionchanged:: 3.4.0
           Allow ``tableName`` to be qualified with catalog name when ``dbName`` is None.
    
    dbName : str, optional
        name of the database to find the table to list columns.
    
    Returns
    -------
    list
        A list of :class:`Column`.
    
    Notes
    -----
    The order of arguments here is different from that of its JVM counterpart
    because Python does not support method overloading.
    
    If no database is specified, the current database and catalog
    are used. This API includes all temporary views.
    
 



[Column(name='ProductID', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='Name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='ProductNumber', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='Color', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='StandardCost', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='ListPrice', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='Size', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='Weight', description=None, dataType='string', nullable=True, isPartition=Fal

In [0]:
help(spark.catalog.listTables)
print(spark.catalog.listTables('default',pattern='manage_*'),end='\n')
print(spark.catalog.listTables('default',pattern='emp'))


Help on method listTables in module pyspark.sql.catalog:

listTables(dbName: Optional[str] = None, pattern: Optional[str] = None) -> List[pyspark.sql.catalog.Table] method of pyspark.sql.catalog.Catalog instance
    Returns a list of tables/views in the specified database.
    
    .. versionadded:: 2.0.0
    
    Parameters
    ----------
    dbName : str, optional
        name of the database to list the tables.
    
        .. versionchanged:: 3.4.0
           Allow ``dbName`` to be qualified with catalog name.
    
    pattern : str, optional
        The pattern that the database name needs to match.
    
        .. versionadded: 3.5.0
    
    Returns
    -------
    list
        A list of :class:`Table`.
    
    Notes
    -----
    If no database is specified, the current database and catalog
    are used. This API includes all temporary views.
    
    Examples
    --------
    >>> spark.range(1).createTempView("test_view")
    >>> spark.catalog.listTables()
    [Table(name='te

In [0]:
spark.catalog.uncacheTable('emp')
print(spark.catalog.isCached('emp'))
spark.catalog.cacheTable('emp')
print(spark.catalog.isCached('emp'))

False
True


####View Functions:
- dropGlobalTempView
- dropTempView
####Function based functions:
- listFunctions
- registerFunction (= spark.udf.register)


In [0]:
spark.catalog.dropTempView('emp')
print(spark.catalog.listTables('default',pattern='emp'))

[]


In [0]:
spark.catalog.listFunctions()#list all functions you created udf function and recently used inbuilt functions
'slen' in [i.name for i in spark.catalog.listFunctions()]


True

##creating new spark session
#####every spark has separate SQLConf, registered temporary views and UDFs, but shared SparkContext and table cache(i.e, hivemetastore tables can acess in both sessions).

In [0]:
spark
import string
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType,IntegerType
@udf(returnType=StringType())
def initCap(str): 
    finalStr=""
    ar = str.split(" ") 
    for word in ar:
        finalStr= finalStr + word[0:1].upper() + word[1:len(word)] + " "
    return string.strip(finalStr)
spark.udf.register("initcap1", initCap)
spark.sql("""select initcap1('emp name') """).show()

+-----------------+
|initcap(emp name)|
+-----------------+
|         Emp Name|
+-----------------+



In [0]:
#error because of calling udf defined in another spark session in new sparksession is not possible
#The error is occurring because you are trying to call a UDF (initcap1) that was defined in a different Spark session (spark) in a new Spark session (new_spark).
new_spark=spark.newSession()
new_spark
new_spark.sql("""select initcap1('emp name') """).show()
new_spark.stop()#it will stop new_spark section

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4347346809463467>, line 4[0m
[1;32m      2[0m new_spark[38;5;241m=[39mspark[38;5;241m.[39mnewSession()
[1;32m      3[0m new_spark
[0;32m----> 4[0m new_spark[38;5;241m.[39msql([38;5;124m"""[39m[38;5;124mselect initcap1([39m[38;5;124m'[39m[38;5;124memp name[39m[38;5;124m'[39m[38;5;124m) [39m[38;5;124m"""[39m)[38;5;241m.[39mshow()
[1;32m      5[0m new_spark[38;5;241m.[39mstop()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[4