<a href="https://colab.research.google.com/github/MWFK/Databricks-Study-Materials/blob/main/00-%20PySpark_RDD_Cheat_Sheet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Libs

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Configuration

In [None]:
from pyspark import SparkConf
conf = (SparkConf()
                  .setMaster('local[3]')
                  .setAppName('First App')
                  .set('spark.executor.memory','1g'))

### Initializing Spark

In [None]:
### Spark Context
from pyspark import SparkContext
sc = SparkContext(conf = conf)

### Inspect SparkContext

In [None]:
print([1], sc.version)              # Retrieve SparkContext version
print([2], sc.pythonVer)            # Retrieve Python version
print([3], sc.master)               # Master URL to connect to
print([4], str(sc.sparkHome))       # Path where Spark is installed on worker nodes
print([5], str(sc.sparkUser()))     # Retrieve name of the Spark User running SparkContext
print([6], sc.appName)              # Return application name
print([7], sc.applicationId)        # Retrieve application ID
print([8], sc.defaultParallelism)   # Return default level of parallelism
print([9], sc.defaultMinPartitions) # Default minimum number of partitions forRDDs

[1] 3.3.0
[2] 3.7
[3] local[3]
[4] None
[5] root
[6] First App
[7] local-1656579071118
[8] 3
[9] 2


### Using Shell

In [25]:
# %%shell
# pyspark --master local[4]

### Using Spark Session instead Spark Context

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

### Parallelized Collections

