#### RDD (Resilient Distributed Dataset)
##### Terminology
* RDD stands for Resilient Distributed Dataset. An RDD is a collection of elements partitioned across the nodes of a cluster so that it can be operated on in parallel.

##### RDDs are...

* immutable
* fault tolerant, with automatic recovery
* able to have multiple operations (ops) applied to them
##### RDD operations are...

* Transformation
* Action
##### Basic Operations (Ops)
* count(): Number of elements in the RDD is returned.
* collect(): All the elements in the RDD are returned.
* foreach(f): Takes a callable and applies it to every element of the RDD. It is an action and returns nothing.
* filter(f): Takes a callable and returns a new RDD containing only the elements for which the callable returns true.
* map(f, preservesPartitioning = False): Returns a new RDD formed by applying a function to each element of the RDD.
* reduce(f): Aggregates the elements of the RDD with the specified commutative and associative binary operation and returns the single resulting value.
* join(other, numPartitions = None): Returns an RDD containing all pairs of elements with matching keys, each as (key, (value1, value2)).
* cache(): Persists this RDD with the default storage level (MEMORY_ONLY). The `is_cached` attribute reports whether the RDD has been marked for caching.


##### Narrow transformations
* map()
* filter()
* flatMap()
##### Wide (Broad) transformations
* distinct()
* reduceByKey()
* groupBy()
* sortBy()
* join()
##### Actions
* count()
* take()
* takeOrdered()
* top()
* collect()
* saveAsTextFile()
* first()
* reduce()
* fold()
* aggregate()
* foreach()


##### Dictionary functions
* keys()
* values()
* keyBy()
##### Functional transformations
* mapValues()
* flatMapValues()
##### Grouping, sorting and aggregation
* groupByKey()
* reduceByKey()
* foldByKey()
* sortByKey()
##### Joins
* join()
* leftOuterJoin()
* rightOuterJoin()
* fullOuterJoin()
* cogroup()
* cartesian()
##### Set operations
* union()
* intersection()
* subtract()
* subtractByKey()

##### Numeric RDD
* min()
* max()
* sum()
* mean()
* stdev()
* variance()

#### Lambda Function
* A lambda function is a small anonymous function.
* A lambda function can take any number of arguments, but can only have one expression.

In [0]:
def my_sum(a,b):
  
  return a+b
  

In [0]:
my_sum(10,20)

Out[2]: 30

In [0]:
%%time
a=55

CPU times: user 12 µs, sys: 1 µs, total: 13 µs
Wall time: 17.2 µs


In [0]:
print(a)

55


In [0]:
sc.defaultParallelism

Out[7]: 8

In [0]:
def my_func(a,b):
  return a+b

In [0]:
a=55
b=77
my_func(a,b)

Out[9]: 132

In [0]:
x = lambda a : a + 10
print(x(3))
print(x(20))

13
30


In [0]:

x = lambda a, b : a + b
print(x(5, 6))
print(x(2, 50))

11
52


In [0]:
list_a=[1,2,3,4,5,6,7,8]
for i in list_a:
	if i<4:
		print(i)

1
2
3


In [0]:
my_list = [1,2,3,4,5,6,7,8,8,34,3,34,34,343,5656]
my_list

Out[13]: [1, 2, 3, 4, 5, 6, 7, 8, 8, 34, 3, 34, 34, 343, 5656]

In [0]:
my_rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,23,23,232323,2323])
my_rdd.collect()

Out[14]: [1, 2, 3, 4, 5, 6, 7, 8, 9, 23, 23, 232323, 2323]

In [0]:
x = sc.parallelize([1,2,3,4,5,6,7,8],4)
new_rdd = x.filter(lambda x: x<4)

In [0]:
x.glom().collect()

Out[17]: [[1, 2], [3, 4], [5, 6], [7, 8]]

In [0]:
new_rdd.count()

Out[18]: 3

In [0]:
type(new_rdd)

Out[20]: pyspark.rdd.PipelinedRDD

In [0]:
text_rdd = sc.textFile("dbfs:/databricks-datasets/SPARK_README.md")
text_rdd

Out[24]: dbfs:/databricks-datasets/SPARK_README.md MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

In [0]:
type(text_rdd)

Out[25]: pyspark.rdd.RDD

In [0]:
%sql
select * from sample_db.emp where deptno=10

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-3737989515766111>:7
      5     display(df)
      6     return df
----> 7   _sqldf = ____databricks_percent_sql()
      8 finally:
      9   del ____databricks_percent_sql

File <command-3737989515766111>:4, in ____databricks_percent_sql()
      2 def ____databricks_percent_sql():
      3   import base64
----> 4   df = spark.sql(base64.standard_b64decode("

#### What is SparkContext
* A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster.
* Note: Only one SparkContext should be active per JVM. You must stop() the active SparkContext before creating a new one.
* The constructor accepts a Spark config object (SparkConf) describing the application configuration. Any settings in this config override the default configs as well as system properties.

#### Creating an RDD using the SparkContext parallelize method
* Start from a local Python list such as [1,2,3,4,5,6,7,8].
* sc.parallelize() turns that list into a parallelized collection (an RDD); the optional second argument sets the number of partitions.
* This allows Spark to distribute the data across multiple nodes, instead of depending on a single node to process the data.

In [0]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8],5)

In [0]:
range_rdd = sc.range(10)
range_rdd.collect()

Out[28]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

#### Creating RDD using file.

In [0]:
textFile_rdd = sc.textFile("/databricks-datasets/samples/docs/README.md")
textFile_rdd.count()

Out[29]: 65

#### map(func) Transformation
* Return a new distributed dataset formed by passing each element of the source through a function func.

In [0]:
x=sc.parallelize([1,2,3,4,5,6,7])
y=x.map(lambda a: a+10 )
print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5, 6, 7]
[11, 12, 13, 14, 15, 16, 17]


In [0]:
print("X RDD Values :",x.collect())
print("Y RDD Values after applying map and lambda transformation",y.collect())

X RDD Values : [1, 2, 3, 4, 5, 6, 7]
Y RDD Values after applying map and lambda transformation [11, 12, 13, 14, 15, 16, 17]


In [0]:
x_rdd=sc.parallelize([1,2,3,4,5,6,7])
y_odd_rdd=x_rdd.filter(lambda z: z%2==1 )
print(x_rdd.collect())
print(y_odd_rdd.collect())

[1, 2, 3, 4, 5, 6, 7]
[1, 3, 5, 7]


In [0]:
y_odd_rdd.collect()

Out[34]: [1, 3, 5, 7]

In [0]:
print('RDD X values : ',x.collect())
print('RDD Y VALUES : ',y.collect())

RDD X values :  [1, 2, 3, 4, 5, 6, 7]
RDD Y VALUES :  [11, 12, 13, 14, 15, 16, 17]


#### filter(func) Transformation
* Return a new dataset formed by selecting those elements of the source on which func returns true.

In [0]:
x = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
y = x.filter(lambda x: x%2 == 1) #keep odd values
print('RDD X Values Before Filter : ',x.collect())
print('RDD Y Values After applying Filter in X RDD : ',y.collect())

RDD X Values Before Filter :  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
RDD Y Values After applying Filter in X RDD :  [1, 3, 5, 7, 9]


#### flatMap(func) Transformation
* Similar to map, but each input item can be mapped to 0 or more output items, so func should return an iterable (a Seq in Scala; a list, tuple or generator in Python) rather than a single item.

In [0]:
x.flatMap(lambda x: (x,x+55,x*400)).collect()

Out[38]: [1,
 56,
 400,
 2,
 57,
 800,
 3,
 58,
 1200,
 4,
 59,
 1600,
 5,
 60,
 2000,
 6,
 61,
 2400,
 7,
 62,
 2800,
 8,
 63,
 3200,
 9,
 64,
 3600,
 10,
 65,
 4000]

In [0]:
x = sc.parallelize([1,2,3])
y_map = x.map(lambda x: (x, x*100))
z_flatmap = x.flatMap(lambda x: (x, x*100))
print(x.collect())
print(y_map.collect())
print(z_flatmap.collect())

[1, 2, 3]
[(1, 100), (2, 200), (3, 300)]
[1, 100, 2, 200, 3, 300]


In [0]:
#When using PySpark, the flatMap() function does the flattening for us.
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
sc.parallelize(data).flatMap(lambda x: x).collect()

Out[40]: [1, 2, 3, 4, 5, 6, 7, 8, 9]

#### groupBy(func) Transformation in RDD
* Group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.

In [0]:
x = sc.parallelize(['John', 'Fred', 'Anna', 'James','Jan','Frend','Axe'])
y = x.groupBy(lambda w: w[0])
print('Before Group By',x.collect())
#print(y.collect())
print('After group by ',[(k,tuple(v)) for (k, v) in y.collect()])

Before Group By ['John', 'Fred', 'Anna', 'James', 'Jan', 'Frend', 'Axe']
After group by  [('J', ('John', 'James', 'Jan')), ('F', ('Fred', 'Frend')), ('A', ('Anna', 'Axe'))]


In [0]:
x = sc.parallelize(['Ravi', 'Sridhar', 'Prasad', 'Raj'])
y = x.groupBy(lambda w: w[0])
print(x.collect())
print([(k, list(v)) for (k, v) in y.collect()])

['Ravi', 'Sridhar', 'Prasad', 'Raj']
[('R', ['Ravi', 'Raj']), ('S', ['Sridhar']), ('P', ['Prasad'])]


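#### What is an iterable
* Anything that can be looped over (for example a string or a file), that is, anything that can appear on the right-hand side of a for-loop: `for x in iterable: ...`
* groupBy() and groupByKey() return each group as a ResultIterable, which is why the values have to be converted with list() or tuple() before they print readably.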

In [0]:
print(y.collect())

[('R', <pyspark.resultiterable.ResultIterable object at 0x7f138e5abbb0>), ('S', <pyspark.resultiterable.ResultIterable object at 0x7f138e5bb100>), ('P', <pyspark.resultiterable.ResultIterable object at 0x7f138e5bb1f0>)]


#### Sample Transformation
* Return a sampled subset of this RDD.

* __Parameters__
* withReplacement – whether elements can be sampled multiple times (replaced when sampled out)

* fraction – expected size of the sample as a fraction of this RDD’s size. Without replacement: probability that each element is chosen; fraction must be in [0, 1]. With replacement: expected number of times each element is chosen; fraction must be >= 0.

* seed – seed for the random number generator

In [0]:
x = sc.parallelize([1,2,3,4,5,6,7])
x.take(3)

Out[44]: [1, 2, 3]

In [0]:
x = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
y = x.sample(True,5,300)
print(x.collect())
print(y.collect())
print(y.count())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 10, 10, 10, 10]
67


#### union(otherDataset) Transformation
* Return a new dataset that contains the union of the elements in the source dataset and the argument.
* glom() returns an RDD created by coalescing all elements within each partition into a list, which makes the partition layout visible.

In [0]:
# RDD.union() keeps duplicates, so it behaves like SQL UNION ALL.
# For SQL UNION semantics (duplicates removed), chain .distinct() after union().

In [0]:
x = sc.parallelize([1,2,3,4,5])
y = sc.parallelize([3,4,5,6,7])
z = x.union(y)
print(z.collect())
#print(z.glom().collect())

[1, 2, 3, 4, 5, 3, 4, 5, 6, 7]


#### intersection(otherDataset)
* Return a new RDD that contains the intersection of elements in the source dataset and the argument.
* The output will not contain any duplicate elements, even if the input RDDs did.

In [0]:
x = sc.parallelize([1,2,3,3,4,5], 2)
y = sc.parallelize([3,4,4,5,6,7], 1)
z = x.intersection(y)
print(x.glom().collect())
print(y.glom().collect())
print(z.collect())
print(z.glom().collect())

[[1, 2, 3], [3, 4, 5]]
[[3, 4, 4, 5, 6, 7]]
[3, 4, 5]
[[3], [4], [5]]


In [0]:
x = sc.parallelize([1,2,3], 2)
y = sc.parallelize([3,4], 1)
z = x.intersection(y)
print(z.collect())
print(z.glom().collect())

[3]
[[3], [], []]


#### subtract(otherDataset) Transformation
* Returns an RDD containing only the values that are present in the first RDD and not in the second RDD.
* Duplicates in the first RDD are not removed: a value that survives the subtraction appears as many times as it did in the first RDD.

In [0]:
x = sc.parallelize([1,2,2,3,3,4,5,7,8], 2)
y = sc.parallelize([3,4,3,4,5,6], 1)
z = x.subtract(y)
print(z.collect())
print(z.glom().collect())

[1, 7, 2, 2, 8]
[[], [1, 7], [2, 2, 8]]
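
The different duplicate handling of intersection() and subtract() can be modelled in plain Python, without a Spark cluster. This is only a sketch of the semantics: the helper names `intersection_like` and `subtract_like` are made up for illustration, and real RDD results come back in partition order rather than list order.

```python
def intersection_like(first, second):
    """Model of RDD.intersection(): common values, duplicates removed."""
    return sorted(set(first) & set(second))


def subtract_like(first, second):
    """Model of RDD.subtract(): drop values found in `second`, keep duplicates of the rest."""
    excluded = set(second)
    return [value for value in first if value not in excluded]


# Same inputs as the intersection and subtract cells above.
common = intersection_like([1, 2, 3, 3, 4, 5], [3, 4, 4, 5, 6, 7])
remaining = subtract_like([1, 2, 2, 3, 3, 4, 5, 7, 8], [3, 4, 3, 4, 5, 6])
print(common)     # [3, 4, 5]
print(remaining)  # [1, 2, 2, 7, 8]
```

The model returns the same elements as the notebook cells; only the ordering differs, because Spark lays the result out by partition.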


#### Cartesian Transformation
* Provides the cartesian product of 2 RDDs.
* The new RDD pairs each value of the 1st RDD with each value of the 2nd RDD, so it contains (a, b) for every a in the first and every b in the second.

In [0]:
x = sc.parallelize([1,2,3,4,5,6,7,8,9], 2)
y = sc.parallelize([3,4,5,6,7,8,9,10,11], 1)
z = x.cartesian(y)
print(z.collect())

[(1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1, 10), (1, 11), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (2, 9), (2, 10), (2, 11), (3, 3), (3, 4), (3, 5), (3, 6), (3, 7), (3, 8), (3, 9), (3, 10), (3, 11), (4, 3), (4, 4), (4, 5), (4, 6), (4, 7), (4, 8), (4, 9), (4, 10), (4, 11), (5, 3), (5, 4), (5, 5), (5, 6), (5, 7), (5, 8), (5, 9), (5, 10), (5, 11), (6, 3), (6, 4), (6, 5), (6, 6), (6, 7), (6, 8), (6, 9), (6, 10), (6, 11), (7, 3), (7, 4), (7, 5), (7, 6), (7, 7), (7, 8), (7, 9), (7, 10), (7, 11), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7), (8, 8), (8, 9), (8, 10), (8, 11), (9, 3), (9, 4), (9, 5), (9, 6), (9, 7), (9, 8), (9, 9), (9, 10), (9, 11)]


#### Distinct Transformation
* Return a new RDD containing distinct items from the original RDD (omitting all duplicates)

In [0]:
x = sc.parallelize([1,2,3,3,4,5,5,5,6,6,6,6,7,7,7,3,2,1,8,9,4])
y = x.distinct()
print(y.collect())

[8, 1, 9, 2, 3, 4, 5, 6, 7]


#### coalesce(numPartitions) Transformation
* Decrease the number of partitions in the RDD to numPartitions. 
* Useful for running operations more efficiently after filtering down a large dataset.

In [0]:
%%time
x = sc.parallelize([1, 2, 3, 4, 5,6,7,8,9,5,4,5,6,8,9,10,12,14,12,12,14,15,16,44,44,45], 10)
y = x.coalesce(4)
print(x.glom().collect())
print(y.glom().collect())
print(x.collect())

[[1, 2], [3, 4], [5, 6], [7, 8, 9, 5], [4, 5], [6, 8], [9, 10, 12, 14], [12, 12], [14, 15], [16, 44, 44, 45]]
[[1, 2, 3, 4], [5, 6, 7, 8, 9, 5, 4, 5], [6, 8, 9, 10, 12, 14], [12, 12, 14, 15, 16, 44, 44, 45]]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 4, 5, 6, 8, 9, 10, 12, 14, 12, 12, 14, 15, 16, 44, 44, 45]
CPU times: user 20.4 ms, sys: 7.87 ms, total: 28.3 ms
Wall time: 622 ms
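
One way to read the output above is that coalesce() merged neighbouring partitions instead of moving individual elements. The sketch below is a plain-Python model of that grouping, not Spark's implementation; the helper `coalesce_like` and its index arithmetic are assumptions, chosen because they reproduce this particular output.

```python
def coalesce_like(partitions, num_partitions):
    """Merge contiguous runs of parent partitions into num_partitions groups."""
    parent_count = len(partitions)
    merged = []
    for i in range(num_partitions):
        # Contiguous slice of parent partitions assigned to output partition i.
        start = i * parent_count // num_partitions
        end = (i + 1) * parent_count // num_partitions
        group = []
        for parent in partitions[start:end]:
            group.extend(parent)
        merged.append(group)
    return merged


# Partition layout printed by x.glom().collect() in the cell above.
before = [[1, 2], [3, 4], [5, 6], [7, 8, 9, 5], [4, 5], [6, 8],
          [9, 10, 12, 14], [12, 12], [14, 15], [16, 44, 44, 45]]
after = coalesce_like(before, 4)
print(after)
```

In this model every output partition is built from whole parent partitions, so no element has to be redistributed individually, which fits the description of coalesce() as the cheaper way to reduce the partition count.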


In [0]:
%fs ls /user/hive/warehouse/mydb.db/dept_csv/

#### repartition(numPartitions) Transformation
* Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
* This always shuffles all data over the network.
* RDD.repartition() takes only numPartitions. Repartitioning by column, `repartition(numPartitions, *cols)`, is part of the DataFrame API rather than the RDD API.

In [0]:
x = sc.parallelize([1, 2, 3, 4, 5,6,7,8,9,10,11,12,13,14,15,12,434,545,65,3434,232,4545,234234,33,434,3434], 5)
y = x.repartition(8)
print(x.glom().collect())
print(x.getNumPartitions())
print(y.glom().collect())
print(y.getNumPartitions())

[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15], [12, 434, 545, 65, 3434], [232, 4545, 234234, 33, 434, 3434]]
5
[[11, 12, 13, 14, 15], [], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], [], [232, 4545, 234234, 33, 434, 3434], [12, 434, 545, 65, 3434], []]
8


#### partitionBy Transformation
* Return a new RDD of key-value pairs with the specified number of partitions, placing each original item into the partition returned by a user-supplied function applied to the item's key.

In [0]:
x = sc.parallelize([('J','James'),('F','Fred'),('A','Anna'),('J','John'),('R','Ravi'),('E','Eswar'),('T','Tagore'),('F','FSDF')], 3)
y = x.partitionBy(2, lambda w: 0 if w[0] < 'H' else 1)
print (x.glom().collect())
print (y.glom().collect())
print('X RDD No. of Partitions : ',x.getNumPartitions())
print('Y RDD No. of Partitions : ',y.getNumPartitions())

[[('J', 'James'), ('F', 'Fred')], [('A', 'Anna'), ('J', 'John')], [('R', 'Ravi'), ('E', 'Eswar'), ('T', 'Tagore'), ('F', 'FSDF')]]
[[('F', 'Fred'), ('A', 'Anna'), ('E', 'Eswar'), ('F', 'FSDF')], [('J', 'James'), ('J', 'John'), ('R', 'Ravi'), ('T', 'Tagore')]]
X RDD No. of Partitions :  3
Y RDD No. of Partitions :  2
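
The placement rule behind partitionBy() can be sketched in plain Python: the partition function sees only the key, and its result (taken modulo the number of partitions) selects the target partition. `partition_by_like` is a hypothetical helper written for this illustration, not a PySpark function.

```python
def partition_by_like(pairs, num_partitions, partition_func):
    """Place each (key, value) pair into the bucket chosen from its key."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        index = partition_func(key) % num_partitions
        buckets[index].append((key, value))
    return buckets


# Same data and partition function as the cell above.
pairs = [('J', 'James'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'John'),
         ('R', 'Ravi'), ('E', 'Eswar'), ('T', 'Tagore'), ('F', 'FSDF')]
layout = partition_by_like(pairs, 2, lambda key: 0 if key[0] < 'H' else 1)
for number, bucket in enumerate(layout):
    print(number, bucket)
```

Keys that sort before 'H' land in partition 0 and all others in partition 1, matching the glom() output of the notebook cell.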


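#### zip Transformation
* Return a new RDD containing pairs whose key is the item in the original RDD, and whose value is that item’s corresponding element (same partition, same index) in a second RDD.
* Both RDDs must have the same number of partitions and the same number of elements in each partition.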

In [0]:
x= sc.parallelize([1,2,3,4,5,6])
y = sc.parallelize([4,6,8,10,12,6])
z = x.zip(y)
print(z.collect())

[(1, 4), (2, 6), (3, 8), (4, 10), (5, 12), (6, 6)]


In [0]:
x = sc.parallelize([1, 2, 3])
y = x.map(lambda n:n*n)
z = x.zip(y)
print('display X RDD Values: ',x.collect())
print('Display Y RDD Values: ',y.collect())
print(z.collect())

display X RDD Values:  [1, 2, 3]
Display Y RDD Values:  [1, 4, 9]
[(1, 1), (2, 4), (3, 9)]


In [0]:
x = sc.parallelize([1, 2, 3,5,6])
y = sc.parallelize([3,4,5,4,8])
z = x.zip(y)
print(z.collect())

[(1, 3), (2, 4), (3, 5), (5, 4), (6, 8)]


#### groupByKey Transformation in RDD
* When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
* Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
* Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.

In [0]:
x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1),('C',4),('C',5),('B',4)])
y = x.groupByKey()
print(x.collect())
print(list((j[0], list(j[1])) for j in y.collect()))

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1), ('C', 4), ('C', 5), ('B', 4)]
[('B', [5, 4, 4]), ('C', [4, 5]), ('A', [3, 2, 1])]


#### JOINS
* join(self, other, numPartitions=None)
* Return an RDD containing all pairs of elements with matching keys in self and other.
* Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.
* Performs a hash join across the cluster.

In [0]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
z = x.join(y)
print(z.collect())
#print(sorted(z.collect()))

[('a', (1, 2)), ('a', (1, 3))]


#### leftOuterJoin(self, other, numPartitions=None)
 
* Perform a left outer join of self and other.
* For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k.
* Hash-partitions the resulting RDD into the given number of partitions.

In [0]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = x.leftOuterJoin(y)
print(z.collect())
#print(sorted(z.collect()))


[('b', (4, None)), ('a', (1, 2))]


#### rightOuterJoin(self, other, numPartitions=None)
* Perform a right outer join of self and other.
* For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in self, or the pair (k, (None, w)) if no elements in self have key k.
* Hash-partitions the resulting RDD into the given number of partitions.

In [0]:
x = sc.parallelize([("a", 1)])
y = sc.parallelize([("a", 2),("c", 4)])
z = x.rightOuterJoin(y)
print(z.collect())
#print(sorted(z.collect()))

[('c', (None, 4)), ('a', (1, 2))]


#### fullOuterJoin(self, other, numPartitions=None)
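* Perform a full outer join of self and other. The result contains both matching and non-matching keys: (k, (v, w)) for keys in both, (k, (v, None)) for keys only in self, and (k, (None, w)) for keys only in other.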

In [0]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 5)])
z = x.fullOuterJoin(y)
print(z.collect())
#print(sorted(z.collect()))

[('b', (4, None)), ('c', (None, 5)), ('a', (1, 2))]


#### cogroup(self, other, numPartitions=None)
* For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.

In [0]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = x.cogroup(y)
[(x, list(map(list,y))) for x, y in z.collect()]

Out[63]: [('b', [[4], []]), ('a', [[1], [2]])]
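
The relationship between cogroup() and the four joins above can be sketched in plain Python: once the values for each key are collected from both sides, each join type is just a different rule for keys that are missing on one side. The helpers `cogroup_like` and `join_like` are hypothetical names for this illustration, and the sketch makes no claim about how Spark executes joins internally.

```python
def cogroup_like(left, right):
    """For every key in either input, collect (values from left, values from right)."""
    keys = sorted({k for k, _ in left} | {k for k, _ in right})
    return {
        key: ([v for k, v in left if k == key], [w for k, w in right if k == key])
        for key in keys
    }


def join_like(left, right, how="inner"):
    """Derive inner, left, right or full outer join rows from the cogroup result."""
    rows = []
    for key, (left_values, right_values) in cogroup_like(left, right).items():
        if not right_values and how in ("left", "full"):
            right_values = [None]  # keep unmatched rows from the left side
        if not left_values and how in ("right", "full"):
            left_values = [None]   # keep unmatched rows from the right side
        rows.extend((key, (v, w)) for v in left_values for w in right_values)
    return rows


x = [("a", 1), ("b", 4)]
y = [("a", 2), ("c", 5)]
grouped = cogroup_like(x, y)
inner = join_like(x, y)
left = join_like(x, y, how="left")
right = join_like(x, y, how="right")
full = join_like(x, y, how="full")
print(grouped)
print(inner, left, right, full, sep="\n")
```

The rows agree with the fullOuterJoin cell for the same inputs, apart from ordering, which in Spark depends on partitioning.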

#### What is glom()
* glom() returns an RDD created by coalescing all elements within each partition into a list, so collect() then shows one list per partition.

#### Shuffle
* The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

* Operations which can cause a shuffle include repartition operations like `repartition` and `coalesce`, `ByKey` operations (except for counting) like `groupByKey` and `reduceByKey`, and join operations like `cogroup` and `join`.

#### getNumPartitions()
* Returns the number of partitions in the RDD.

In [0]:
x = sc.parallelize([1,2,3,4,5,6,7,8,9,10,12,122,134,1456,1212,121212,98], 5)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)
print(x.collect())

[[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 12, 122], [134, 1456, 1212, 121212, 98]]
5
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 122, 134, 1456, 1212, 121212, 98]


#### reduceByKey(func, [numPartitions])
* When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

* Note: If you are grouping with `groupByKey` in order to perform an aggregation (such as a sum or average) over each key, using `reduceByKey` or `aggregateByKey` will provide much better performance.

In [0]:
%%time
wordsRDD = sc.parallelize(['cat', 'elephant', 'rat', 'rat', 'cat'], 4)
split_rdd = wordsRDD.map(lambda w:(w,1))
result = split_rdd.reduceByKey(lambda x,y:x+y).collect()
print(split_rdd.collect())
result

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]
CPU times: user 30.2 ms, sys: 4.04 ms, total: 34.2 ms
Wall time: 565 ms
Out[65]: [('cat', 2), ('elephant', 1), ('rat', 2)]

In [0]:
type(result)

Out[66]: list

In [0]:
%%time
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD.map(lambda w: (w, 1)).reduceByKey(lambda x,y: x+y).collect())
print(wordsRDD.collect())
print(wordCountsCollected)

['cat', 'elephant', 'rat', 'rat', 'cat']
[('cat', 2), ('elephant', 1), ('rat', 2)]
CPU times: user 11.1 ms, sys: 14.8 ms, total: 25.9 ms
Wall time: 438 ms


## ACTIONS

#### reduce(func)  Action
* Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). 
* The function should be commutative and associative so that it can be computed correctly in parallel.

In [0]:
# reduce numbers 1 to 10 by adding them up
x = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
y = x.reduce(lambda a,b: a+b)
print(x.collect())
print(y)
type(y)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
55
Out[68]: int
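
Why the function passed to reduce() should be commutative and associative can be seen in a small plain-Python model. It assumes each partition is reduced on its own and the partial results are then reduced again; `partitioned_reduce` is a hypothetical helper, not part of PySpark.

```python
from functools import reduce


def partitioned_reduce(data, num_partitions, func):
    """Reduce each chunk separately, then reduce the partial results."""
    size = -(-len(data) // num_partitions)  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(func, chunk) for chunk in chunks]
    return reduce(func, partials)


data = list(range(1, 11))

# Addition is commutative and associative: the partitioning does not matter.
sum_2 = partitioned_reduce(data, 2, lambda a, b: a + b)
sum_5 = partitioned_reduce(data, 5, lambda a, b: a + b)

# Subtraction is neither: the result changes with the number of partitions.
diff_2 = partitioned_reduce(data, 2, lambda a, b: a - b)
diff_5 = partitioned_reduce(data, 5, lambda a, b: a - b)

print(sum_2, sum_5)    # 55 55
print(diff_2, diff_5)  # 15 3
```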

#### count()
* Return the number of elements in the dataset.

In [0]:
x = sc.parallelize([1,2,3,4,5,6,7,8,9,34,34,34,34,6,45,23,2,0])
x.count()

Out[70]: 18

#### first() Action
* Return the first element of the dataset (similar to take(1)).

In [0]:
x = sc.parallelize([66,77,55,44,33,1,2,3,4])
print(x.first())
print(x.take(4))

66
[66, 77, 55, 44]


#### takeSample(withReplacement, num, [seed])
* Return a fixed-size array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
* withReplacement – whether sampling is done with replacement
* num – size of the returned sample; without replacement it cannot exceed the number of elements in the RDD
* seed – seed for the random number generator
* returns – sample of the specified size in an array

In [0]:
rdd = sc.parallelize(range(0, 10))
print(rdd.takeSample(True, 50, 3))
print(rdd.takeSample(False, 20, 1))

print(rdd.takeSample(False, 5, 2))

print(rdd.takeSample(False, 8, 5))


[5, 4, 3, 1, 6, 3, 8, 3, 9, 8, 2, 2, 5, 4, 4, 3, 6, 3, 3, 0, 0, 0, 4, 1, 1, 9, 7, 3, 7, 1, 5, 0, 1, 6, 4, 0, 9, 9, 0, 6, 5, 8, 1, 1, 5, 4, 2, 6, 1, 2]
[6, 8, 9, 7, 5, 3, 0, 4, 1, 2]
[5, 9, 3, 4, 6]
[2, 3, 1, 0, 8, 7, 6, 5]
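
The effect of withReplacement on the sample size can be reproduced with Python's `random` module. This is a local stand-in for the behaviour seen above, not Spark's sampling algorithm, so the concrete values drawn differ from the notebook output; `take_sample_like` is a hypothetical helper.

```python
import random


def take_sample_like(data, with_replacement, num, seed=None):
    """Draw num elements; without replacement the sample is capped at len(data)."""
    rng = random.Random(seed)
    if with_replacement:
        return rng.choices(data, k=num)           # elements may repeat
    return rng.sample(data, min(num, len(data)))  # each element at most once


data = list(range(10))
with_repl = take_sample_like(data, True, 50, seed=3)
without_repl = take_sample_like(data, False, 20, seed=1)
print(len(with_repl))     # 50
print(len(without_repl))  # 10
```

As in the notebook, asking for 20 elements without replacement from a 10-element dataset returns all 10 elements in random order.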


#### take(n) Action
* Return an array with the first n elements of the dataset.

In [0]:
x = sc.parallelize([1,2,3,4])
print('Take Action : ',x.take(2))
print('No of records :',x.count())

Take Action :  [1, 2]
No of records : 4


#### countByValue(self)
* Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

In [0]:
a={44,23,23,23}
type(a)

Out[96]: set

In [0]:
x = sc.parallelize([1, 2, 1, 2, 2,3,3,4,3,3,4,4,5,5,6,6])
y = x.countByValue().items()
print(y)
type(y)

dict_items([(1, 2), (2, 3), (3, 4), (4, 3), (5, 2), (6, 2)])
Out[75]: dict_items

#### isEmpty()
* Returns true if and only if the RDD contains no elements at all.
* Note: an RDD may be empty even when it has at least 1 partition.

In [0]:
x =  sc.parallelize(range(10))
print(x.collect())
print(x.isEmpty())
y =  sc.parallelize(range(0))
print(y.collect())
print(y.isEmpty())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
False
[]
True


#### keys()
* Return an RDD with the keys of each tuple.

In [0]:
x = sc.parallelize([(1, 2), (3, 4),(5,55),(6,66)])
y = x.keys()
print(y.collect())

[1, 3, 5, 6]


#### saveAsTextFile(path, compressionCodecClass=None)
* Save the RDD to the filesystem indicated in the path

In [0]:
dbutils.fs.rm("dbfs:/tmp/test_data/",True)


Out[78]: False

In [0]:
rdd_text = sc.textFile('dbfs:/tmp/test_data/saveAs/part-00007')

In [0]:
rdd_text.collect()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-3737989515766202>:1
----> 1 rdd_text.collect()

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature

In [0]:
%fs ls dbfs:/tmp/test_data/saveAs/

In [0]:
x = sc.parallelize(['Raveendra','Eswar','Vamsi','Lakshmi','Vinod'])
x.saveAsTextFile("dbfs:/tmp/test_data/saveAs")

In [0]:
%fs ls dbfs:/tmp/test_data/saveAs/

path,name,size,modificationTime
dbfs:/tmp/test_data/saveAs/_SUCCESS,_SUCCESS,0,1718288990000
dbfs:/tmp/test_data/saveAs/part-00000,part-00000,0,1718288990000
dbfs:/tmp/test_data/saveAs/part-00001,part-00001,10,1718288990000
dbfs:/tmp/test_data/saveAs/part-00002,part-00002,0,1718288990000
dbfs:/tmp/test_data/saveAs/part-00003,part-00003,6,1718288990000
dbfs:/tmp/test_data/saveAs/part-00004,part-00004,6,1718288990000
dbfs:/tmp/test_data/saveAs/part-00005,part-00005,0,1718288990000
dbfs:/tmp/test_data/saveAs/part-00006,part-00006,8,1718288990000
dbfs:/tmp/test_data/saveAs/part-00007,part-00007,6,1718288990000


In [0]:
y = sc.textFile("dbfs:/tmp/test_data/saveAs")
print(y.collect())

['Raveendra', 'Eswar', 'Vamsi', 'Lakshmi', 'Vinod']


#### saveAsPickleFile(self, path, batchSize=10)
* Save this RDD as a SequenceFile of serialized objects. The serializer used is `pyspark.serializers.PickleSerializer`; the default batch size is 10.

In [0]:
dbutils.fs.rm("dbfs:/tmp/test_data/picklefile/",True)


Out[83]: False

In [0]:
x = sc.parallelize(['Raveendra','Eswar','Vinod','Lakshmi','Vamsi'])
x.saveAsPickleFile("dbfs:/tmp/test_data/picklefile")




In [0]:
%fs ls dbfs:/tmp/test_data/picklefile

path,name,size,modificationTime
dbfs:/tmp/test_data/picklefile/_SUCCESS,_SUCCESS,0,1718288995000
dbfs:/tmp/test_data/picklefile/part-00000,part-00000,95,1718288995000
dbfs:/tmp/test_data/picklefile/part-00001,part-00001,185,1718288995000
dbfs:/tmp/test_data/picklefile/part-00002,part-00002,95,1718288995000
dbfs:/tmp/test_data/picklefile/part-00003,part-00003,181,1718288995000
dbfs:/tmp/test_data/picklefile/part-00004,part-00004,181,1718288995000
dbfs:/tmp/test_data/picklefile/part-00005,part-00005,95,1718288995000
dbfs:/tmp/test_data/picklefile/part-00006,part-00006,183,1718288995000
dbfs:/tmp/test_data/picklefile/part-00007,part-00007,181,1718288995000


In [0]:
text_rdd = sc.textFile('dbfs:/tmp/test_data/picklefile/part-00004')
text_rdd.collect()

Out[85]: ['SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x00\x00\x00\x00\x00\x00U�\x197Śl�$|��H��G\x00\x00\x00N\x00\x00\x00\x00\x00\x00\x00J��\x00\x05ur\x00\x03[[BK�\x19\x15gg�7\x02\x00\x00xp\x00\x00\x00\x01ur\x00\x02[B��\x17�\x06\x08T�\x02\x00\x00xp\x00\x00\x00\x17�\x05�\x0c\x00\x00\x00\x00\x00\x00\x00]��\x05Vinod�a.']

In [0]:
y = sc.pickleFile("dbfs:/tmp/test_data/picklefile")
print(y.collect())

['Raveendra', 'Eswar', 'Vinod', 'Lakshmi', 'Vamsi']



#### stdev()
* Return the standard deviation of the items in the RDD. This is the population standard deviation (dividing by N); sampleStdev() divides by N-1 instead.

In [0]:
x = sc.parallelize([2,4,1])
y = x.stdev()
print(x.collect())
print(y)

[2, 4, 1]
1.247219128924647
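
The printed value can be cross-checked locally with Python's `statistics` module, which offers both the population and the sample formula. This is only a verification sketch on the same three numbers, not Spark code.

```python
import statistics

values = [2, 4, 1]
population_sd = statistics.pstdev(values)  # divides by N
sample_sd = statistics.stdev(values)       # divides by N - 1

print(population_sd)
print(sample_sd)
```

The population figure agrees with the stdev() output above, while the sample figure is larger because of the smaller divisor.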


#### min()
* Return the minimum value of the items in the RDD

In [0]:
x = sc.parallelize([2,4,1,3,5,6,7])
y = x.min()
print(x.collect())
print(y)

[2, 4, 1, 3, 5, 6, 7]
1


#### max()
* Return the maximum value of the items in the RDD

In [0]:
x = sc.parallelize([2,4,1,55,66,77,8845,454545,])
y = x.max()
print(x.collect())
print(y)

[2, 4, 1, 55, 66, 77, 8845, 454545]
454545


#### mean()
* Return the mean of the items in the RDD

In [0]:
x = sc.parallelize([2,4,1])
y = x.mean()
print(x.collect())
print(y)

[2, 4, 1]
2.3333333333333335


#### sum()
* Return the sum of the items in the RDD

In [0]:
x = sc.parallelize([2,4,1,55,44,34,34,344])
y = x.sum()
print(x.collect())
print(y)

[2, 4, 1, 55, 44, 34, 34, 344]
518


#### stats()
* Return a StatCounter object that captures the count, mean, standard deviation, maximum and minimum of the RDD’s elements in one operation.

In [0]:
list_rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])
list_rdd.stats()

Out[92]: (count: 15, mean: 8.0, stdev: 4.320493798938574, max: 15.0, min: 1.0)

In [0]:
my_rdd = sc.parallelize(['test1','test2','test3'])

In [0]:
help(my_rdd)

Help on RDD in module pyspark.rdd object:

class RDD(typing.Generic)
 |  RDD(jrdd: 'JavaObject', ctx: 'SparkContext', jrdd_deserializer: pyspark.serializers.Serializer = AutoBatchedSerializer(CloudPickleSerializer()))
 |  
 |  A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
 |  Represents an immutable, partitioned collection of elements that can be
 |  operated on in parallel.
 |  
 |  Method resolution order:
 |      RDD
 |      typing.Generic
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __add__(self: 'RDD[T]', other: 'RDD[U]') -> 'RDD[Union[T, U]]'
 |      Return the union of this RDD and another one.
 |      
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |  
 |  __getnewargs__(self) -> NoReturn
 |  
 |  __init__(self, jrdd: 'JavaObject', ctx: 'SparkContext', jrdd_deserializer: pyspark.serializers.Serializer = AutoBatchedSerializer(CloudPick