In [84]:
rdd = sc.parallelize([('x',10),('x',11),('y',10),('b',2),('c',2)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([( 'a',[ 'x', 'y', 'z']),( 'b',[ 'p', 'r'])])

### External Data

In [35]:
textFile = sc.textFile('/my/directory/*.txt')
textFile2 = sc.wholeTextFiles('/my/directory/')

### Retrieving RDD information

In [52]:
print(rdd.getNumPartitions()) # List the number of partitions
print(rdd.count())            # Count RDD instances
print(rdd.countByKey())       # Count RDD instances by key
print(rdd.countByValue())     # Count RDD instances by value
print(rdd.collectAsMap())     # Return (key, value) pairs as a dictionary
print(rdd3.sum())             # Sum of RDD elements 
print(rdd.isEmpty())          # Check whether RDD is empty
print(rdd3.max())             # Maximum value of RDD elements
print(rdd3.mean())            # Mean value of RDD elements
print(rdd3.stdev())           # Standard Deviation (how dispersed the data is in relation to the mean.) of RDD elements
print(rdd3.variance())        # Variance (Variance is the average squared deviations from the mean, while standard deviation is the square root of this number. ) value of RDD elements
print(rdd3.stats())           # Summary statistics (count, mean, stdev, max & min)
print(rdd3.histogram(3))      # Compute histogram by bins

3
3
defaultdict(<class 'int'>, {'a': 1, 'b': 1, 'c': 1})
defaultdict(<class 'int'>, {('a', 7): 1, ('b', 2): 1, ('c', 2): 1})
{'a': 7, 'b': 2, 'c': 2}
4950
False
99
49.5
28.86607004772212
833.25
(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)
([0, 33, 66, 99], [33, 33, 34])


### Applying Functions

In [70]:
# Apply a function to each RDD element
print(rdd.map(lambda x: (x[1],x[0])).collect()) # Inverse elements
# Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.

# Apply a function to each RDD element and flatten the result
print(rdd.flatMap(lambda x: (x[1],x[0])).collect())

#Apply a flatMap function to each (key,value) pair of rdd4 without changing the keys
print(rdd4.flatMapValues(lambda x: x).collect())

[(7, 'a'), (2, 'b'), (2, 'c')]
[7, 'a', 2, 'b', 2, 'c']
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]


### Selecting Data

In [85]:
print(rdd.collect()) # Return a list with all RDD elements
print(rdd.take(2))   # Take first 2 RDD elements
print(rdd.first())   # Take first element
print(rdd.top(2))    # ASCII sorting of the top elememts

[('x', 10), ('x', 11), ('y', 10), ('b', 2), ('c', 2)]
[('x', 10), ('x', 11)]
('x', 10)
[('y', 10), ('x', 11)]


In [91]:
# Return sampled subset of rdd3 sample(frac, seed)
print(rdd3.sample(False, 0.15, 50).collect())
print(rdd3.sample(False, 0.15, 10).collect())

[2, 5, 10, 12, 17, 26, 29, 37, 61, 71, 72, 75, 86, 97, 98, 99]
[5, 14, 19, 20, 21, 29, 33, 44, 45, 54, 55, 60, 64, 67, 81, 85, 86, 88, 95, 97]


In [94]:
# Filter the RDD
rdd.filter(lambda x: 'a' in x).collect()

# Return distinct RDD values
rdd.distinct().collect()

# Return (key,value) RDD's keys
rdd.keys().collect()

['x', 'x', 'y', 'b', 'c']

In [97]:
# Apply a function to all RDD elements
def g(x): print(x)
rdd.foreach(g)

### Reshaping Data

In [111]:
# Reduce
print(rdd.reduceByKey(lambda x,y : x+y).collect()) # Merge the rdd values for each key 
print(rdd.reduce(lambda x,y : x+y))                # Merge the rdd values

(4950, 100)


In [112]:
# Group By
print(rdd3.groupBy(lambda x: x %2).flatMapValues(list).collect()) # Return RDD of grouped values
print(rdd.groupByKey().mapValues(list).collect())                 # Group rdd by key

[(0, 0), (0, 2), (0, 4), (0, 6), (0, 8), (0, 10), (0, 12), (0, 14), (0, 16), (0, 18), (0, 20), (0, 22), (0, 24), (0, 26), (0, 28), (0, 30), (0, 32), (0, 34), (0, 36), (0, 38), (0, 40), (0, 42), (0, 44), (0, 46), (0, 48), (0, 50), (0, 52), (0, 54), (0, 56), (0, 58), (0, 60), (0, 62), (0, 64), (0, 66), (0, 68), (0, 70), (0, 72), (0, 74), (0, 76), (0, 78), (0, 80), (0, 82), (0, 84), (0, 86), (0, 88), (0, 90), (0, 92), (0, 94), (0, 96), (0, 98), (1, 1), (1, 3), (1, 5), (1, 7), (1, 9), (1, 11), (1, 13), (1, 15), (1, 17), (1, 19), (1, 21), (1, 23), (1, 25), (1, 27), (1, 29), (1, 31), (1, 33), (1, 35), (1, 37), (1, 39), (1, 41), (1, 43), (1, 45), (1, 47), (1, 49), (1, 51), (1, 53), (1, 55), (1, 57), (1, 59), (1, 61), (1, 63), (1, 65), (1, 67), (1, 69), (1, 71), (1, 73), (1, 75), (1, 77), (1, 79), (1, 81), (1, 83), (1, 85), (1, 87), (1, 89), (1, 91), (1, 93), (1, 95), (1, 97), (1, 99)]
[('x', [10, 11]), ('y', [10]), ('b', [2]), ('c', [2])]


In [117]:
# Aggregating
seqOp = lambda x,y: (x[0]+y, x[1]+1)
print(seqOp)
combOp = lambda x,y: (x[0]+y[0], x[1]+y[1])
print(combOp)
print(rdd3.aggregate((0,0),seqOp,combOp)) # #Aggregate RDD elements of each partition and then the results

<function <lambda> at 0x7f5ffcc305f0>
<function <lambda> at 0x7f5ffcc30ef0>
(4950, 100)


### Mathematical Operations

In [124]:
print(rdd.subtract(rdd3).collect())       # return each rdd value not contained in rdd3
print(rdd3.substractByKey(rdd).collect()) # return each (key, value) pair of rdd3 with no matching key in rdd
print(rdd.cartesian(rdd3).collect()) # return the cartesian product of rdd and rdd#

[('x', 11), ('b', 2), ('c', 2), ('x', 10), ('y', 10)]


### Sort


In [125]:
print(rdd.sortBy(lambda x : x[1]).collect())
print(rdd.sortByKey().collect())

[('b', 2), ('c', 2), ('x', 10), ('y', 10), ('x', 11)]

### Repartitioning

In [126]:
print(rdd.repartition(4))
print(rdd.coalesce(1))

MapPartitionsRDD[365] at coalesce at NativeMethodAccessorImpl.java:0
CoalescedRDD[366] at coalesce at NativeMethodAccessorImpl.java:0


### Saving

In [None]:
rdd.saveAsTextFile('rdd.txt')
#rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",'org.apache.hadoop.mapred.TextOutputFormat')

### Stopping Spark Context

In [128]:
sc.stop()

### Execution

In [131]:
# %%bash
# ./bin/spark-submit examples/src/main/python/pi.py